You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:28 UTC

[19/50] [abbrv] git commit: Maintain messageQueue for each partition in NettyEmitter to tolerate network glitches

Maintain messageQueue for each partition in NettyEmitter to tolerate network glitches


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/2f8051fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/2f8051fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/2f8051fe

Branch: refs/heads/piper
Commit: 2f8051fe8ad7525a3f752904d00249070f2a8796
Parents: 90f68b4
Author: Karthik Kambatla <kk...@cs.purdue.edu>
Authored: Mon Nov 28 01:18:38 2011 -0500
Committer: Karthik Kambatla <kk...@cs.purdue.edu>
Committed: Mon Nov 28 01:18:38 2011 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/base/Emitter.java  |    9 +-
 .../java/org/apache/s4/comm/QueueingEmitter.java   |  174 +++++++-------
 .../apache/s4/comm/loopback/LoopBackEmitter.java   |    4 +-
 .../org/apache/s4/comm/netty/NettyEmitter.java     |  189 +++++++++++----
 .../s4/comm/topology/AssignmentFromFile.java       |   38 +--
 .../java/org/apache/s4/comm/udp/UDPEmitter.java    |    5 +-
 .../org/apache/s4/comm/SimpleDeliveryTest.java     |    2 +
 .../test/java/org/apache/s4/comm/TCPCommTest.java  |   84 +++----
 .../test/java/org/apache/s4/comm/UDPCommTest.java  |   84 +++----
 9 files changed, 331 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
