You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:29 UTC

[41/50] [abbrv] git commit: Working delivery tests for UDP and Netty

Working delivery tests for UDP and Netty


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/0d111970
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/0d111970
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/0d111970

Branch: refs/heads/piper
Commit: 0d111970536ba50084f5c30899535ac8a6e27ffd
Parents: 92f3252
Author: Karthik Kambatla <kk...@cs.purdue.edu>
Authored: Sat Oct 22 23:50:07 2011 -0400
Committer: Karthik Kambatla <kk...@cs.purdue.edu>
Committed: Sat Oct 22 23:51:25 2011 -0400

----------------------------------------------------------------------
 .../test/java/org/apache/s4/comm/NettyTest.java    |   88 +++++++++++++
 .../java/org/apache/s4/comm/NettyTestModule.java   |   96 ---------------
 .../org/apache/s4/comm/SimpleDeliveryTest.java     |   67 +++--------
 .../src/test/java/org/apache/s4/comm/UDPTest.java  |   89 +++++++++++++
 .../java/org/apache/s4/comm/UDPTestModule.java     |   96 ---------------
 5 files changed, 192 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d111970/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTest.java
new file mode 100644
index 0000000..4db680d
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTest.java
@@ -0,0 +1,88 @@
+package org.apache.s4.comm;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.netty.NettyEmitter;
+import org.apache.s4.comm.netty.NettyListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+
+public class NettyTest extends SimpleDeliveryTest {
+
+	@Override
+	protected void setUp() {
+		Injector injector = Guice.createInjector(new NettyTestModule());
+		sdt = injector.getInstance(CommWrapper.class);
+	}
+
+	class NettyTestModule extends AbstractModule {
+
+		protected PropertiesConfiguration config = null;
+
+		private void loadProperties(Binder binder) {
+
+			try {
+				InputStream is = this.getClass().getResourceAsStream(
+						"/s4-comm-test.properties");
+				config = new PropertiesConfiguration();
+				config.load(is);
+
+				System.out.println(ConfigurationUtils.toString(config));
+				Names.bindProperties(binder,
+						ConfigurationConverter.getProperties(config));
+			} catch (ConfigurationException e) {
+				binder.addError(e);
+				e.printStackTrace();
+			}
+		}
+
+		@Override
+		protected void configure() {
+			if (config == null)
+				loadProperties(binder());
+
+			int numHosts = config.getList("cluster.hosts").size();
+			boolean isCluster = numHosts > 1 ? true : false;
+			bind(Boolean.class).annotatedWith(Names.named("isCluster"))
+					.toInstance(Boolean.valueOf(isCluster));
+
+			bind(Cluster.class);
+
+			bind(Assignment.class).to(AssignmentFromFile.class);
+
+			bind(Topology.class).to(TopologyFromFile.class);
+
+			/* Use the Netty comm layer implementation. */
+			bind(Listener.class).to(NettyListener.class);
+			bind(Emitter.class).to(NettyEmitter.class);
+
+			/* The hashing function to map keys to partitions. */
+			bind(Hasher.class).to(DefaultHasher.class);
+
+			/* Use Kryo to serialize events. */
+			bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+			bind(Integer.class).annotatedWith(
+					Names.named("emitter.send.interval")).toInstance(
+					config.getInt("emitter.send.interval"));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d111970/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTestModule.java
deleted file mode 100644
index 65c3dee..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTestModule.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.netty.NettyEmitter;
-import org.apache.s4.comm.netty.NettyListener;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-/*
- * Module for s4-comm/tests
- */
-public class NettyTestModule extends AbstractModule {
-
-	protected PropertiesConfiguration config = null;
-
-	private void loadProperties(Binder binder) {
-
-		try {
-			InputStream is = this.getClass().getResourceAsStream(
-					"/s4-comm-test.properties");
-			config = new PropertiesConfiguration();
-			config.load(is);
-
-			System.out.println(ConfigurationUtils.toString(config));
-			Names.bindProperties(binder,
-					ConfigurationConverter.getProperties(config));
-		} catch (ConfigurationException e) {
-			binder.addError(e);
-			e.printStackTrace();
-		}
-	}
-
-	@Override
-	protected void configure() {
-		if (config == null)
-			loadProperties(binder());
-
-		int numHosts = config.getList("cluster.hosts").size();
-		boolean isCluster = numHosts > 1 ? true : false;
-		bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(
-				Boolean.valueOf(isCluster));
-
-		bind(Cluster.class);
-
-		bind(Assignment.class).to(AssignmentFromFile.class);
-
-		bind(Topology.class).to(TopologyFromFile.class);
-
-		/* Use a simple UDP comm layer implementation. */
-		bind(Listener.class).to(NettyListener.class);
-		bind(Emitter.class).to(NettyEmitter.class);
-
-		/* The hashing function to map keys top partitions. */
-		bind(Hasher.class).to(DefaultHasher.class);
-
-		/* Use Kryo to serialize events. */
-		bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-		
-        bind(Integer.class).annotatedWith(Names.named("emitter.send.interval"))
-        .toInstance(config.getInt("emitter.send.interval"));
-
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d111970/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
index ba85899..17a2376 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
@@ -2,18 +2,18 @@ package org.apache.s4.comm;
 
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Listener;
-
-import com.google.inject.Guice;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import com.google.inject.name.Named;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
 /*
- * Test class to test communication protocols.
+ * Test class for communication protocols. Because comm-layer connections must be
+ * established (which includes acquiring locks), the test is declared abstract and
+ * must be extended by a subclass for each protocol.
  * 
+ * At a high-level, the test accomplishes the following:
  * <ul>
  * <li> Create Send and Receive Threads </li>
  * <li> SendThread sends out a pre-defined number of messages to all the partitions </li>
@@ -24,9 +24,10 @@ import junit.framework.TestCase;
  * </ul>
  * 
  */
-public class SimpleDeliveryTest extends TestCase {
+public abstract class SimpleDeliveryTest extends TestCase {
+	protected CommWrapper sdt;
 
-	class CommWrapper {
+	static class CommWrapper {
 		final private static int messageCount = 200;
 		final private static int timerThreadCount = 100;
 
@@ -140,54 +141,14 @@ public class SimpleDeliveryTest extends TestCase {
 	}
 
 	/**
-	 * test1() tests the UDP protocol. If all components function without
-	 * throwing exceptions, the test passes. As UDP doesn't guarantee message
-	 * delivery, the number of messages received doesn't come into play to
-	 * determine if it passes the test.
-	 * 
+	 * test() tests the protocol. If all components function without throwing
+	 * exceptions, the test passes. The test also reports the loss of messages,
+	 * if any.
 	 * 
 	 * @throws InterruptedException
 	 */
-	public void test1() throws InterruptedException {
-		System.out.println("Testing UDP");
-
-		Injector injector = Guice.createInjector(new UDPTestModule());
+	public void test() throws InterruptedException {
 		try {
-			CommWrapper sdt = injector.getInstance(CommWrapper.class);
-
-			// start send and receive threads
-			sdt.sendThread.start();
-			sdt.receiveThread.start();
-
-			// wait for them to finish
-			sdt.sendThread.join();
-			sdt.receiveThread.join();
-
-			// exit - system.exit is called here to revoke the lock file and
-			// listener
-			// sockets
-		} catch (Exception e) {
-			Assert.fail("UDP test has failed");
-		}
-		Assert.assertTrue("UDP test PASSED. Seems to work fine", true);
-
-		System.out.println("Done");
-	}
-
-	/**
-	 * test2() tests the Netty TCP protocol. If all components function without
-	 * throwing exceptions, the test passes partially. As TCP guarantees message
-	 * delivery, the test checks for that too.
-	 * 
-	 * @throws InterruptedException
-	 */
-	public void test2() throws InterruptedException {
-		System.out.println("Testing Netty TCP");
-
-		Injector injector = Guice.createInjector(new NettyTestModule());
-		try {
-			CommWrapper sdt = injector.getInstance(CommWrapper.class);
-
 			// start send and receive threads
 			sdt.sendThread.start();
 			sdt.receiveThread.start();
@@ -199,10 +160,12 @@ public class SimpleDeliveryTest extends TestCase {
 			Assert.assertTrue("Guaranteed message delivery",
 					!sdt.moreMessages());
 		} catch (Exception e) {
-			Assert.fail("Netty test has failed basic functionality test");
+			e.printStackTrace();
+			Assert.fail("The comm protocol has failed basic functionality test");
 		}
 
-		Assert.assertTrue("Netty seems to be working crash-free", true);
+		Assert.assertTrue("The comm protocol seems to be working crash-free",
+				true);
 
 		System.out.println("Done");
 	}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d111970/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTest.java
new file mode 100644
index 0000000..720dc28
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTest.java
@@ -0,0 +1,89 @@
+package org.apache.s4.comm;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+
+public class UDPTest extends SimpleDeliveryTest {
+
+	@Override
+	protected void setUp() {
+		Injector injector = Guice.createInjector(new UDPTestModule());
+		sdt = injector.getInstance(CommWrapper.class);
+	}
+
+	class UDPTestModule extends AbstractModule {
+
+		protected PropertiesConfiguration config = null;
+
+		private void loadProperties(Binder binder) {
+
+			try {
+				InputStream is = this.getClass().getResourceAsStream(
+						"/s4-comm-test.properties");
+				config = new PropertiesConfiguration();
+				config.load(is);
+
+				System.out.println(ConfigurationUtils.toString(config));
+				Names.bindProperties(binder,
+						ConfigurationConverter.getProperties(config));
+			} catch (ConfigurationException e) {
+				binder.addError(e);
+				e.printStackTrace();
+			}
+		}
+
+		@Override
+		protected void configure() {
+			if (config == null)
+				loadProperties(binder());
+
+			int numHosts = config.getList("cluster.hosts").size();
+			boolean isCluster = numHosts > 1 ? true : false;
+			bind(Boolean.class).annotatedWith(Names.named("isCluster"))
+					.toInstance(Boolean.valueOf(isCluster));
+
+			bind(Cluster.class);
+
+			bind(Assignment.class).to(AssignmentFromFile.class);
+
+			bind(Topology.class).to(TopologyFromFile.class);
+
+			/* Use a simple UDP comm layer implementation. */
+			bind(Listener.class).to(UDPListener.class);
+			bind(Emitter.class).to(UDPEmitter.class);
+
+			/* The hashing function to map keys to partitions. */
+			bind(Hasher.class).to(DefaultHasher.class);
+
+			/* Use Kryo to serialize events. */
+			bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+			bind(Integer.class).annotatedWith(
+					Names.named("emitter.send.interval")).toInstance(
+					config.getInt("emitter.send.interval"));
+
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d111970/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTestModule.java
deleted file mode 100644
index b052e70..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTestModule.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-/*
- * Module for s4-comm/tests
- */
-public class UDPTestModule extends AbstractModule {
-
-	protected PropertiesConfiguration config = null;
-
-	private void loadProperties(Binder binder) {
-
-		try {
-			InputStream is = this.getClass().getResourceAsStream(
-					"/s4-comm-test.properties");
-			config = new PropertiesConfiguration();
-			config.load(is);
-
-			System.out.println(ConfigurationUtils.toString(config));
-			Names.bindProperties(binder,
-					ConfigurationConverter.getProperties(config));
-		} catch (ConfigurationException e) {
-			binder.addError(e);
-			e.printStackTrace();
-		}
-	}
-
-	@Override
-	protected void configure() {
-		if (config == null)
-			loadProperties(binder());
-
-		int numHosts = config.getList("cluster.hosts").size();
-		boolean isCluster = numHosts > 1 ? true : false;
-		bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(
-				Boolean.valueOf(isCluster));
-
-		bind(Cluster.class);
-
-		bind(Assignment.class).to(AssignmentFromFile.class);
-
-		bind(Topology.class).to(TopologyFromFile.class);
-
-		/* Use a simple UDP comm layer implementation. */
-		bind(Listener.class).to(UDPListener.class);
-		bind(Emitter.class).to(UDPEmitter.class);
-
-		/* The hashing function to map keys top partitions. */
-		bind(Hasher.class).to(DefaultHasher.class);
-
-		/* Use Kryo to serialize events. */
-		bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
-		bind(Integer.class).annotatedWith(Names.named("emitter.send.interval"))
-				.toInstance(config.getInt("emitter.send.interval"));
-
-	}
-}
\ No newline at end of file