You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:28 UTC
[19/50] [abbrv] git commit: Maintain messageQueue for each partition
in NettyEmitter to tolerate network glitches
Maintain messageQueue for each partition in NettyEmitter to tolerate network glitches
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/2f8051fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/2f8051fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/2f8051fe
Branch: refs/heads/piper
Commit: 2f8051fe8ad7525a3f752904d00249070f2a8796
Parents: 90f68b4
Author: Karthik Kambatla <kk...@cs.purdue.edu>
Authored: Mon Nov 28 01:18:38 2011 -0500
Committer: Karthik Kambatla <kk...@cs.purdue.edu>
Committed: Mon Nov 28 01:18:38 2011 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/s4/base/Emitter.java | 9 +-
.../java/org/apache/s4/comm/QueueingEmitter.java | 174 +++++++-------
.../apache/s4/comm/loopback/LoopBackEmitter.java | 4 +-
.../org/apache/s4/comm/netty/NettyEmitter.java | 189 +++++++++++----
.../s4/comm/topology/AssignmentFromFile.java | 38 +--
.../java/org/apache/s4/comm/udp/UDPEmitter.java | 5 +-
.../org/apache/s4/comm/SimpleDeliveryTest.java | 2 +
.../test/java/org/apache/s4/comm/TCPCommTest.java | 84 +++----
.../test/java/org/apache/s4/comm/UDPCommTest.java | 84 +++----
9 files changed, 331 insertions(+), 258 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
index 3e400a0..0570c14 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@ -1,6 +1,13 @@
package org.apache.s4.base;
public interface Emitter {
- void send(int partitionId, byte[] message);
+
+ /**
+  * Sends a message to the given partition.
+  *
+  * @param partitionId destination partition
+  * @param message message payload that needs to be sent
+  * @return true if the message is sent across successfully, false if the send fails
+  */
+ boolean send(int partitionId, byte[] message);
int getPartitionCount();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
index dd58ec1..089503d 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
@@ -1,6 +1,5 @@
package org.apache.s4.comm;
-
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -10,91 +9,92 @@ import com.google.inject.Inject;
import com.google.inject.name.Named;
public class QueueingEmitter implements Emitter, Runnable {
- private Emitter emitter;
- private BlockingQueue<MessageHolder> queue;
- private long dropCount = 0;
- private volatile Thread thread;
-
- @Inject
- public QueueingEmitter(@Named("ll") Emitter emitter,
- @Named("comm.queue_emmiter_size") int queueSize) {
- this.emitter = emitter;
- queue = new LinkedBlockingQueue<MessageHolder>(queueSize);
- }
-
- public long getDropCount() {
- return dropCount;
- }
-
- public void start() {
- if (thread != null) {
- throw new IllegalStateException("QueueingEmitter is already started");
- }
- thread = new Thread(this, "QueueingEmitter");
- thread.start();
- }
-
- public void stop() {
- if (thread == null) {
- throw new IllegalStateException("QueueingEmitter is already stopped");
- }
- thread.interrupt();
- thread = null;
- }
-
- @Override
- public void send(int partitionId, byte[] message) {
- MessageHolder mh = new MessageHolder(partitionId, message);
- if (!queue.offer(mh)) {
- dropCount++;
- //System.out.println("QueueingEmitter: dropping message");
- }
- else {
- //System.out.println("QueueingEmitter: Adding message to queue for other thread to pick up");
- }
- }
-
- public void run() {
- while (!Thread.interrupted()) {
- try {
- MessageHolder mh = queue.take();
- //System.out.println("QueueingEmitter: Sending message on low-level emitter");
- emitter.send(mh.getPartitionId(), mh.getMessage());
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public int getPartitionCount() {
- return emitter.getPartitionCount();
- }
-
- class MessageHolder {
- private int partitionId;
- private byte[] message;
-
- public int getPartitionId() {
- return partitionId;
- }
-
- public void setPartitionId(int partitionId) {
- this.partitionId = partitionId;
- }
-
- public byte[] getMessage() {
- return message;
- }
-
- public void setMessage(byte[] message) {
- this.message = message;
- }
-
- public MessageHolder(int partitionId, byte[] message) {
- super();
- this.partitionId = partitionId;
- this.message = message;
- }
- }
+ /** Low-level emitter that the background thread delegates the actual send to. */
+ private final Emitter emitter;
+ /** Bounded buffer of messages waiting to be handed to the low-level emitter. */
+ private final BlockingQueue<MessageHolder> queue;
+ /**
+  * Number of messages rejected because the queue was full.
+  * NOTE(review): incremented without synchronization, so the value may be
+  * approximate if send() is called from several threads - confirm whether that matters.
+  */
+ private long dropCount = 0;
+ /** Delivery thread; null while the emitter is not started. */
+ private volatile Thread thread;
+
+ /**
+  * @param emitter low-level emitter used to deliver the queued messages
+  * @param queueSize maximum number of messages held in the queue
+  */
+ @Inject
+ public QueueingEmitter(@Named("ll") Emitter emitter,
+         @Named("comm.queue_emmiter_size") int queueSize) {
+     this.emitter = emitter;
+     queue = new LinkedBlockingQueue<MessageHolder>(queueSize);
+ }
+
+ /**
+  * @return number of messages dropped so far because the queue was full
+  */
+ public long getDropCount() {
+     return dropCount;
+ }
+
+ /**
+  * Starts the background delivery thread.
+  *
+  * @throws IllegalStateException if the emitter is already started
+  */
+ public void start() {
+     if (thread != null) {
+         throw new IllegalStateException(
+                 "QueueingEmitter is already started");
+     }
+     thread = new Thread(this, "QueueingEmitter");
+     thread.start();
+ }
+
+ /**
+  * Interrupts the background delivery thread and forgets it.
+  *
+  * @throws IllegalStateException if the emitter is not running
+  */
+ public void stop() {
+     if (thread == null) {
+         throw new IllegalStateException(
+                 "QueueingEmitter is already stopped");
+     }
+     thread.interrupt();
+     thread = null;
+ }
+
+ /**
+  * Queues the message for delivery by the background thread without blocking.
+  *
+  * @param partitionId destination partition
+  * @param message message payload
+  * @return true if the message was accepted into the queue; false if the
+  *         queue was full and the message was dropped
+  */
+ @Override
+ public boolean send(int partitionId, byte[] message) {
+     MessageHolder mh = new MessageHolder(partitionId, message);
+     if (queue.offer(mh)) {
+         return true;
+     }
+     // Queue is full: count the drop and report the failure to the caller,
+     // as the Emitter contract requires (true = success, false = failure).
+     dropCount++;
+     return false;
+ }
+
+ /**
+  * Delivery loop: takes queued messages one at a time and forwards them to
+  * the low-level emitter until the thread is interrupted.
+  */
+ public void run() {
+     while (!Thread.interrupted()) {
+         try {
+             MessageHolder mh = queue.take();
+             emitter.send(mh.getPartitionId(), mh.getMessage());
+         } catch (InterruptedException ie) {
+             // Restore the flag so the loop condition observes it and exits.
+             Thread.currentThread().interrupt();
+         }
+     }
+ }
+
+ /**
+  * @return the partition count reported by the low-level emitter
+  */
+ public int getPartitionCount() {
+     return emitter.getPartitionCount();
+ }
+
+ /** Pairs a message payload with the partition it is addressed to. */
+ class MessageHolder {
+     private int partitionId;
+     private byte[] message;
+
+     public int getPartitionId() {
+         return partitionId;
+     }
+
+     public void setPartitionId(int partitionId) {
+         this.partitionId = partitionId;
+     }
+
+     public byte[] getMessage() {
+         return message;
+     }
+
+     public void setMessage(byte[] message) {
+         this.message = message;
+     }
+
+     public MessageHolder(int partitionId, byte[] message) {
+         super();
+         this.partitionId = partitionId;
+         this.message = message;
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
index bc3e2e3..d59eabe 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
@@ -10,9 +10,9 @@ public class LoopBackEmitter implements Emitter {
}
@Override
- public void send(int partitionId, byte[] message) {
- //System.out.println("LoopBackEmitter: Putting message to listener");
+ public boolean send(int partitionId, byte[] message) {
listener.put(message);
+ return true;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
index 8958018..3d95ef2 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
@@ -2,6 +2,9 @@ package org.apache.s4.comm.netty;
import java.net.ConnectException;
import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.Hashtable;
+import java.util.Queue;
import java.util.concurrent.Executors;
import org.apache.s4.base.Emitter;
@@ -32,13 +35,65 @@ import com.google.inject.Inject;
public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyChangeListener {
private static final Logger logger = LoggerFactory.getLogger(NettyEmitter.class);
+ private static final int BUFFER_SIZE = 10;
+ private static final int NUM_RETRIES = 10;
private Topology topology;
private final ClientBootstrap bootstrap;
- // Hashtable inherently allows capturing changes to the underlying topology
- private HashBiMap<Integer, Channel> channels;
- private HashBiMap<Integer, ClusterNode> nodes;
+ /**
+  * FIFO message buffers keyed by partition id. When {@code bounded} is true,
+  * each partition's buffer accepts at most BUFFER_SIZE messages.
+  */
+ static class MessageQueuesPerPartition {
+     private final Hashtable<Integer, Queue<byte[]>> queues = new Hashtable<Integer, Queue<byte[]>>();
+     private final boolean bounded;
+
+     MessageQueuesPerPartition(boolean bounded) {
+         this.bounded = bounded;
+     }
+
+     /**
+      * Appends a message to the partition's buffer, creating the buffer on first use.
+      *
+      * @return true if the message was buffered; false if the buffer is
+      *         bounded and already holds BUFFER_SIZE messages
+      */
+     private boolean add(int partitionId, byte[] message) {
+         Queue<byte[]> messages = queues.get(partitionId);
+
+         if (messages == null) {
+             messages = new ArrayDeque<byte[]>();
+             queues.put(partitionId, messages);
+         }
+
+         if (bounded && messages.size() >= BUFFER_SIZE) {
+             // Too many messages already queued
+             return false;
+         }
+
+         messages.offer(message);
+         return true;
+     }
+
+     /**
+      * @return the oldest buffered message for the partition without removing
+      *         it, or null if nothing is buffered for that partition
+      */
+     private byte[] peek(int partitionId) {
+         Queue<byte[]> messages = queues.get(partitionId);
+         // Explicit null check instead of catching NullPointerException.
+         return (messages == null) ? null : messages.peek();
+     }
+
+     /**
+      * Removes the oldest buffered message for the partition. Logs an error
+      * and does nothing if no message is buffered for that partition.
+      */
+     private void remove(int partitionId) {
+         Queue<byte[]> messages = queues.get(partitionId);
+
+         // The null check must come first: a partition that never buffered
+         // anything has no queue at all.
+         if (messages == null || messages.isEmpty()) {
+             logger.error("Trying to remove messages from an empty queue for partition" + partitionId);
+             return;
+         }
+
+         messages.remove();
+     }
+ }
+
+ private HashBiMap<Integer, Channel> partitionChannelMap;
+ private HashBiMap<Integer, ClusterNode> partitionNodeMap;
+ private MessageQueuesPerPartition queuedMessages = new MessageQueuesPerPartition(true);
+
+ // private MessageQueuesPerPartition messagesOnTheWire = new MessageQueuesPerPartition(false);
@Inject
public NettyEmitter(Topology topology) throws InterruptedException {
@@ -46,13 +101,8 @@ public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyCha
topology.addListener(this);
int clusterSize = this.topology.getTopology().getNodes().size();
- channels = HashBiMap.create(clusterSize);
- nodes = HashBiMap.create(clusterSize);
-
- for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
- Integer partition = clusterNode.getPartition();
- nodes.forcePut(partition, clusterNode);
- }
+ partitionChannelMap = HashBiMap.create(clusterSize);
+ partitionNodeMap = HashBiMap.create(clusterSize);
ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
@@ -73,22 +123,26 @@ public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyCha
bootstrap.setOption("keepAlive", true);
}
- private void connectTo(Integer partitionId) {
- ClusterNode clusterNode = nodes.get(partitionId);
+ private boolean connectTo(Integer partitionId) {
+ ClusterNode clusterNode = partitionNodeMap.get(partitionId);
+
+ if (clusterNode == null) {
+ clusterNode = topology.getTopology().getNodes().get(partitionId);
+ partitionNodeMap.forcePut(partitionId, clusterNode);
+ }
if (clusterNode == null) {
logger.error("No ClusterNode exists for partitionId " + partitionId);
- // clusterNode = topology.getTopology().getNodes().get(partitionId);
+ return false;
}
- logger.info(String.format("Connecting to %s:%d", clusterNode.getMachineName(), clusterNode.getPort()));
- while (true) {
+ for (int retries = 0; retries < NUM_RETRIES; retries++) {
ChannelFuture f = this.bootstrap.connect(new InetSocketAddress(clusterNode.getMachineName(), clusterNode
.getPort()));
f.awaitUninterruptibly();
if (f.isSuccess()) {
- channels.forcePut(partitionId, f.getChannel());
- break;
+ partitionChannelMap.forcePut(partitionId, f.getChannel());
+ return true;
}
try {
Thread.sleep(10);
@@ -97,69 +151,100 @@ public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyCha
clusterNode.getPort()));
}
}
+
+ return false;
+ }
+
+ /**
+  * Copies the payload into a freshly allocated channel buffer, writes it to
+  * the channel and registers this emitter as listener on the resulting future.
+  *
+  * @param channel channel to write to
+  * @param partitionId partition the channel belongs to (currently unused here)
+  * @param message payload to send
+  */
+ private void writeMessageToChannel(Channel channel, int partitionId, byte[] message) {
+     // Tracking of in-flight messages (messagesOnTheWire) is disabled for now.
+     ChannelBuffer payload = ChannelBuffers.buffer(message.length);
+     payload.writeBytes(message);
+     channel.write(payload).addListener(this);
+ }
private final Object sendLock = new Object();
@Override
- public void send(int partitionId, byte[] message) {
- Channel channel = channels.get(partitionId);
-
- while (channel == null) {
- connectTo(partitionId);
- channel = channels.get(partitionId); // making sure it is reflected in the map
+ public boolean send(int partitionId, byte[] message) {
+ Channel channel = partitionChannelMap.get(partitionId);
+ if (channel == null) {
+ if (connectTo(partitionId)) {
+ channel = partitionChannelMap.get(partitionId);
+ } else {
+ // could not connect, queue to the partitionBuffer
+ return queuedMessages.add(partitionId, message);
+ }
}
- ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
-
- // check if Netty's send queue has gotten quite large
+ /*
+ * Try limiting the size of the send queue inside Netty
+ */
if (!channel.isWritable()) {
synchronized (sendLock) {
// check again now that we have the lock
while (!channel.isWritable()) {
try {
- sendLock.wait(); // wait until the channel's queue
- // has gone down
+ sendLock.wait();
} catch (InterruptedException ie) {
- return; // somebody wants us to stop running
+ return false;
}
}
- // logger.info("Woke up from send block!");
}
}
- // between the above isWritable check and the below writeBytes, the
- // isWritable
- // may become false again. That's OK, we're just trying to avoid a
- // very large
- // above check to avoid creating a very large send queue inside
- // Netty.
- buffer.writeBytes(message);
- ChannelFuture f = channel.write(buffer);
- f.addListener(this);
+ /*
+ * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
+ * buffered messages, and (3) the Current Message
+ *
+ * Once the channel returns success delete from the messagesOnTheWire
+ */
+ byte[] messageBeingSent = null;
+ // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
+ // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
+ // }
+
+ while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
+ writeMessageToChannel(channel, partitionId, messageBeingSent);
+ queuedMessages.remove(partitionId);
+ }
+
+ writeMessageToChannel(channel, partitionId, message);
+ return true;
}
@Override
public void operationComplete(ChannelFuture f) {
- // when we get here, the I/O operation associated with f is complete
+ int partitionId = partitionChannelMap.inverse().get(f.getChannel());
+ if (f.isSuccess()) {
+ // messagesOnTheWire.remove(partitionId);
+ }
+
if (f.isCancelled()) {
logger.error("Send I/O was cancelled!! " + f.getChannel().getRemoteAddress());
} else if (!f.isSuccess()) {
logger.error("Exception on I/O operation", f.getCause());
- // find the partition associated with this broken channel
- int partition = channels.inverse().get(f.getChannel());
- logger.error(String.format("I/O on partition %d failed!", partition));
+ logger.error(String.format("I/O on partition %d failed!", partitionId));
+ partitionChannelMap.remove(partitionId);
}
}
@Override
public void onChange() {
- // topology changes when processes pick tasks
- synchronized (nodes) {
- for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
- Integer partition = clusterNode.getPartition();
- nodes.put(partition, clusterNode);
+ /*
+ * Close the channels that correspond to changed partitions and update partitionNodeMap
+ */
+ for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
+ Integer partition = clusterNode.getPartition();
+ ClusterNode oldNode = partitionNodeMap.get(partition);
+
+ if (oldNode != null && !oldNode.equals(clusterNode)) {
+ partitionChannelMap.remove(partition).close();
}
+
+ partitionNodeMap.forcePut(partition, clusterNode);
}
}
@@ -180,17 +265,16 @@ public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyCha
}
}
ctx.sendUpstream(e);
-
}
@Override
public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
- Integer partition = channels.inverse().get(context.getChannel());
- if (partition == null) {
+ Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
+ if (partitionId == null) {
logger.error("Error on mystery channel!!");
// return;
}
- logger.error("Error on channel to partition " + partition);
+ logger.error("Error on channel to partition " + partitionId);
try {
throw event.getCause();
@@ -205,5 +289,4 @@ public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyCha
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
index a074089..d82a818 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
@@ -1,6 +1,5 @@
package org.apache.s4.comm.topology;
-
import java.io.File;
import java.io.FileOutputStream;
import java.net.InetAddress;
@@ -19,19 +18,19 @@ import com.google.inject.name.Named;
*
*/
public class AssignmentFromFile implements Assignment {
- private static final Logger logger = LoggerFactory
- .getLogger(AssignmentFromFile.class);
+ private static final Logger logger = LoggerFactory.getLogger(AssignmentFromFile.class);
final private Cluster cluster;
final private String lockDir;
@Inject
- public AssignmentFromFile(Cluster cluster,
- @Named("cluster.lock_dir") String lockDir) {
+ public AssignmentFromFile(Cluster cluster, @Named("cluster.lock_dir") String lockDir) {
this.cluster = cluster;
this.lockDir = lockDir;
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.s4.comm.topology.Assignment#assignClusterNode()
*/
@Override
@@ -43,8 +42,7 @@ public class AssignmentFromFile implements Assignment {
logger.info("Partition available: " + partitionAvailable);
if (partitionAvailable) {
boolean success = takeProcess(node);
- logger.info("Acquire partition:"
- + ((success) ? "success." : "failure."));
+ logger.info("Acquire partition:" + ((success) ? "success." : "failure."));
if (success) {
return node;
}
@@ -55,9 +53,7 @@ public class AssignmentFromFile implements Assignment {
} catch (Exception e) {
logger.error("Exception in assignPartition Method:", e);
}
-
}
-
}
private boolean takeProcess(ClusterNode node) {
@@ -70,20 +66,16 @@ public class AssignmentFromFile implements Assignment {
FileOutputStream fos = new FileOutputStream(lockFile);
FileLock fl = fos.getChannel().tryLock();
if (fl != null) {
- String message = "Partition acquired by PID:" + getPID()
- + " HOST:"
+ String message = "Partition acquired by PID:" + getPID() + " HOST:"
+ InetAddress.getLocalHost().getHostName();
fos.write(message.getBytes());
fos.close();
- logger.info(message + " Lock File location: "
- + lockFile.getAbsolutePath());
+ logger.info(message + " Lock File location: " + lockFile.getAbsolutePath());
return true;
}
}
} catch (Exception e) {
- logger.error(
- "Exception trying to take up partition:"
- + node.getPartition(), e);
+ logger.error("Exception trying to take up partition:" + node.getPartition(), e);
} finally {
if (lockFile != null) {
lockFile.deleteOnExit();
@@ -107,10 +99,8 @@ public class AssignmentFromFile implements Assignment {
private boolean canTakeupProcess(ClusterNode node) {
try {
- InetAddress inetAddress = InetAddress.getByName(node
- .getMachineName());
- logger.info("Host Name: "
- + InetAddress.getLocalHost().getCanonicalHostName());
+ InetAddress inetAddress = InetAddress.getByName(node.getMachineName());
+ logger.info("Host Name: " + InetAddress.getLocalHost().getCanonicalHostName());
if (!node.getMachineName().equals("localhost")) {
if (!InetAddress.getLocalHost().equals(inetAddress)) {
return false;
@@ -125,15 +115,13 @@ public class AssignmentFromFile implements Assignment {
if (!lockFile.exists()) {
return true;
} else {
- logger.info("Partition taken up by another process lockFile:"
- + lockFileName);
+ logger.info("Partition taken up by another process lockFile:" + lockFileName);
}
return false;
}
private long getPID() {
- String processName = java.lang.management.ManagementFactory
- .getRuntimeMXBean().getName();
+ String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
return Long.parseLong(processName.split("@")[0]);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index e365799..85b164c 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -12,7 +12,6 @@ import org.apache.s4.base.Emitter;
import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.comm.topology.Topology;
import org.apache.s4.comm.topology.TopologyChangeListener;
-import org.jboss.netty.channel.Channel;
import com.google.common.collect.HashBiMap;
import com.google.inject.Inject;
@@ -45,7 +44,7 @@ public class UDPEmitter implements Emitter, TopologyChangeListener {
}
@Override
- public void send(int partitionId, byte[] message) {
+ public boolean send(int partitionId, byte[] message) {
try {
ClusterNode node = nodes.get(partitionId);
if (node == null) {
@@ -63,6 +62,8 @@ public class UDPEmitter implements Emitter, TopologyChangeListener {
} catch (IOException e) {
throw new RuntimeException(e);
}
+
+ return true;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
index 6605eae..5423361 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
@@ -2,6 +2,7 @@ package org.apache.s4.comm;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Listener;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -27,6 +28,7 @@ import com.google.inject.name.Named;
*/
public abstract class SimpleDeliveryTest {
protected CommWrapper sdt;
+ protected String lockdir;
static class CommWrapper {
private static final int MESSAGE_COUNT = 200;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
index a691cf6..b539932 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
@@ -27,62 +27,58 @@ import com.google.inject.name.Names;
public class TCPCommTest extends SimpleDeliveryTest {
- @Override
- public void setup() {
- Injector injector = Guice.createInjector(new NettyTestModule());
- sdt = injector.getInstance(CommWrapper.class);
- }
+ @Override
+ public void setup() {
+ Injector injector = Guice.createInjector(new NettyTestModule());
+ sdt = injector.getInstance(CommWrapper.class);
+ }
- class NettyTestModule extends AbstractModule {
+ class NettyTestModule extends AbstractModule {
- protected PropertiesConfiguration config = null;
+ protected PropertiesConfiguration config = null;
- private void loadProperties(Binder binder) {
+ private void loadProperties(Binder binder) {
- try {
- InputStream is = this.getClass().getResourceAsStream(
- "/s4-comm-test.properties");
- config = new PropertiesConfiguration();
- config.load(is);
+ try {
+ InputStream is = this.getClass().getResourceAsStream("/s4-comm-test.properties");
+ config = new PropertiesConfiguration();
+ config.load(is);
- System.out.println(ConfigurationUtils.toString(config));
- Names.bindProperties(binder,
- ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
+ System.out.println(ConfigurationUtils.toString(config));
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
- @Override
- protected void configure() {
- if (config == null)
- loadProperties(binder());
+ @Override
+ protected void configure() {
+ if (config == null)
+ loadProperties(binder());
- int numHosts = config.getList("cluster.hosts").size();
- boolean isCluster = numHosts > 1 ? true : false;
- bind(Boolean.class).annotatedWith(Names.named("isCluster"))
- .toInstance(Boolean.valueOf(isCluster));
+ int numHosts = config.getList("cluster.hosts").size();
+ boolean isCluster = numHosts > 1 ? true : false;
+ bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(Boolean.valueOf(isCluster));
- bind(Cluster.class);
+ bind(Cluster.class);
- bind(Assignment.class).to(AssignmentFromFile.class);
+ bind(Assignment.class).to(AssignmentFromFile.class);
- bind(Topology.class).to(TopologyFromFile.class);
+ bind(Topology.class).to(TopologyFromFile.class);
- /* Use a simple UDP comm layer implementation. */
- bind(Listener.class).to(NettyListener.class);
- bind(Emitter.class).to(NettyEmitter.class);
+ /* Use the Netty-based comm layer implementation. */
+ bind(Listener.class).to(NettyListener.class);
+ bind(Emitter.class).to(NettyEmitter.class);
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
+ /* The hashing function to map keys to partitions. */
+ bind(Hasher.class).to(DefaultHasher.class);
- /* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+ /* Use Kryo to serialize events. */
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
- bind(Integer.class).annotatedWith(
- Names.named("emitter.send.interval")).toInstance(
- config.getInt("emitter.send.interval"));
- }
- }
+ bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(
+ config.getInt("emitter.send.interval"));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
index 6dceb19..9d08193 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
@@ -27,62 +27,58 @@ import com.google.inject.name.Names;
public class UDPCommTest extends SimpleDeliveryTest {
- @Override
- public void setup() {
- Injector injector = Guice.createInjector(new UDPCommTestModule());
- sdt = injector.getInstance(CommWrapper.class);
- }
+ @Override
+ public void setup() {
+ Injector injector = Guice.createInjector(new UDPCommTestModule());
+ sdt = injector.getInstance(CommWrapper.class);
+ }
- class UDPCommTestModule extends AbstractModule {
+ class UDPCommTestModule extends AbstractModule {
- protected PropertiesConfiguration config = null;
+ protected PropertiesConfiguration config = null;
- private void loadProperties(Binder binder) {
+ private void loadProperties(Binder binder) {
- try {
- InputStream is = this.getClass().getResourceAsStream(
- "/s4-comm-test.properties");
- config = new PropertiesConfiguration();
- config.load(is);
+ try {
+ InputStream is = this.getClass().getResourceAsStream("/s4-comm-test.properties");
+ config = new PropertiesConfiguration();
+ config.load(is);
- System.out.println(ConfigurationUtils.toString(config));
- Names.bindProperties(binder,
- ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
+ System.out.println(ConfigurationUtils.toString(config));
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
- @Override
- protected void configure() {
- if (config == null)
- loadProperties(binder());
+ @Override
+ protected void configure() {
+ if (config == null)
+ loadProperties(binder());
- int numHosts = config.getList("cluster.hosts").size();
- boolean isCluster = numHosts > 1 ? true : false;
- bind(Boolean.class).annotatedWith(Names.named("isCluster"))
- .toInstance(Boolean.valueOf(isCluster));
+ int numHosts = config.getList("cluster.hosts").size();
+ boolean isCluster = numHosts > 1 ? true : false;
+ bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(Boolean.valueOf(isCluster));
- bind(Cluster.class);
+ bind(Cluster.class);
- bind(Assignment.class).to(AssignmentFromFile.class);
+ bind(Assignment.class).to(AssignmentFromFile.class);
- bind(Topology.class).to(TopologyFromFile.class);
+ bind(Topology.class).to(TopologyFromFile.class);
- /* Use a simple UDP comm layer implementation. */
- bind(Listener.class).to(UDPListener.class);
- bind(Emitter.class).to(UDPEmitter.class);
+ /* Use a simple UDP comm layer implementation. */
+ bind(Listener.class).to(UDPListener.class);
+ bind(Emitter.class).to(UDPEmitter.class);
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
+ /* The hashing function to map keys to partitions. */
+ bind(Hasher.class).to(DefaultHasher.class);
- /* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+ /* Use Kryo to serialize events. */
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
- bind(Integer.class).annotatedWith(
- Names.named("emitter.send.interval")).toInstance(
- config.getInt("emitter.send.interval"));
- }
- }
+ bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(
+ config.getInt("emitter.send.interval"));
+ }
+ }
}