You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/07/07 19:53:36 UTC

svn commit: r419938 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes: group/ io/ tipis/ transport/nio/

Author: fhanik
Date: Fri Jul  7 10:53:35 2006
New Revision: 419938

URL: http://svn.apache.org/viewvc?rev=419938&view=rev
Log:
Added trace and fixed remote processing exceptions

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=419938&r1=419937&r2=419938&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Fri Jul  7 10:53:35 2006
@@ -39,6 +39,8 @@
 import org.apache.catalina.tribes.UniqueId;
 import org.apache.catalina.tribes.Heartbeat;
 import org.apache.catalina.tribes.io.BufferPool;
+import java.io.IOException;
+import org.apache.catalina.tribes.RemoteProcessException;
 
 /**
  * The default implementation of a Channel.<br>
@@ -231,7 +233,9 @@
     public void messageReceived(ChannelMessage msg) {
         if ( msg == null ) return;
         try {
-
+            if ( log.isTraceEnabled() ) {
+                log.trace("GroupChannel received msg id:"+new UniqueId(msg.getUniqueId()));
+            }
             Serializable fwd = null;
             if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
                 fwd = new ByteMessage(msg.getMessage().getBytes());
@@ -255,8 +259,13 @@
                 //but none was given, send back an immediate one
                 sendNoRpcChannelReply((RpcMessage)fwd,source);
             }
+            if ( log.isTraceEnabled() ) {
+                log.trace("GroupChannel delivered["+rx+"] id:"+new UniqueId(msg.getUniqueId()));
+            }
+
         } catch ( Exception x ) {
-            log.error("Unable to deserialize channel message.",x);
+            if ( log.isDebugEnabled() ) log.error("Unable to process channel:IOException.",x);
+            throw new RemoteProcessException("IOException:"+x.getMessage(),x);
         }
     }
 

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=419938&r1=419937&r2=419938&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java Fri Jul  7 10:53:35 2006
@@ -69,10 +69,12 @@
     
     public synchronized void access() {
         this.accessed = true;
+        this.lastAccess = System.currentTimeMillis();
     }
     
     public synchronized void finish() {
         this.accessed = false;
+        this.lastAccess = System.currentTimeMillis();
     }
     
     public boolean isAccessed() {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=419938&r1=419937&r2=419938&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Fri Jul  7 10:53:35 2006
@@ -255,14 +255,21 @@
      * be replicated
      */
     public void replicate(Object key, boolean complete) {
+        if ( log.isTraceEnabled() )
+            log.trace("Replicate invoked on key:"+key);
         MapEntry entry = (MapEntry)super.get(key);
         if ( !entry.isSerializable() ) return;
         if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) {
             Object value = entry.getValue();
             //check to see if we need to replicate this object isDirty()||complete
             boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty());
-            if (!repl)return;
-
+            
+            if (!repl) {
+                if ( log.isTraceEnabled() )
+                    log.trace("Not replicating:"+key+", no change made");
+                
+                return;
+            }
             //check to see if the message is diffable
             boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable());
             MapMessage msg = null;
@@ -439,6 +446,8 @@
             log.error("Unable to deserialize MapMessage.", x);
             return;
         }
+        if ( log.isTraceEnabled() ) 
+            log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
         if (mapmsg.getMsgType() == MapMessage.MSG_START) {
             mapMemberAdded(mapmsg.getBackupNodes()[0]);
         }
@@ -1008,6 +1017,32 @@
         private byte[] keydata;
         private byte[] diffvalue;
         private Member[] nodes;
