You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:28 UTC
[26/50] [abbrv] git commit: junit 4 + code formatting
junit 4 + code formatting
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/df927d23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/df927d23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/df927d23
Branch: refs/heads/piper
Commit: df927d23a8a0d9e76da18378e2599aea5ce5fcf6
Parents: bba15f2
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Nov 10 20:20:27 2011 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Nov 10 20:20:27 2011 +0100
----------------------------------------------------------------------
.../org/apache/s4/comm/SimpleDeliveryTest.java | 296 +++++++--------
.../s4/comm/topology/AssignmentFromZKTest.java | 9 +-
.../s4/comm/topology/TopologyFromZKTest.java | 16 +-
.../org/apache/s4/comm/topology/ZKBaseTest.java | 25 +-
.../s4/comm/topology/ZNRecordSerializerTest.java | 7 +-
5 files changed, 170 insertions(+), 183 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/df927d23/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
index f80d287..6605eae 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
@@ -26,157 +26,147 @@ import com.google.inject.name.Named;
*
*/
public abstract class SimpleDeliveryTest {
- protected CommWrapper sdt;
-
- static class CommWrapper {
- final private static int messageCount = 200;
- final private static int timerThreadCount = 100;
-
- final private Emitter emitter;
- final private Listener listener;
- final private int interval;
-
- public Thread sendThread, receiveThread;
- private int messagesExpected;
- private int messagesReceived = 0;
-
- @Inject
- public CommWrapper(@Named("emitter.send.interval") int interval,
- Emitter emitter, Listener listener) {
- this.emitter = emitter;
- this.listener = listener;
- this.interval = interval;
- this.messagesExpected = messageCount
- * this.emitter.getPartitionCount();
-
- this.sendThread = new SendThread();
- this.receiveThread = new ReceiveThread();
- }
-
- public boolean moreMessages() {
- return ((messagesExpected - messagesReceived) > 0);
- }
-
- class SendThread extends Thread {
- @Override
- public void run() {
- try {
- for (int partition = 0; partition < emitter
- .getPartitionCount(); partition++) {
- for (int i = 0; i < messageCount; i++) {
- byte[] message = (new String("message-" + i))
- .getBytes();
- emitter.send(partition, message);
- Thread.sleep(interval);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- return;
- }
- }
- }
-
- /*
- * TimerThread - interrupts the passed thread, after specified
- * time-interval.
- */
- class TimerThread extends Thread {
- private Thread watchThread;
- private Integer sleepCounter;
-
- TimerThread(Thread watchThread) {
- this.watchThread = watchThread;
- this.sleepCounter = new Integer(timerThreadCount);
- }
-
- public void resetSleepCounter() {
- synchronized (this.sleepCounter) {
- this.sleepCounter = timerThreadCount;
- }
- }
-
- public void clearSleepCounter() {
- synchronized (this.sleepCounter) {
- this.sleepCounter = 0;
- }
- }
-
- private int getCounter() {
- synchronized (this.sleepCounter) {
- return this.sleepCounter--;
- }
- }
-
- @Override
- public void run() {
- try {
- while (getCounter() > 0) {
- Thread.sleep(interval);
- }
- watchThread.interrupt();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- class ReceiveThread extends Thread {
- @Override
- public void run() {
-
- // start the timer thread to interrupt if blocked for too long
- TimerThread timer = new TimerThread(this);
- timer.start();
- while (messagesReceived < messagesExpected) {
- byte[] message = listener.recv();
- timer.resetSleepCounter();
- if (message != null)
- messagesReceived++;
- else
- break;
- }
- timer.clearSleepCounter();
- }
- }
- }
-
-
-
- /*
- * All tests extending this class need to implement this method
- */
- @Before
- public abstract void setup();
-
- /**
- * Tests the protocol. If all components function without throwing
- * exceptions, the test passes. The test also reports the loss of messages,
- * if any.
- *
- * @throws InterruptedException
- */
- @Test
- public void testCommLayerProtocol() throws InterruptedException {
- try {
- // start send and receive threads
- sdt.sendThread.start();
- sdt.receiveThread.start();
-
- // wait for them to finish
- sdt.sendThread.join();
- sdt.receiveThread.join();
-
- Assert.assertTrue("Guaranteed message delivery",
- !sdt.moreMessages());
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The comm protocol has failed basic functionality test");
- }
-
- Assert.assertTrue("The comm protocol seems to be working crash-free",
- true);
-
- System.out.println("Done");
- }
+ protected CommWrapper sdt;
+
+ static class CommWrapper {
+ private static final int MESSAGE_COUNT = 200;
+ private static final int TIMER_THREAD_COUNT = 100;
+
+ private final Emitter emitter;
+ private final Listener listener;
+ private final int interval;
+
+ public Thread sendThread, receiveThread;
+ private final int messagesExpected;
+ private int messagesReceived = 0;
+
+ @Inject
+ public CommWrapper(@Named("emitter.send.interval") int interval, Emitter emitter, Listener listener) {
+ this.emitter = emitter;
+ this.listener = listener;
+ this.interval = interval;
+ this.messagesExpected = MESSAGE_COUNT * this.emitter.getPartitionCount();
+
+ this.sendThread = new SendThread();
+ this.receiveThread = new ReceiveThread();
+ }
+
+ public boolean moreMessages() {
+ return ((messagesExpected - messagesReceived) > 0);
+ }
+
+ class SendThread extends Thread {
+ @Override
+ public void run() {
+ try {
+ for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ byte[] message = (new String("message-" + i)).getBytes();
+ emitter.send(partition, message);
+ Thread.sleep(interval);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+ }
+
+ /*
+ * TimerThread - interrupts the passed thread, after specified time-interval.
+ */
+ class TimerThread extends Thread {
+ private final Thread watchThread;
+ private Integer sleepCounter;
+
+ TimerThread(Thread watchThread) {
+ this.watchThread = watchThread;
+ this.sleepCounter = new Integer(TIMER_THREAD_COUNT);
+ }
+
+ public void resetSleepCounter() {
+ synchronized (this.sleepCounter) {
+ this.sleepCounter = TIMER_THREAD_COUNT;
+ }
+ }
+
+ public void clearSleepCounter() {
+ synchronized (this.sleepCounter) {
+ this.sleepCounter = 0;
+ }
+ }
+
+ private int getCounter() {
+ synchronized (this.sleepCounter) {
+ return this.sleepCounter--;
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (getCounter() > 0) {
+ Thread.sleep(interval);
+ }
+ watchThread.interrupt();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ class ReceiveThread extends Thread {
+ @Override
+ public void run() {
+
+ // start the timer thread to interrupt if blocked for too long
+ TimerThread timer = new TimerThread(this);
+ timer.start();
+ while (messagesReceived < messagesExpected) {
+ byte[] message = listener.recv();
+ timer.resetSleepCounter();
+ if (message != null)
+ messagesReceived++;
+ else
+ break;
+ }
+ timer.clearSleepCounter();
+ }
+ }
+ }
+
+ /*
+ * All tests extending this class need to implement this method
+ */
+ @Before
+ public abstract void setup();
+
+ /**
+ * Tests the protocol. If all components function without throwing exceptions, the test passes. The test also
+ * reports the loss of messages, if any.
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testCommLayerProtocol() throws InterruptedException {
+ try {
+ // start send and receive threads
+ sdt.sendThread.start();
+ sdt.receiveThread.start();
+
+ // wait for them to finish
+ sdt.sendThread.join();
+ sdt.receiveThread.join();
+
+ Assert.assertTrue("Guaranteed message delivery", !sdt.moreMessages());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("The comm protocol has failed basic functionality test");
+ }
+
+ Assert.assertTrue("The comm protocol seems to be working crash-free", true);
+
+ System.out.println("Done");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/df927d23/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
index f7ffbdb..a82a490 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
@@ -2,11 +2,14 @@ package org.apache.s4.comm.topology;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.*;
import org.apache.s4.comm.tools.TaskSetup;
+import org.junit.Test;
public class AssignmentFromZKTest extends ZKBaseTest {
+ @Test
public void testAssignment() throws Exception {
TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
final String clusterName = "test-s4-cluster";
@@ -20,10 +23,8 @@ public class AssignmentFromZKTest extends ZKBaseTest {
public void run() {
AssignmentFromZK assignmentFromZK;
try {
- assignmentFromZK = new AssignmentFromZK(clusterName,
- zookeeperAddress, 30000, 30000);
- ClusterNode assignClusterNode = assignmentFromZK
- .assignClusterNode();
+ assignmentFromZK = new AssignmentFromZK(clusterName, zookeeperAddress, 30000, 30000);
+ ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/df927d23/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
index 5a23487..6c95784 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
@@ -2,31 +2,31 @@ package org.apache.s4.comm.topology;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static org.junit.Assert.*;
import org.apache.s4.comm.tools.TaskSetup;
+import org.junit.Test;
public class TopologyFromZKTest extends ZKBaseTest {
+ @Test
public void testAssignment() throws Exception {
TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
final String clusterName = "test-s4-cluster";
taskSetup.clean(clusterName);
taskSetup.setup(clusterName, 10);
- final TopologyFromZK topologyFromZK = new TopologyFromZK(clusterName,
- zookeeperAddress, 30000, 30000);
+ final TopologyFromZK topologyFromZK = new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
final Lock lock = new ReentrantLock();
final Condition signal = lock.newCondition();
TopologyChangeListener listener = new TopologyChangeListener() {
@Override
public void onChange() {
- System.out
- .println("TopologyFromZKTest.testAssignment().new TopologyChangeListener() {...}.onChange()");
+ System.out.println("TopologyFromZKTest.testAssignment().new TopologyChangeListener() {...}.onChange()");
if (topologyFromZK.getTopology().getNodes().size() == 10) {
lock.lock();
try {
@@ -48,10 +48,8 @@ public class TopologyFromZKTest extends ZKBaseTest {
public void run() {
AssignmentFromZK assignmentFromZK;
try {
- assignmentFromZK = new AssignmentFromZK(clusterName,
- zookeeperAddress, 30000, 30000);
- ClusterNode assignClusterNode = assignmentFromZK
- .assignClusterNode();
+ assignmentFromZK = new AssignmentFromZK(clusterName, zookeeperAddress, 30000, 30000);
+ ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/df927d23/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
index b80d0b5..54ce6a9 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
@@ -2,24 +2,23 @@ package org.apache.s4.comm.topology;
import java.io.File;
-import junit.framework.TestCase;
-
import org.I0Itec.zkclient.IDefaultNameSpace;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkServer;
+import org.junit.After;
+import org.junit.Before;
-public class ZKBaseTest extends TestCase {
+public class ZKBaseTest {
protected ZkServer zkServer = null;
protected ZkClient zkClient;
protected String zookeeperAddress;
- @Override
+ @Before
public void setUp() {
- String dataDir = System.getProperty("user.dir") + File.separator
- + "tmp" + File.separator + "zookeeper" + File.separator
- + "data";
- String logDir = System.getProperty("user.dir") + File.separator + "tmp"
- + File.separator + "zookeeper" + File.separator + "logs";
+ String dataDir = System.getProperty("user.dir") + File.separator + "tmp" + File.separator + "zookeeper"
+ + File.separator + "data";
+ String logDir = System.getProperty("user.dir") + File.separator + "tmp" + File.separator + "zookeeper"
+ + File.separator + "logs";
IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
@Override
@@ -34,11 +33,9 @@ public class ZKBaseTest extends TestCase {
zookeeperAddress = "localhost:" + port;
}
- public void test(){
-
- }
- @Override
- protected void tearDown() throws Exception {
+
+ @After
+ public void tearDown() throws Exception {
if (zkServer != null) {
zkServer.shutdown();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/df927d23/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZNRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZNRecordSerializerTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZNRecordSerializerTest.java
index 75ba674..0789a6a 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZNRecordSerializerTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZNRecordSerializerTest.java
@@ -1,10 +1,11 @@
package org.apache.s4.comm.topology;
-import junit.framework.Assert;
-import junit.framework.TestCase;
+import org.junit.Assert;
+import org.junit.Test;
-public class ZNRecordSerializerTest extends TestCase {
+public class ZNRecordSerializerTest {
+ @Test
public void testSerDeser() {
ZNRecordSerializer serializer = new ZNRecordSerializer();