You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/07/07 19:53:36 UTC
svn commit: r419938 - in
/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes:
group/ io/ tipis/ transport/nio/
Author: fhanik
Date: Fri Jul 7 10:53:35 2006
New Revision: 419938
URL: http://svn.apache.org/viewvc?rev=419938&view=rev
Log:
Added trace and fixed remote processing exceptions
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=419938&r1=419937&r2=419938&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Fri Jul 7 10:53:35 2006
@@ -39,6 +39,8 @@
import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.Heartbeat;
import org.apache.catalina.tribes.io.BufferPool;
+import java.io.IOException;
+import org.apache.catalina.tribes.RemoteProcessException;
/**
* The default implementation of a Channel.<br>
@@ -231,7 +233,9 @@
public void messageReceived(ChannelMessage msg) {
if ( msg == null ) return;
try {
-
+ if ( log.isTraceEnabled() ) {
+ log.trace("GroupChannel received msg id:"+new UniqueId(msg.getUniqueId()));
+ }
Serializable fwd = null;
if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
fwd = new ByteMessage(msg.getMessage().getBytes());
@@ -255,8 +259,13 @@
//but none was given, send back an immediate one
sendNoRpcChannelReply((RpcMessage)fwd,source);
}
+ if ( log.isTraceEnabled() ) {
+ log.trace("GroupChannel delivered["+rx+"] id:"+new UniqueId(msg.getUniqueId()));
+ }
+
} catch ( Exception x ) {
- log.error("Unable to deserialize channel message.",x);
+ if ( log.isDebugEnabled() ) log.error("Unable to process channel:IOException.",x);
+ throw new RemoteProcessException("IOException:"+x.getMessage(),x);
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=419938&r1=419937&r2=419938&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java Fri Jul 7 10:53:35 2006
@@ -69,10 +69,12 @@
public synchronized void access() {
this.accessed = true;
+ this.lastAccess = System.currentTimeMillis();
}
public synchronized void finish() {
this.accessed = false;
+ this.lastAccess = System.currentTimeMillis();
}
public boolean isAccessed() {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=419938&r1=419937&r2=419938&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Fri Jul 7 10:53:35 2006
@@ -255,14 +255,21 @@
* be replicated
*/
public void replicate(Object key, boolean complete) {
+ if ( log.isTraceEnabled() )
+ log.trace("Replicate invoked on key:"+key);
MapEntry entry = (MapEntry)super.get(key);
if ( !entry.isSerializable() ) return;
if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) {
Object value = entry.getValue();
//check to see if we need to replicate this object isDirty()||complete
boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty());
- if (!repl)return;
-
+
+ if (!repl) {
+ if ( log.isTraceEnabled() )
+ log.trace("Not replicating:"+key+", no change made");
+
+ return;
+ }
//check to see if the message is diffable
boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable());
MapMessage msg = null;
@@ -439,6 +446,8 @@
log.error("Unable to deserialize MapMessage.", x);
return;
}
+ if ( log.isTraceEnabled() )
+ log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
mapMemberAdded(mapmsg.getBackupNodes()[0]);
}
@@ -1008,6 +1017,32 @@
private byte[] keydata;
private byte[] diffvalue;
private Member[] nodes;
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer("MapMessage[context=");
+ buf.append(new String(mapId));
+ buf.append("; type=");
+ buf.append(getTypeDesc());
+ buf.append("; key=");
+ buf.append(key);
+ buf.append("; value=");
+ buf.append(value);
+ return buf.toString();
+ }
+
+ public String getTypeDesc() {
+ switch (msgtype) {
+ case MSG_BACKUP: return "MSG_BACKUP";
+ case MSG_RETRIEVE_BACKUP: return "MSG_RETRIEVE_BACKUP";
+ case MSG_PROXY: return "MSG_PROXY";
+ case MSG_REMOVE: return "MSG_REMOVE";
+ case MSG_STATE: return "MSG_STATE";
+ case MSG_START: return "MSG_START";
+ case MSG_STOP: return "MSG_STOP";
+ case MSG_INIT: return "MSG_INIT";
+ default : return "UNKNOWN";
+ }
+ }
public MapMessage() {}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=419938&r1=419937&r2=419938&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Fri Jul 7 10:53:35 2006
@@ -23,6 +23,8 @@
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.UniqueId;
/**
* A smart implementation of a stateful replicated map. uses primary/secondary backup strategy.
@@ -145,7 +147,11 @@
//publish the backup data to one node
msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false,
(Serializable) key, (Serializable) value, null, backup);
- getChannel().send(backup, msg, getChannelSendOptions());
+ if ( log.isTraceEnabled() )
+ log.trace("Publishing backup data:"+msg+" to: "+next.getName());
+ UniqueId id = getChannel().send(backup, msg, getChannelSendOptions());
+ if ( log.isTraceEnabled() )
+ log.trace("Data published:"+msg+" msg Id:"+id);
//we published out to a backup, mark the test success
success = true;
}catch ( ChannelException x ) {
@@ -157,6 +163,8 @@
if (success && proxies.length > 0 ) {
msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false,
(Serializable) key, null, null, backup);
+ if ( log.isTraceEnabled() )
+ log.trace("Publishing proxy data:"+msg+" to: "+Arrays.toNameString(proxies));
getChannel().send(proxies, msg, getChannelSendOptions());
}
}catch ( ChannelException x ) {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=419938&r1=419937&r2=419938&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Fri Jul 7 10:53:35 2006
@@ -140,6 +140,7 @@
synchronized (events) {
events.add(event);
}
+ if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event);
selector.wakeup();
}
}
@@ -150,6 +151,7 @@
Runnable r = null;
while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) {
try {
+ if ( log.isTraceEnabled() ) log.trace("Processing event in selector:"+r);
r.run();
} catch ( Exception x ) {
log.error("",x);
@@ -194,7 +196,7 @@
if ( ka != null ) {
long delta = now - ka.getLastAccess();
if (delta > (long) getTimeout() && (!ka.isAccessed())) {
- log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+")");
+ log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(ka.getLastAccess()));
// System.out.println("Interest:"+key.interestOps());
// System.out.println("Ready Ops:"+key.readyOps());
// System.out.println("Valid:"+key.isValid());
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=419938&r1=419937&r2=419938&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Fri Jul 7 10:53:35 2006
@@ -29,6 +29,7 @@
import org.apache.catalina.tribes.io.BufferPool;
import java.nio.channels.CancelledKeyException;
import org.apache.catalina.tribes.UniqueId;
+import org.apache.catalina.tribes.RemoteProcessException;
/**
* A worker thread class which can drain channels and echo-back the input. Each
@@ -77,11 +78,19 @@
if (key == null) {
continue; // just in case
}
+ if ( log.isTraceEnabled() )
+ log.trace("Servicing key:"+key);
+
try {
ObjectReader reader = (ObjectReader)key.attachment();
if ( reader == null ) {
+ if ( log.isTraceEnabled() )
+ log.trace("No object reader, cancelling:"+key);
cancelKey(key);
} else {
+ if ( log.isTraceEnabled() )
+ log.trace("Draining channel:"+key);
+
drainChannel(key, reader);
}
} catch (Exception e) {
@@ -119,6 +128,10 @@
* worker thread is servicing it.
*/
public synchronized void serviceChannel (SelectionKey key) {
+ if ( log.isTraceEnabled() )
+ log.trace("About to service key:"+key);
+ ObjectReader reader = (ObjectReader)key.attachment();
+ if ( reader != null ) reader.setLastAccess(System.currentTimeMillis());
this.key = key;
key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -185,6 +198,9 @@
* This is considered a synchronized request
*/
if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+ }catch ( RemoteProcessException e ) {
+ if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e);
+ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
}catch ( Exception e ) {
log.error("Processing of cluster message failed.",e);
if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
@@ -208,6 +224,8 @@
}
protected void registerForRead(final SelectionKey key, ObjectReader reader) {
+ if ( log.isTraceEnabled() )
+ log.trace("Adding key for read event:"+key);
reader.finish();
//register our OP_READ interest
Runnable r = new Runnable() {
@@ -219,14 +237,16 @@
// resume interest in OP_READ, OP_WRITE
int resumeOps = key.interestOps() | SelectionKey.OP_READ;
key.interestOps(resumeOps);
+ if ( log.isTraceEnabled() )
+ log.trace("Registering key for read:"+key);
}
} catch (CancelledKeyException ckx ) {
NioReceiver.cancelledKey(key);
+ if ( log.isTraceEnabled() )
+ log.trace("CKX Cancelling key:"+key);
+
} catch (Exception x) {
- try {
- key.selector().close();
- } catch (Exception ignore) {}
- log.error("Unable to cycle the selector, connection disconnected?", x);
+ log.error("Error registering key for read:"+key,x);
}
}
};
@@ -234,6 +254,9 @@
}
private void cancelKey(final SelectionKey key) {
+ if ( log.isTraceEnabled() )
+ log.trace("Adding key for cancel event:"+key);
+
ObjectReader reader = (ObjectReader)key.attachment();
if ( reader != null ) {
reader.setCancelled(true);
@@ -241,6 +264,9 @@
}
Runnable cx = new Runnable() {
public void run() {
+ if ( log.isTraceEnabled() )
+ log.trace("Cancelling key:"+key);
+
NioReceiver.cancelledKey(key);
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org