+        
+        public String toString() {
+            StringBuffer buf = new StringBuffer("MapMessage[context=");
+            buf.append(new String(mapId));
+            buf.append("; type=");
+            buf.append(getTypeDesc());
+            buf.append("; key=");
+            buf.append(key);
+            buf.append("; value=");
+            buf.append(value);
+            return buf.toString();
+        }
+        
+        public String getTypeDesc() {
+            switch (msgtype) {
+                case MSG_BACKUP: return "MSG_BACKUP";
+                case MSG_RETRIEVE_BACKUP: return "MSG_RETRIEVE_BACKUP";
+                case MSG_PROXY: return "MSG_PROXY";
+                case MSG_REMOVE: return "MSG_REMOVE";
+                case MSG_STATE: return "MSG_STATE";
+                case MSG_START: return "MSG_START";
+                case MSG_STOP: return "MSG_STOP";
+                case MSG_INIT: return "MSG_INIT";
+                default : return "UNKNOWN";
+            }
+        }
 
         public MapMessage() {}
 

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=419938&r1=419937&r2=419938&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Fri Jul  7 10:53:35 2006
@@ -23,6 +23,8 @@
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
 import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.UniqueId;
 
 /**
  * A smart implementation of a stateful replicated map. uses primary/secondary backup strategy. 
@@ -145,7 +147,11 @@
                 //publish the backup data to one node
                 msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false,
                                      (Serializable) key, (Serializable) value, null, backup);
-                getChannel().send(backup, msg, getChannelSendOptions());
+                if ( log.isTraceEnabled() ) 
+                    log.trace("Publishing backup data:"+msg+" to: "+next.getName());
+                UniqueId id = getChannel().send(backup, msg, getChannelSendOptions());
+                if ( log.isTraceEnabled() )
+                    log.trace("Data published:"+msg+" msg Id:"+id);
                 //we published out to a backup, mark the test success
                 success = true;
             }catch ( ChannelException x ) {
@@ -157,6 +163,8 @@
                 if (success && proxies.length > 0 ) {
                     msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false,
                                          (Serializable) key, null, null, backup);
+                    if ( log.isTraceEnabled() ) 
+                    log.trace("Publishing proxy data:"+msg+" to: "+Arrays.toNameString(proxies));
                     getChannel().send(proxies, msg, getChannelSendOptions());
                 }
             }catch  ( ChannelException x ) {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=419938&r1=419937&r2=419938&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Fri Jul  7 10:53:35 2006
@@ -140,6 +140,7 @@
             synchronized (events) {
                 events.add(event);
             }
+            if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event);
             selector.wakeup();
         }
     }
@@ -150,6 +151,7 @@
             Runnable r = null;
             while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) {
                 try {
+                    if ( log.isTraceEnabled() ) log.trace("Processing event in selector:"+r);
                     r.run();
                 } catch ( Exception x ) {
                     log.error("",x);
@@ -194,7 +196,7 @@
                     if ( ka != null ) {
                         long delta = now - ka.getLastAccess();
                         if (delta > (long) getTimeout() && (!ka.isAccessed())) {
-                            log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+")");
+                            log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(ka.getLastAccess()));
 //                            System.out.println("Interest:"+key.interestOps());
 //                            System.out.println("Ready Ops:"+key.readyOps());
 //                            System.out.println("Valid:"+key.isValid());

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=419938&r1=419937&r2=419938&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Fri Jul  7 10:53:35 2006
@@ -29,6 +29,7 @@
 import org.apache.catalina.tribes.io.BufferPool;
 import java.nio.channels.CancelledKeyException;
 import org.apache.catalina.tribes.UniqueId;
+import org.apache.catalina.tribes.RemoteProcessException;
 
 /**
  * A worker thread class which can drain channels and echo-back the input. Each
@@ -77,11 +78,19 @@
             if (key == null) {
                 continue;	// just in case
             }
+            if ( log.isTraceEnabled() ) 
+                log.trace("Servicing key:"+key);
+
             try {
                 ObjectReader reader = (ObjectReader)key.attachment();
                 if ( reader == null ) {
+                    if ( log.isTraceEnabled() ) 
+                        log.trace("No object reader, cancelling:"+key);
                     cancelKey(key);
                 } else {
+                    if ( log.isTraceEnabled() ) 
+                        log.trace("Draining channel:"+key);
+
                     drainChannel(key, reader);
                 }
             } catch (Exception e) {
@@ -119,6 +128,10 @@
      * worker thread is servicing it.
      */
     public synchronized void serviceChannel (SelectionKey key) {
+        if ( log.isTraceEnabled() ) 
+            log.trace("About to service key:"+key);
+        ObjectReader reader = (ObjectReader)key.attachment();
+        if ( reader != null ) reader.setLastAccess(System.currentTimeMillis());
         this.key = key;
         key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
         key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -185,6 +198,9 @@
                  * This is considered a synchronized request
                  */
                 if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+            }catch ( RemoteProcessException e ) {
+                if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e);
+                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
             }catch ( Exception e ) {
                 log.error("Processing of cluster message failed.",e);
                 if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
@@ -208,6 +224,8 @@
     }
 
     protected void registerForRead(final SelectionKey key, ObjectReader reader) {
+        if ( log.isTraceEnabled() ) 
+            log.trace("Adding key for read event:"+key);
         reader.finish();
         //register our OP_READ interest
         Runnable r = new Runnable() {
@@ -219,14 +237,16 @@
                         // resume interest in OP_READ, OP_WRITE
                         int resumeOps = key.interestOps() | SelectionKey.OP_READ;
                         key.interestOps(resumeOps);
+                        if ( log.isTraceEnabled() ) 
+                            log.trace("Registering key for read:"+key);
                     }
                 } catch (CancelledKeyException ckx ) {
                     NioReceiver.cancelledKey(key);
+                    if ( log.isTraceEnabled() ) 
+                        log.trace("CKX Cancelling key:"+key);
+
                 } catch (Exception x) {
-                    try {
-                        key.selector().close();
-                    } catch (Exception ignore) {}
-                    log.error("Unable to cycle the selector, connection disconnected?", x);
+                    log.error("Error registering key for read:"+key,x);
                 }
             }
         };
@@ -234,6 +254,9 @@
     }
 
     private void cancelKey(final SelectionKey key) {
+        if ( log.isTraceEnabled() ) 
+            log.trace("Adding key for cancel event:"+key);
+
         ObjectReader reader = (ObjectReader)key.attachment();
         if ( reader != null ) {
             reader.setCancelled(true);
@@ -241,6 +264,9 @@
         }
         Runnable cx = new Runnable() {
             public void run() {
+                if ( log.isTraceEnabled() ) 
+                    log.trace("Cancelling key:"+key);
+
                 NioReceiver.cancelledKey(key);
             }
         };



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org