You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2007/11/13 14:43:17 UTC

svn commit: r594534 - in /webservices/axis2/trunk/java/modules/clustering: src/org/apache/axis2/clustering/tribes/ test/org/apache/axis2/clustering/tribes/

Author: azeez
Date: Tue Nov 13 05:43:17 2007
New Revision: 594534

URL: http://svn.apache.org/viewvc?rev=594534&view=rev
Log:
Handled classloading issues when serializable objects within services and modules have to be replicated.


Added:
    webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/tribes/ObjectSerializationTest.java
Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=594534&r1=594533&r2=594534&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Tue Nov 13 05:43:17 2007
@@ -29,14 +29,22 @@
 import org.apache.axis2.clustering.context.commands.UpdateContextCommand;
 import org.apache.axis2.clustering.control.AckCommand;
 import org.apache.axis2.clustering.control.ControlCommand;
-import org.apache.axis2.clustering.control.GetStateResponseCommand;
 import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
+import org.apache.axis2.clustering.control.GetStateResponseCommand;
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.catalina.tribes.ByteMessage;
 import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
 public class ChannelListener implements org.apache.catalina.tribes.ChannelListener {
     private static final Log log = LogFactory.getLog(ChannelListener.class);
@@ -77,6 +85,30 @@
     }
 
     public void messageReceived(Serializable msg, Member sender) {
+        try {
+            AxisConfiguration configuration = configurationContext.getAxisConfiguration();
+            List classLoaders = new ArrayList();
+            classLoaders.add(configuration.getSystemClassLoader());
+            classLoaders.add(getClass().getClassLoader());
+            for (Iterator iter = configuration.getServiceGroups(); iter.hasNext();) {
+                AxisServiceGroup group = (AxisServiceGroup) iter.next();
+                classLoaders.add(group.getServiceGroupClassLoader());
+            }
+            for (Iterator iter = configuration.getModules().values().iterator(); iter.hasNext();) {
+                AxisModule module = (AxisModule) iter.next();
+                classLoaders.add(module.getModuleClassLoader());
+            }
+
+
+            byte[] message = ((ByteMessage) msg).getMessage();
+            msg = XByteBuffer.deserialize(message,
+                                          0,
+                                          message.length,
+                                          (ClassLoader[])classLoaders.toArray(new ClassLoader[classLoaders.size()])); 
+        } catch (Exception e) {
+            log.error(e);
+        }
+
         // If the system has not still been intialized, reject all incoming messages, except the
         // GetStateResponseCommand message
         if (configurationContext.
@@ -87,9 +119,9 @@
             log.warn("Received message before cluster initialization has been completed");
             return;
         }
-        log.debug("RECEIVED MESSAGE " + msg + " from " + TribesUtil.getHost(sender));
+        log.debug("Received message " + msg + " from " + TribesUtil.getHost(sender));
         try {
-            processMessage(msg,sender);
+            processMessage(msg, sender);
         } catch (Exception e) {
             log.error(e);
         }
@@ -118,6 +150,6 @@
         } else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
             controlCommandProcessor.process((ControlCommand) msg,
                                             sender);
-        } 
+        }
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=594534&r1=594533&r2=594534&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Tue Nov 13 05:43:17 2007
@@ -22,10 +22,16 @@
 import org.apache.axis2.clustering.ClusteringCommand;
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.MessageSender;
-import org.apache.catalina.tribes.*;
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.Member;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.io.ObjectOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
 public class ChannelSender implements MessageSender {
 
     private Log log = LogFactory.getLog(ChannelSender.class);
@@ -39,16 +45,16 @@
 
         // Keep retrying, since at the point of trying to send the msg, a member may leave the group
         // causing a view change. All nodes in a view should get the msg
-        //TODO: Sometimes Tribes ncorrectly detects that a member has left a group
+        //TODO: Sometimes Tribes incorrectly detects that a member has left a group
         while (true) {
             if (channel.getMembers().length > 0) {
                 try {
                     long start = System.currentTimeMillis();
-                    channel.send(channel.getMembers(), msg, Channel.SEND_OPTIONS_USE_ACK);
+                    channel.send(channel.getMembers(), toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK);
                     timeToSend = System.currentTimeMillis() - start;
                     log.debug("Sent " + msg + " to group");
                     break;
-                } catch (ChannelException e) {
+                } catch (Exception e) {
                     String message = "Error sending command message : " + msg +
                                      ". Reason " + e.getMessage();
                     log.warn(message);
@@ -60,16 +66,25 @@
         return timeToSend;
     }
 
+    private ByteMessage toByteMessage(ClusteringCommand msg) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(bos);
+        out.writeObject(msg);
+        out.flush();
+        out.close();
+        return new ByteMessage(bos.toByteArray());
+    }
+
     public void sendToSelf(ClusteringCommand msg) throws ClusteringFault {
         if (channel == null) {
             return;
         }
         try {
             channel.send(new Member[]{channel.getLocalMember(true)},
-                         msg,
+                         toByteMessage(msg),
                          Channel.SEND_OPTIONS_USE_ACK);
             log.debug("Sent " + msg + " to self");
-        } catch (ChannelException e) {
+        } catch (Exception e) {
             throw new ClusteringFault(e);
         }
     }
@@ -79,11 +94,11 @@
         try {
             if (member.isReady()) {
                 long start = System.currentTimeMillis();
-                channel.send(new Member[]{member}, cmd, Channel.SEND_OPTIONS_USE_ACK);
+                channel.send(new Member[]{member}, toByteMessage(cmd), Channel.SEND_OPTIONS_USE_ACK);
                 timeToSend = System.currentTimeMillis() - start;
                 log.debug("Sent " + cmd + " to " + TribesUtil.getHost(member));
             }
-        } catch (ChannelException e) {
+        } catch (Exception e) {
             String message = "Could not send message to " + TribesUtil.getHost(member) +
                              ". Reason " + e.getMessage();
             log.warn(message);

Added: webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/tribes/ObjectSerializationTest.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/tribes/ObjectSerializationTest.java?rev=594534&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/tribes/ObjectSerializationTest.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/tribes/ObjectSerializationTest.java Tue Nov 13 05:43:17 2007
@@ -0,0 +1,71 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.tribes;
+
+import junit.framework.TestCase;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.axis2.clustering.control.AckCommand;
+
+/**
+ *
+ */
+public class ObjectSerializationTest extends TestCase {
+
+    public void testSerialization(){
+        AckCommand ackCommand = new AckCommand("uuid");
+        ackCommand.setMemberId("123456");
+
+        AckCommand ackCommand2 = (AckCommand) copy(ackCommand);
+        assertFalse(ackCommand.equals(ackCommand2));
+
+        assertEquals(ackCommand.getUniqueId(), ackCommand2.getUniqueId());
+    }
+
+    /**
+     * Returns a copy of the object, or null if the object cannot
+     * be serialized.
+     */
+    public Object copy(Object orig) {
+        Object obj = null;
+        try {
+            // Write the object out to a byte array
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            ObjectOutputStream out = new ObjectOutputStream(bos);
+            out.writeObject(orig);
+            out.flush();
+            out.close();
+
+            // Make an input stream from the byte array and read
+            // a copy of the object back in.
+            ObjectInputStream in = new ObjectInputStream(
+                    new ByteArrayInputStream(bos.toByteArray()));
+            obj = in.readObject();
+        }
+        catch (IOException e) {
+            e.printStackTrace();
+        }
+        catch (ClassNotFoundException cnfe) {
+            cnfe.printStackTrace();
+        }
+        return obj;
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org