You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/14 12:31:21 UTC

[1/2] git commit: Javadoc cleanup + added some logging upon message sending failures - also removed "network glitch" test

Updated Branches:
  refs/heads/S4-75 384e2c3d6 -> cbded5a7e


Javadoc cleanup + added some logging upon message sending failures
- also removed "network glitch" test


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/cbded5a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/cbded5a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/cbded5a7

Branch: refs/heads/S4-75
Commit: cbded5a7e4a7b49fc32ddc60917e69b0e6344e28
Parents: 694188f
Author: Matthieu Morel <mm...@apache.org>
Authored: Sat Jul 14 13:24:03 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Sat Jul 14 14:12:20 2012 +0200

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |   62 +++++++-------
 .../org/apache/s4/comm/tcp/NetworkGlitchTest.java  |   38 ---------
 .../src/main/java/s4app/ConsumerPE.java            |    2 +-
 3 files changed, 32 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cbded5a7/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index c60e483..4472910 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -40,32 +40,8 @@ import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
 /**
- * <p>
- * TCPEmitter - Uses TCP to send messages across partitions. It
- * <ul>
- * <li>guarantees message delivery</li>
- * <li>preserves pair-wise message ordering; might end up sending duplicates to
- * ensure the order</li>
- * <li>tolerates topology changes, partition re-mapping and network glitches</li>
- * </ul>
- * </p>
+ * TCPEmitter - Uses TCP to send messages across partitions.
  * 
- * <p>
- * TCPEmitter is designed as follows:
- * <ul>
- * <li>maintains per-partition queue of {@code Message}s</li>
- * <li> <code>send(p, m)</code> queues the message 'm' to partition 'p'</li>
- * <li>a thread-pool is used to send the messages asynchronously to the
- * appropriate partitions; send operations between a pair of partitions are
- * serialized</li>
- * <li>Each {@code Message} implements the {@link ChannelFutureListener} and
- * listens on the {@link ChannelFuture} corresponding to the send operation</li>
- * <li>On success, the message marks itself as sent; messages marked sent at the
- * head of the queue are removed</li>
- * <li>On failure of a message m, 'm' and all the messages queued after 'm' are
- * resent to preserve message ordering</li>
- * </ul>
- * </p>
  */
 
 public class TCPEmitter implements Emitter, ClusterChangeListener {
@@ -130,8 +106,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
     @Inject
     private void init() {
-        this.topology.addListener(this);
         refreshCluster();
+        this.topology.addListener(this);
     }
 
     private boolean connectTo(Integer partitionId) {
@@ -175,7 +151,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         if (c == null)
             return;
 
-        c.write(buffer);
+        c.write(buffer).addListener(new MessageSendingListener(partitionId));
     }
 
     @Override
@@ -186,16 +162,16 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
     protected void removeChannel(int partition) {
         Channel c = partitionChannelMap.remove(partition);
-        if (c == null)
+        if (c == null) {
             return;
-
+        }
         c.close().addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {
                 if (future.isSuccess())
                     channels.remove(future.getChannel());
                 else
-                    logger.error("FAILED to close channel");
+                    logger.error("Failed to close channel");
             }
         });
     }
@@ -221,7 +197,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
             for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
                 Integer partition = clusterNode.getPartition();
                 if (partition == null) {
-                    logger.error("onChange(): Illegal partition for clusterNode - " + clusterNode);
+                    logger.error("Illegal partition for clusterNode - " + clusterNode);
                     return;
                 }
 
@@ -256,4 +232,28 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
             }
         }
     }
+
+    class MessageSendingListener implements ChannelFutureListener {
+
+        int partitionId = -1;
+
+        public MessageSendingListener(int partitionId) {
+            super();
+            this.partitionId = partitionId;
+        }
+
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+            if (!future.isSuccess()) {
+                try {
+                    // TODO handle possible cluster reconfiguration between send and failure callback
+                    logger.warn("Failed to send message to node {} (according to current cluster information)",
+                            topology.getPhysicalCluster().getNodes().get(partitionId));
+                } catch (IndexOutOfBoundsException ignored) {
+                    // cluster was changed
+                }
+            }
+
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cbded5a7/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
deleted file mode 100644
index 50ae056..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.s4.comm.tcp;
-
-import java.io.IOException;
-
-import org.apache.s4.comm.util.PartitionInfo;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NetworkGlitchTest extends TCPCommTest {
-
-    private static Logger logger = LoggerFactory.getLogger(NetworkGlitchTest.class);
-
-    public NetworkGlitchTest() throws IOException {
-        super(2);
-        logger = LoggerFactory.getLogger(NetworkGlitchTest.class);
-    }
-
-    public void testDelivery() throws InterruptedException {
-        PartitionInfo util = partitions[0];
-
-        startThreads();
-
-        for (int i = 0; i < 4; i++) {
-            Thread.sleep(500);
-            logger.debug("Messages sent so far - {}", util.sendThread.sendCounts);
-            ((TCPEmitter) util.emitter).removeChannel(0);
-            logger.debug("Channel closed");
-        }
-
-        waitForThreads();
-
-        Assert.assertTrue("Message delivery", messageDelivery());
-
-        logger.info("Message ordering - " + messageOrdering());
-        Assert.assertTrue("Pairwise message ordering", messageOrdering());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cbded5a7/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
----------------------------------------------------------------------
diff --git a/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java b/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
index bba8559..7c6f123 100644
--- a/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
+++ b/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
@@ -19,7 +19,7 @@ public class ConsumerPE extends ProcessingElement {
 
     public void onEvent(Event event) {
         eventCount++;
-        logger.info(
+        logger.trace(
                 "Received event with tick {} and time {} for event # {}",
                 new String[] { String.valueOf(event.get("tick", Long.class)), String.valueOf(event.getTime()),
                         String.valueOf(eventCount) });