index 3e400a0..0570c14 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@ -1,6 +1,13 @@
 package org.apache.s4.base;
 
 public interface Emitter {
-    void send(int partitionId, byte[] message);
+	
+	/*
+	 * @param partitionId - destination partition
+	 * @param message - message payload that needs to be sent
+	 * @return - true - if message is sent across successfully
+	 *         - false - if send fails
+	 */
+    boolean send(int partitionId, byte[] message);
     int getPartitionCount();
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
index dd58ec1..089503d 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
@@ -1,6 +1,5 @@
 package org.apache.s4.comm;
 
-
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -10,91 +9,92 @@ import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
 public class QueueingEmitter implements Emitter, Runnable {
-    private Emitter emitter;
-    private BlockingQueue<MessageHolder> queue;
-    private long dropCount = 0;
-    private volatile Thread thread;
-    
-    @Inject
-    public QueueingEmitter(@Named("ll") Emitter emitter,
-            @Named("comm.queue_emmiter_size") int queueSize) {
-        this.emitter = emitter;
-        queue = new LinkedBlockingQueue<MessageHolder>(queueSize);       
-    }
- 
-    public long getDropCount() {
-        return dropCount;
-    }
-
-    public void start() {
-        if (thread != null) {
-            throw new IllegalStateException("QueueingEmitter is already started");
-        }
-        thread = new Thread(this, "QueueingEmitter");
-        thread.start();
-    }
-    
-    public void stop() {
-        if (thread == null) {
-            throw new IllegalStateException("QueueingEmitter is already stopped");
-        }
-        thread.interrupt();
-        thread = null;
-    }    
-    
-    @Override
-    public void send(int partitionId, byte[] message) {
-        MessageHolder mh = new MessageHolder(partitionId, message);
-        if (!queue.offer(mh)) {
-            dropCount++;
-            //System.out.println("QueueingEmitter: dropping message");
-        }
-        else {
-            //System.out.println("QueueingEmitter: Adding message to queue for other thread to pick up");
-        }
-    }
-    
-    public void run() {
-        while (!Thread.interrupted()) {
-            try {
-                MessageHolder mh = queue.take();
-                //System.out.println("QueueingEmitter: Sending message on low-level emitter");
-                emitter.send(mh.getPartitionId(), mh.getMessage());
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-    
-    public int getPartitionCount() {
-        return emitter.getPartitionCount();
-    }
-    
-    class MessageHolder {      
-        private int partitionId;
-        private byte[] message;
-
-        public int getPartitionId() {
-            return partitionId;
-        }
-
-        public void setPartitionId(int partitionId) {
-            this.partitionId = partitionId;
-        }
-
-        public byte[] getMessage() {
-            return message;
-        }
-
-        public void setMessage(byte[] message) {
-            this.message = message;
-        }
-
-        public MessageHolder(int partitionId, byte[] message) {
-            super();
-            this.partitionId = partitionId;
-            this.message = message;
-        }
-    }
+	private Emitter emitter;
+	private BlockingQueue<MessageHolder> queue;
+	private long dropCount = 0;
+	private volatile Thread thread;
+
+	@Inject
+	public QueueingEmitter(@Named("ll") Emitter emitter,
+			@Named("comm.queue_emmiter_size") int queueSize) {
+		this.emitter = emitter;
+		queue = new LinkedBlockingQueue<MessageHolder>(queueSize);
+	}
+
+	public long getDropCount() {
+		return dropCount;
+	}
+
+	public void start() {
+		if (thread != null) {
+			throw new IllegalStateException(
+					"QueueingEmitter is already started");
+		}
+		thread = new Thread(this, "QueueingEmitter");
+		thread.start();
+	}
+
+	public void stop() {
+		if (thread == null) {
+			throw new IllegalStateException(
+					"QueueingEmitter is already stopped");
+		}
+		thread.interrupt();
+		thread = null;
+	}
+
+	@Override
+	public boolean send(int partitionId, byte[] message) {
+		MessageHolder mh = new MessageHolder(partitionId, message);
+		if (!queue.offer(mh)) {
+			dropCount++;
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	public void run() {
+		while (!Thread.interrupted()) {
+			try {
+				MessageHolder mh = queue.take();
+				// System.out.println("QueueingEmitter: Sending message on low-level emitter");
+				emitter.send(mh.getPartitionId(), mh.getMessage());
+			} catch (InterruptedException ie) {
+				Thread.currentThread().interrupt();
+			}
+		}
+	}
+
+	public int getPartitionCount() {
+		return emitter.getPartitionCount();
+	}
+
+	class MessageHolder {
+		private int partitionId;
+		private byte[] message;
+
+		public int getPartitionId() {
+			return partitionId;
+		}
+
+		public void setPartitionId(int partitionId) {
+			this.partitionId = partitionId;
+		}
+
+		public byte[] getMessage() {
+			return message;
+		}
+
+		public void setMessage(byte[] message) {
+			this.message = message;
+		}
+
+		public MessageHolder(int partitionId, byte[] message) {
+			super();
+			this.partitionId = partitionId;
+			this.message = message;
+		}
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
index bc3e2e3..d59eabe 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
@@ -10,9 +10,9 @@ public class LoopBackEmitter implements Emitter {
     }
     
     @Override
-    public void send(int partitionId, byte[] message) {
-        //System.out.println("LoopBackEmitter: Putting message to listener");
+    public boolean send(int partitionId, byte[] message) {
         listener.put(message);
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
index 8958018..3d95ef2 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
@@ -2,6 +2,9 @@ package org.apache.s4.comm.netty;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.Hashtable;
+import java.util.Queue;
 import java.util.concurrent.Executors;
 
 import org.apache.s4.base.Emitter;
@@ -32,13 +35,65 @@ import com.google.inject.Inject;
 
 public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyChangeListener {
     private static final Logger logger = LoggerFactory.getLogger(NettyEmitter.class);
+    private static final int BUFFER_SIZE = 10;
+    private static final int NUM_RETRIES = 10;
 
     private Topology topology;
     private final ClientBootstrap bootstrap;
 
-    // Hashtable inherently allows capturing changes to the underlying topology
-    private HashBiMap<Integer, Channel> channels;
-    private HashBiMap<Integer, ClusterNode> nodes;
+    static class MessageQueuesPerPartition {
+        private Hashtable<Integer, Queue<byte[]>> queues = new Hashtable<Integer, Queue<byte[]>>();
+        private boolean bounded;
+
+        MessageQueuesPerPartition(boolean bounded) {
+            this.bounded = bounded;
+        }
+
+        private boolean add(int partitionId, byte[] message) {
+            Queue<byte[]> messages = queues.get(partitionId);
+
+            if (messages == null) {
+                messages = new ArrayDeque<byte[]>();
+                queues.put(partitionId, messages);
+            }
+
+            if (bounded && messages.size() >= BUFFER_SIZE) {
+                // Too many messages already queued
+                return false;
+            }
+
+            messages.offer(message);
+            return true;
+        }
+
+        private byte[] peek(int partitionId) {
+            Queue<byte[]> messages = queues.get(partitionId);
+
+            try {
+                return messages.peek();
+            } catch (NullPointerException npe) {
+                return null;
+            }
+        }
+
+        private void remove(int partitionId) {
+            Queue<byte[]> messages = queues.get(partitionId);
+
+            if (messages.isEmpty()) {
+                logger.error("Trying to remove messages from an empty queue for partition" + partitionId);
+                return;
+            }
+
+            if (messages != null)
+                messages.remove();
+        }
+    }
+
+    private HashBiMap<Integer, Channel> partitionChannelMap;
+    private HashBiMap<Integer, ClusterNode> partitionNodeMap;
+    private MessageQueuesPerPartition queuedMessages = new MessageQueuesPerPartition(true);
+
+    // private MessageQueuesPerPartition messagesOnTheWire = new MessageQueuesPerPartition(false);
 
     @Inject
     public NettyEmitter(Topology topology) throws InterruptedException {
@@ -46,13 +101,8 @@ public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyCha
         topology.addListener(this);
         int clusterSize = this.topology.getTopology().getNodes().size();
 
-        channels = HashBiMap.create(clusterSize);
-        nodes = HashBiMap.create(clusterSize);
-
-        for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
-            Integer partition = clusterNode.getPartition();
-            nodes.forcePut(partition, clusterNode);
-        }
+        partitionChannelMap = HashBiMap.create(clusterSize);
+        partitionNodeMap = HashBiMap.create(clusterSize);
 
         ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                 Executors.newCachedThreadPool());
@@ -73,22 +123,26 @@ public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyCha
         bootstrap.setOption("keepAlive", true);
     }
 
-    private void connectTo(Integer partitionId) {
-        ClusterNode clusterNode = nodes.get(partitionId);
+    private boolean connectTo(Integer partitionId) {
+        ClusterNode clusterNode = partitionNodeMap.get(partitionId);
+
+        if (clusterNode == null) {
+            clusterNode = topology.getTopology().getNodes().get(partitionId);
+            partitionNodeMap.forcePut(partitionId, clusterNode);
+        }
 
         if (clusterNode == null) {
             logger.error("No ClusterNode exists for partitionId " + partitionId);
-            // clusterNode = topology.getTopology().getNodes().get(partitionId);
+            return false;
         }
 
-        logger.info(String.format("Connecting to %s:%d", clusterNode.getMachineName(), clusterNode.getPort()));
-        while (true) {
+        for (int retries = 0; retries < NUM_RETRIES; retries++) {
             ChannelFuture f = this.bootstrap.connect(new InetSocketAddress(clusterNode.getMachineName(), clusterNode
                     .getPort()));
             f.awaitUninterruptibly();
             if (f.isSuccess()) {
-                channels.forcePut(partitionId, f.getChannel());
-                break;
+                partitionChannelMap.forcePut(partitionId, f.getChannel());
+                return true;
             }
             try {
                 Thread.sleep(10);
@@ -97,69 +151,100 @@ public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyCha
                         clusterNode.getPort()));
             }
         }
+
+        return false;
+    }
+
+    private void writeMessageToChannel(Channel channel, int partitionId, byte[] message) {
+        // if (addToWire) {
+        // messagesOnTheWire.add(partitionId, message);
+        // }
+        ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
+        buffer.writeBytes(message);
+        ChannelFuture f = channel.write(buffer);
+        f.addListener(this);
     }
 
     private final Object sendLock = new Object();
 
     @Override
-    public void send(int partitionId, byte[] message) {
-        Channel channel = channels.get(partitionId);
-
-        while (channel == null) {
-            connectTo(partitionId);
-            channel = channels.get(partitionId); // making sure it is reflected in the map
+    public boolean send(int partitionId, byte[] message) {
+        Channel channel = partitionChannelMap.get(partitionId);
+        if (channel == null) {
+            if (connectTo(partitionId)) {
+                channel = partitionChannelMap.get(partitionId);
+            } else {
+                // could not connect, queue to the partitionBuffer
+                return queuedMessages.add(partitionId, message);
+            }
         }
 
-        ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
-
-        // check if Netty's send queue has gotten quite large
+        /*
+         * Try limiting the size of the send queue inside Netty
+         */
         if (!channel.isWritable()) {
             synchronized (sendLock) {
                 // check again now that we have the lock
                 while (!channel.isWritable()) {
                     try {
-                        sendLock.wait(); // wait until the channel's queue
-                                         // has gone down
+                        sendLock.wait();
                     } catch (InterruptedException ie) {
-                        return; // somebody wants us to stop running
+                        return false;
                     }
                 }
-                // logger.info("Woke up from send block!");
             }
         }
-        // between the above isWritable check and the below writeBytes, the
-        // isWritable
-        // may become false again. That's OK, we're just trying to avoid a
-        // very large
-        // above check to avoid creating a very large send queue inside
-        // Netty.
-        buffer.writeBytes(message);
-        ChannelFuture f = channel.write(buffer);
-        f.addListener(this);
 
+        /*
+         * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
+         * buffered messages, and (3) the Current Message
+         * 
+         * Once the channel returns success delete from the messagesOnTheWire
+         */
+        byte[] messageBeingSent = null;
+        // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
+        // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
+        // }
+
+        while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
+            writeMessageToChannel(channel, partitionId, messageBeingSent);
+            queuedMessages.remove(partitionId);
+        }
+
+        writeMessageToChannel(channel, partitionId, message);
+        return true;
     }
 
     @Override
     public void operationComplete(ChannelFuture f) {
-        // when we get here, the I/O operation associated with f is complete
+        int partitionId = partitionChannelMap.inverse().get(f.getChannel());
+        if (f.isSuccess()) {
+            // messagesOnTheWire.remove(partitionId);
+        }
+
         if (f.isCancelled()) {
             logger.error("Send I/O was cancelled!! " + f.getChannel().getRemoteAddress());
         } else if (!f.isSuccess()) {
             logger.error("Exception on I/O operation", f.getCause());
-            // find the partition associated with this broken channel
-            int partition = channels.inverse().get(f.getChannel());
-            logger.error(String.format("I/O on partition %d failed!", partition));
+            logger.error(String.format("I/O on partition %d failed!", partitionId));
+            partitionChannelMap.remove(partitionId);
         }
     }
 
     @Override
     public void onChange() {
-        // topology changes when processes pick tasks
-        synchronized (nodes) {
-            for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
-                Integer partition = clusterNode.getPartition();
-                nodes.put(partition, clusterNode);
+        /*
+         * Close the channels that correspond to changed partitions and update partitionNodeMap
+         */
+        for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
+            Integer partition = clusterNode.getPartition();
+            ClusterNode oldNode = partitionNodeMap.get(partition);
+
+            if (oldNode != null && !oldNode.equals(clusterNode)) {
+                partitionChannelMap.remove(partition).close();
             }
+
+            partitionNodeMap.forcePut(partition, clusterNode);
         }
     }
 
@@ -180,17 +265,16 @@ public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyCha
                 }
             }
             ctx.sendUpstream(e);
-
         }
 
         @Override
         public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
-            Integer partition = channels.inverse().get(context.getChannel());
-            if (partition == null) {
+            Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
+            if (partitionId == null) {
                 logger.error("Error on mystery channel!!");
                 // return;
             }
-            logger.error("Error on channel to partition " + partition);
+            logger.error("Error on channel to partition " + partitionId);
 
             try {
                 throw event.getCause();
@@ -205,5 +289,4 @@ public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyCha
             }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
index a074089..d82a818 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
@@ -1,6 +1,5 @@
 package org.apache.s4.comm.topology;
 
-
 import java.io.File;
 import java.io.FileOutputStream;
 import java.net.InetAddress;
@@ -19,19 +18,19 @@ import com.google.inject.name.Named;
  * 
  */
 public class AssignmentFromFile implements Assignment {
-    private static final Logger logger = LoggerFactory
-            .getLogger(AssignmentFromFile.class);
+    private static final Logger logger = LoggerFactory.getLogger(AssignmentFromFile.class);
     final private Cluster cluster;
     final private String lockDir;
 
     @Inject
-    public AssignmentFromFile(Cluster cluster,
-            @Named("cluster.lock_dir") String lockDir) {
+    public AssignmentFromFile(Cluster cluster, @Named("cluster.lock_dir") String lockDir) {
         this.cluster = cluster;
         this.lockDir = lockDir;
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.s4.comm.topology.Assignment#assignClusterNode()
      */
     @Override
@@ -43,8 +42,7 @@ public class AssignmentFromFile implements Assignment {
                     logger.info("Partition available: " + partitionAvailable);
                     if (partitionAvailable) {
                         boolean success = takeProcess(node);
-                        logger.info("Acquire partition:"
-                                + ((success) ? "success." : "failure."));
+                        logger.info("Acquire partition:" + ((success) ? "success." : "failure."));
                         if (success) {
                             return node;
                         }
@@ -55,9 +53,7 @@ public class AssignmentFromFile implements Assignment {
             } catch (Exception e) {
                 logger.error("Exception in assignPartition Method:", e);
             }
-
         }
-
     }
 
     private boolean takeProcess(ClusterNode node) {
@@ -70,20 +66,16 @@ public class AssignmentFromFile implements Assignment {
                 FileOutputStream fos = new FileOutputStream(lockFile);
                 FileLock fl = fos.getChannel().tryLock();
                 if (fl != null) {
-                    String message = "Partition acquired by PID:" + getPID()
-                            + " HOST:"
+                    String message = "Partition acquired by PID:" + getPID() + " HOST:"
                             + InetAddress.getLocalHost().getHostName();
                     fos.write(message.getBytes());
                     fos.close();
-                    logger.info(message + "  Lock File location: "
-                            + lockFile.getAbsolutePath());
+                    logger.info(message + "  Lock File location: " + lockFile.getAbsolutePath());
                     return true;
                 }
             }
         } catch (Exception e) {
-            logger.error(
-                    "Exception trying to take up partition:"
-                            + node.getPartition(), e);
+            logger.error("Exception trying to take up partition:" + node.getPartition(), e);
         } finally {
             if (lockFile != null) {
                 lockFile.deleteOnExit();
@@ -107,10 +99,8 @@ public class AssignmentFromFile implements Assignment {
 
     private boolean canTakeupProcess(ClusterNode node) {
         try {
-            InetAddress inetAddress = InetAddress.getByName(node
-                    .getMachineName());
-            logger.info("Host Name: "
-                    + InetAddress.getLocalHost().getCanonicalHostName());
+            InetAddress inetAddress = InetAddress.getByName(node.getMachineName());
+            logger.info("Host Name: " + InetAddress.getLocalHost().getCanonicalHostName());
             if (!node.getMachineName().equals("localhost")) {
                 if (!InetAddress.getLocalHost().equals(inetAddress)) {
                     return false;
@@ -125,15 +115,13 @@ public class AssignmentFromFile implements Assignment {
         if (!lockFile.exists()) {
             return true;
         } else {
-            logger.info("Partition taken up by another process lockFile:"
-                    + lockFileName);
+            logger.info("Partition taken up by another process lockFile:" + lockFileName);
         }
         return false;
     }
 
     private long getPID() {
-        String processName = java.lang.management.ManagementFactory
-                .getRuntimeMXBean().getName();
+        String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
         return Long.parseLong(processName.split("@")[0]);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index e365799..85b164c 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -12,7 +12,6 @@ import org.apache.s4.base.Emitter;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.comm.topology.Topology;
 import org.apache.s4.comm.topology.TopologyChangeListener;
-import org.jboss.netty.channel.Channel;
 
 import com.google.common.collect.HashBiMap;
 import com.google.inject.Inject;
@@ -45,7 +44,7 @@ public class UDPEmitter implements Emitter, TopologyChangeListener {
     }
 
     @Override
-    public void send(int partitionId, byte[] message) {
+    public boolean send(int partitionId, byte[] message) {
         try {
             ClusterNode node = nodes.get(partitionId);
             if (node == null) {
@@ -63,6 +62,8 @@ public class UDPEmitter implements Emitter, TopologyChangeListener {
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
+
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
index 6605eae..5423361 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
@@ -2,6 +2,7 @@ package org.apache.s4.comm;
 
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Listener;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -27,6 +28,7 @@ import com.google.inject.name.Named;
  */
 public abstract class SimpleDeliveryTest {
     protected CommWrapper sdt;
+    protected String lockdir;
 
     static class CommWrapper {
         private static final int MESSAGE_COUNT = 200;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
index a691cf6..b539932 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
@@ -27,62 +27,58 @@ import com.google.inject.name.Names;
 
 public class TCPCommTest extends SimpleDeliveryTest {
 
-	@Override
-	public void setup() {
-		Injector injector = Guice.createInjector(new NettyTestModule());
-		sdt = injector.getInstance(CommWrapper.class);
-	}
+    @Override
+    public void setup() {
+        Injector injector = Guice.createInjector(new NettyTestModule());
+        sdt = injector.getInstance(CommWrapper.class);
+    }
 
-	class NettyTestModule extends AbstractModule {
+    class NettyTestModule extends AbstractModule {
 
-		protected PropertiesConfiguration config = null;
+        protected PropertiesConfiguration config = null;
 
-		private void loadProperties(Binder binder) {
+        private void loadProperties(Binder binder) {
 
-			try {
-				InputStream is = this.getClass().getResourceAsStream(
-						"/s4-comm-test.properties");
-				config = new PropertiesConfiguration();
-				config.load(is);
+            try {
+                InputStream is = this.getClass().getResourceAsStream("/s4-comm-test.properties");
+                config = new PropertiesConfiguration();
+                config.load(is);
 
-				System.out.println(ConfigurationUtils.toString(config));
-				Names.bindProperties(binder,
-						ConfigurationConverter.getProperties(config));
-			} catch (ConfigurationException e) {
-				binder.addError(e);
-				e.printStackTrace();
-			}
-		}
+                System.out.println(ConfigurationUtils.toString(config));
+                Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+            } catch (ConfigurationException e) {
+                binder.addError(e);
+                e.printStackTrace();
+            }
+        }
 
-		@Override
-		protected void configure() {
-			if (config == null)
-				loadProperties(binder());
+        @Override
+        protected void configure() {
+            if (config == null)
+                loadProperties(binder());
 
-			int numHosts = config.getList("cluster.hosts").size();
-			boolean isCluster = numHosts > 1 ? true : false;
-			bind(Boolean.class).annotatedWith(Names.named("isCluster"))
-					.toInstance(Boolean.valueOf(isCluster));
+            int numHosts = config.getList("cluster.hosts").size();
+            boolean isCluster = numHosts > 1 ? true : false;
+            bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(Boolean.valueOf(isCluster));
 
-			bind(Cluster.class);
+            bind(Cluster.class);
 
-			bind(Assignment.class).to(AssignmentFromFile.class);
+            bind(Assignment.class).to(AssignmentFromFile.class);
 
-			bind(Topology.class).to(TopologyFromFile.class);
+            bind(Topology.class).to(TopologyFromFile.class);
 
-			/* Use a simple UDP comm layer implementation. */
-			bind(Listener.class).to(NettyListener.class);
-			bind(Emitter.class).to(NettyEmitter.class);
+            /* Use a simple UDP comm layer implementation. */
+            bind(Listener.class).to(NettyListener.class);
+            bind(Emitter.class).to(NettyEmitter.class);
 
-			/* The hashing function to map keys top partitions. */
-			bind(Hasher.class).to(DefaultHasher.class);
+            /* The hashing function to map keys top partitions. */
+            bind(Hasher.class).to(DefaultHasher.class);
 
-			/* Use Kryo to serialize events. */
-			bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+            /* Use Kryo to serialize events. */
+            bind(SerializerDeserializer.class).to(KryoSerDeser.class);
 
-			bind(Integer.class).annotatedWith(
-					Names.named("emitter.send.interval")).toInstance(
-					config.getInt("emitter.send.interval"));
-		}
-	}
+            bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(
+                    config.getInt("emitter.send.interval"));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f8051fe/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
index 6dceb19..9d08193 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
@@ -27,62 +27,58 @@ import com.google.inject.name.Names;
 
 public class UDPCommTest extends SimpleDeliveryTest {
 
-	@Override
-	public void setup() {
-		Injector injector = Guice.createInjector(new UDPCommTestModule());
-		sdt = injector.getInstance(CommWrapper.class);
-	}
+    @Override
+    public void setup() {
+        Injector injector = Guice.createInjector(new UDPCommTestModule());
+        sdt = injector.getInstance(CommWrapper.class);
+    }
 
-	class UDPCommTestModule extends AbstractModule {
+    class UDPCommTestModule extends AbstractModule {
 
-		protected PropertiesConfiguration config = null;
+        protected PropertiesConfiguration config = null;
 
-		private void loadProperties(Binder binder) {
+        private void loadProperties(Binder binder) {
 
-			try {
-				InputStream is = this.getClass().getResourceAsStream(
-						"/s4-comm-test.properties");
-				config = new PropertiesConfiguration();
-				config.load(is);
+            try {
+                InputStream is = this.getClass().getResourceAsStream("/s4-comm-test.properties");
+                config = new PropertiesConfiguration();
+                config.load(is);
 
-				System.out.println(ConfigurationUtils.toString(config));
-				Names.bindProperties(binder,
-						ConfigurationConverter.getProperties(config));
-			} catch (ConfigurationException e) {
-				binder.addError(e);
-				e.printStackTrace();
-			}
-		}
+                System.out.println(ConfigurationUtils.toString(config));
+                Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+            } catch (ConfigurationException e) {
+                binder.addError(e);
+                e.printStackTrace();
+            }
+        }
 
-		@Override
-		protected void configure() {
-			if (config == null)
-				loadProperties(binder());
+        @Override
+        protected void configure() {
+            if (config == null)
+                loadProperties(binder());
 
-			int numHosts = config.getList("cluster.hosts").size();
-			boolean isCluster = numHosts > 1 ? true : false;
-			bind(Boolean.class).annotatedWith(Names.named("isCluster"))
-					.toInstance(Boolean.valueOf(isCluster));
+            int numHosts = config.getList("cluster.hosts").size();
+            boolean isCluster = numHosts > 1 ? true : false;
+            bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(Boolean.valueOf(isCluster));
 
-			bind(Cluster.class);
+            bind(Cluster.class);
 
-			bind(Assignment.class).to(AssignmentFromFile.class);
+            bind(Assignment.class).to(AssignmentFromFile.class);
 
-			bind(Topology.class).to(TopologyFromFile.class);
+            bind(Topology.class).to(TopologyFromFile.class);
 
-			/* Use a simple UDP comm layer implementation. */
-			bind(Listener.class).to(UDPListener.class);
-			bind(Emitter.class).to(UDPEmitter.class);
+            /* Use a simple UDP comm layer implementation. */
+            bind(Listener.class).to(UDPListener.class);
+            bind(Emitter.class).to(UDPEmitter.class);
 
-			/* The hashing function to map keys top partitions. */
-			bind(Hasher.class).to(DefaultHasher.class);
+            /* The hashing function to map keys top partitions. */
+            bind(Hasher.class).to(DefaultHasher.class);
 
-			/* Use Kryo to serialize events. */
-			bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+            /* Use Kryo to serialize events. */
+            bind(SerializerDeserializer.class).to(KryoSerDeser.class);
 
-			bind(Integer.class).annotatedWith(
-					Names.named("emitter.send.interval")).toInstance(
-					config.getInt("emitter.send.interval"));
-		}
-	}
+            bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(
+                    config.getInt("emitter.send.interval"));
+        }
+    }
 }