You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/14 12:31:21 UTC
[1/2] git commit: Javadoc cleanup + added some logging upon message
sending failures - also removed "network glitch" test
Updated Branches:
refs/heads/S4-75 384e2c3d6 -> cbded5a7e
Javadoc cleanup + added some logging upon message sending failures
- also removed "network glitch" test
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/cbded5a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/cbded5a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/cbded5a7
Branch: refs/heads/S4-75
Commit: cbded5a7e4a7b49fc32ddc60917e69b0e6344e28
Parents: 694188f
Author: Matthieu Morel <mm...@apache.org>
Authored: Sat Jul 14 13:24:03 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Sat Jul 14 14:12:20 2012 +0200
----------------------------------------------------------------------
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 62 +++++++-------
.../org/apache/s4/comm/tcp/NetworkGlitchTest.java | 38 ---------
.../src/main/java/s4app/ConsumerPE.java | 2 +-
3 files changed, 32 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cbded5a7/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index c60e483..4472910 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -40,32 +40,8 @@ import com.google.inject.Inject;
import com.google.inject.name.Named;
/**
- * <p>
- * TCPEmitter - Uses TCP to send messages across partitions. It
- * <ul>
- * <li>guarantees message delivery</li>
- * <li>preserves pair-wise message ordering; might end up sending duplicates to
- * ensure the order</li>
- * <li>tolerates topology changes, partition re-mapping and network glitches</li>
- * </ul>
- * </p>
+ * TCPEmitter - Uses TCP to send messages across partitions.
*
- * <p>
- * TCPEmitter is designed as follows:
- * <ul>
- * <li>maintains per-partition queue of {@code Message}s</li>
- * <li> <code>send(p, m)</code> queues the message 'm' to partition 'p'</li>
- * <li>a thread-pool is used to send the messages asynchronously to the
- * appropriate partitions; send operations between a pair of partitions are
- * serialized</li>
- * <li>Each {@code Message} implements the {@link ChannelFutureListener} and
- * listens on the {@link ChannelFuture} corresponding to the send operation</li>
- * <li>On success, the message marks itself as sent; messages marked sent at the
- * head of the queue are removed</li>
- * <li>On failure of a message m, 'm' and all the messages queued after 'm' are
- * resent to preserve message ordering</li>
- * </ul>
- * </p>
*/
public class TCPEmitter implements Emitter, ClusterChangeListener {
@@ -130,8 +106,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
@Inject
private void init() {
- this.topology.addListener(this);
refreshCluster();
+ this.topology.addListener(this);
}
private boolean connectTo(Integer partitionId) {
@@ -175,7 +151,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
if (c == null)
return;
- c.write(buffer);
+ c.write(buffer).addListener(new MessageSendingListener(partitionId));
}
@Override
@@ -186,16 +162,16 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
protected void removeChannel(int partition) {
Channel c = partitionChannelMap.remove(partition);
- if (c == null)
+ if (c == null) {
return;
-
+ }
c.close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess())
channels.remove(future.getChannel());
else
- logger.error("FAILED to close channel");
+ logger.error("Failed to close channel");
}
});
}
@@ -221,7 +197,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
Integer partition = clusterNode.getPartition();
if (partition == null) {
- logger.error("onChange(): Illegal partition for clusterNode - " + clusterNode);
+ logger.error("Illegal partition for clusterNode - " + clusterNode);
return;
}
@@ -256,4 +232,28 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
}
}
}
+
+ class MessageSendingListener implements ChannelFutureListener {
+
+ int partitionId = -1;
+
+ public MessageSendingListener(int partitionId) {
+ super();
+ this.partitionId = partitionId;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ try {
+ // TODO handle possible cluster reconfiguration between send and failure callback
+ logger.warn("Failed to send message to node {} (according to current cluster information)",
+ topology.getPhysicalCluster().getNodes().get(partitionId));
+ } catch (IndexOutOfBoundsException ignored) {
+ // cluster was changed
+ }
+ }
+
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cbded5a7/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
deleted file mode 100644
index 50ae056..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.s4.comm.tcp;
-
-import java.io.IOException;
-
-import org.apache.s4.comm.util.PartitionInfo;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NetworkGlitchTest extends TCPCommTest {
-
- private static Logger logger = LoggerFactory.getLogger(NetworkGlitchTest.class);
-
- public NetworkGlitchTest() throws IOException {
- super(2);
- logger = LoggerFactory.getLogger(NetworkGlitchTest.class);
- }
-
- public void testDelivery() throws InterruptedException {
- PartitionInfo util = partitions[0];
-
- startThreads();
-
- for (int i = 0; i < 4; i++) {
- Thread.sleep(500);
- logger.debug("Messages sent so far - {}", util.sendThread.sendCounts);
- ((TCPEmitter) util.emitter).removeChannel(0);
- logger.debug("Channel closed");
- }
-
- waitForThreads();
-
- Assert.assertTrue("Message delivery", messageDelivery());
-
- logger.info("Message ordering - " + messageOrdering());
- Assert.assertTrue("Pairwise message ordering", messageOrdering());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cbded5a7/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
----------------------------------------------------------------------
diff --git a/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java b/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
index bba8559..7c6f123 100644
--- a/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
+++ b/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
@@ -19,7 +19,7 @@ public class ConsumerPE extends ProcessingElement {
public void onEvent(Event event) {
eventCount++;
- logger.info(
+ logger.trace(
"Received event with tick {} and time {} for event # {}",
new String[] { String.valueOf(event.get("tick", Long.class)), String.valueOf(event.getTime()),
String.valueOf(eventCount) });