You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by kf...@apache.org on 2017/10/18 08:25:03 UTC
svn commit: r1812471 - in /tomcat/trunk:
java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
webapps/docs/changelog.xml
Author: kfujino
Date: Wed Oct 18 08:25:03 2017
New Revision: 1812471
URL: http://svn.apache.org/viewvc?rev=1812471&view=rev
Log:
Ensure that the remaining SelectionKeys, which were left unhandled when a ChannelException was thrown during SelectionKey processing, are handled.
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
tomcat/trunk/webapps/docs/changelog.xml
Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=1812471&r1=1812470&r2=1812471&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java Wed Oct 18 08:25:03 2017
@@ -20,8 +20,10 @@ import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -73,22 +75,25 @@ public class ParallelNioSender extends A
msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK;
while ( (remaining>0) && (delta<getTimeout()) ) {
try {
- remaining -= doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck,msg);
+ SendResult result = doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck,msg);
+ remaining -= result.getCompleted();
+ if (result.getFailed() != null) {
+ remaining -= result.getFailed().getFaultyMembers().length;
+ if (cx == null) cx = result.getFailed();
+ else cx.addFaultyMember(result.getFailed().getFaultyMembers());
+ }
} catch (Exception x ) {
if (log.isTraceEnabled()) log.trace("Error sending message", x);
- int faulty = (cx == null)?0:cx.getFaultyMembers().length;
- if ( cx == null ) {
+ if (cx == null) {
if ( x instanceof ChannelException ) cx = (ChannelException)x;
else cx = new ChannelException(sm.getString("parallelNioSender.send.failed"), x);
- } else {
- if (x instanceof ChannelException) {
- cx.addFaultyMember(((ChannelException) x).getFaultyMembers());
- }
}
- //count down the remaining on an error
- if (faulty < cx.getFaultyMembers().length) {
- remaining -= (cx.getFaultyMembers().length - faulty);
+ for (int i=0; i<senders.length; i++ ) {
+ if (!senders[i].isComplete()) {
+ cx.addFaultyMember(senders[i].getDestination(),x);
+ }
}
+ throw cx;
}
delta = System.currentTimeMillis() - start;
}
@@ -118,13 +123,18 @@ public class ParallelNioSender extends A
}
- private int doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg)
- throws IOException, ChannelException {
- int completed = 0;
- int selectedKeys = selector.select(selectTimeOut);
+ private SendResult doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg)
+ throws ChannelException {
+ SendResult result = new SendResult();
+ int selectedKeys;
+ try {
+ selectedKeys = selector.select(selectTimeOut);
+ } catch (IOException ioe) {
+ throw new ChannelException(sm.getString("parallelNioSender.send.failed"), ioe);
+ }
if (selectedKeys == 0) {
- return 0;
+ return result;
}
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
@@ -136,8 +146,8 @@ public class ParallelNioSender extends A
NioSender sender = (NioSender) sk.attachment();
try {
if (sender.process(sk,waitForAck)) {
- completed++;
sender.setComplete(true);
+ result.complete(sender);
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("ParallelNioSender - Sent msg:" +
new UniqueId(msg.getUniqueId()) + " at " +
@@ -170,17 +180,18 @@ public class ParallelNioSender extends A
log.warn(sm.getString("parallelNioSender.sender.disconnected.notRetry", sender.getDestination().getName()));
ChannelException cx = new ChannelException(sm.getString("parallelNioSender.sender.disconnected.sendFailed"), x);
cx.addFaultyMember(sender.getDestination(),x);
- throw cx;
+ result.failed(cx);
+ break;
}
byte[] data = sender.getMessage();
- if ( retry ) {
+ if (retry) {
try {
sender.disconnect();
sender.connect();
sender.setAttempt(attempt);
sender.setMessage(data);
- }catch ( Exception ignore){
+ } catch (Exception ignore){
state.setFailing();
}
} else {
@@ -189,12 +200,31 @@ public class ParallelNioSender extends A
Integer.toString(sender.getAttempt()),
Integer.toString(maxAttempts)), x);
cx.addFaultyMember(sender.getDestination(),x);
- throw cx;
+ result.failed(cx);
}//end if
}
}
- return completed;
+ return result;
+
+ }
+
+ private static class SendResult {
+ private List<NioSender> completeSenders = new ArrayList<>();
+ private ChannelException exception = null;
+ private void complete(NioSender sender) {
+ if (!completeSenders.contains(sender)) completeSenders.add(sender);
+ }
+ private int getCompleted() {
+ return completeSenders.size();
+ }
+ private void failed(ChannelException cx){
+ if (exception == null) exception = cx;
+ exception.addFaultyMember(cx.getFaultyMembers());
+ }
+ private ChannelException getFailed() {
+ return exception;
+ }
}
private void connect(NioSender[] senders) throws ChannelException {
Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1812471&r1=1812470&r2=1812471&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Wed Oct 18 08:25:03 2017
@@ -118,6 +118,11 @@
unintended <code>ChannelException</code> caused by comparing the number
of failed members and the number of remaining Senders. (kfujino)
</fix>
+ <fix>
+ Ensure that the remaining SelectionKeys, which were left unhandled when
+ a <code>ChannelException</code> was thrown during SelectionKey
+ processing, are handled. (kfujino)
+ </fix>
</changelog>
</subsection>
<subsection name="Other">
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org