You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2007/11/13 14:43:17 UTC
svn commit: r594534 - in /webservices/axis2/trunk/java/modules/clustering:
src/org/apache/axis2/clustering/tribes/
test/org/apache/axis2/clustering/tribes/
Author: azeez
Date: Tue Nov 13 05:43:17 2007
New Revision: 594534
URL: http://svn.apache.org/viewvc?rev=594534&view=rev
Log:
Handled classloading issues when serializable objects within services and modules have to be replicated.
Added:
webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/tribes/ObjectSerializationTest.java
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=594534&r1=594533&r2=594534&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Tue Nov 13 05:43:17 2007
@@ -29,14 +29,22 @@
import org.apache.axis2.clustering.context.commands.UpdateContextCommand;
import org.apache.axis2.clustering.control.AckCommand;
import org.apache.axis2.clustering.control.ControlCommand;
-import org.apache.axis2.clustering.control.GetStateResponseCommand;
import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
+import org.apache.axis2.clustering.control.GetStateResponseCommand;
import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
public class ChannelListener implements org.apache.catalina.tribes.ChannelListener {
private static final Log log = LogFactory.getLog(ChannelListener.class);
@@ -77,6 +85,30 @@
}
public void messageReceived(Serializable msg, Member sender) {
+ try {
+ AxisConfiguration configuration = configurationContext.getAxisConfiguration();
+ List classLoaders = new ArrayList();
+ classLoaders.add(configuration.getSystemClassLoader());
+ classLoaders.add(getClass().getClassLoader());
+ for (Iterator iter = configuration.getServiceGroups(); iter.hasNext();) {
+ AxisServiceGroup group = (AxisServiceGroup) iter.next();
+ classLoaders.add(group.getServiceGroupClassLoader());
+ }
+ for (Iterator iter = configuration.getModules().values().iterator(); iter.hasNext();) {
+ AxisModule module = (AxisModule) iter.next();
+ classLoaders.add(module.getModuleClassLoader());
+ }
+
+
+ byte[] message = ((ByteMessage) msg).getMessage();
+ msg = XByteBuffer.deserialize(message,
+ 0,
+ message.length,
+ (ClassLoader[])classLoaders.toArray(new ClassLoader[classLoaders.size()]));
+ } catch (Exception e) {
+ log.error(e);
+ }
+
// If the system has not yet been initialized, reject all incoming messages except the
// GetStateResponseCommand message
if (configurationContext.
@@ -87,9 +119,9 @@
log.warn("Received message before cluster initialization has been completed");
return;
}
- log.debug("RECEIVED MESSAGE " + msg + " from " + TribesUtil.getHost(sender));
+ log.debug("Received message " + msg + " from " + TribesUtil.getHost(sender));
try {
- processMessage(msg,sender);
+ processMessage(msg, sender);
} catch (Exception e) {
log.error(e);
}
@@ -118,6 +150,6 @@
} else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
controlCommandProcessor.process((ControlCommand) msg,
sender);
- }
+ }
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=594534&r1=594533&r2=594534&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Tue Nov 13 05:43:17 2007
@@ -22,10 +22,16 @@
import org.apache.axis2.clustering.ClusteringCommand;
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.MessageSender;
-import org.apache.catalina.tribes.*;
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.Member;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.io.ObjectOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
public class ChannelSender implements MessageSender {
private Log log = LogFactory.getLog(ChannelSender.class);
@@ -39,16 +45,16 @@
// Keep retrying, since at the point of trying to send the msg, a member may leave the group
// causing a view change. All nodes in a view should get the msg
- //TODO: Sometimes Tribes ncorrectly detects that a member has left a group
+ //TODO: Sometimes Tribes incorrectly detects that a member has left a group
while (true) {
if (channel.getMembers().length > 0) {
try {
long start = System.currentTimeMillis();
- channel.send(channel.getMembers(), msg, Channel.SEND_OPTIONS_USE_ACK);
+ channel.send(channel.getMembers(), toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK);
timeToSend = System.currentTimeMillis() - start;
log.debug("Sent " + msg + " to group");
break;
- } catch (ChannelException e) {
+ } catch (Exception e) {
String message = "Error sending command message : " + msg +
". Reason " + e.getMessage();
log.warn(message);
@@ -60,16 +66,25 @@
return timeToSend;
}
+ private ByteMessage toByteMessage(ClusteringCommand msg) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bos);
+ out.writeObject(msg);
+ out.flush();
+ out.close();
+ return new ByteMessage(bos.toByteArray());
+ }
+
public void sendToSelf(ClusteringCommand msg) throws ClusteringFault {
if (channel == null) {
return;
}
try {
channel.send(new Member[]{channel.getLocalMember(true)},
- msg,
+ toByteMessage(msg),
Channel.SEND_OPTIONS_USE_ACK);
log.debug("Sent " + msg + " to self");
- } catch (ChannelException e) {
+ } catch (Exception e) {
throw new ClusteringFault(e);
}
}
@@ -79,11 +94,11 @@
try {
if (member.isReady()) {
long start = System.currentTimeMillis();
- channel.send(new Member[]{member}, cmd, Channel.SEND_OPTIONS_USE_ACK);
+ channel.send(new Member[]{member}, toByteMessage(cmd), Channel.SEND_OPTIONS_USE_ACK);
timeToSend = System.currentTimeMillis() - start;
log.debug("Sent " + cmd + " to " + TribesUtil.getHost(member));
}
- } catch (ChannelException e) {
+ } catch (Exception e) {
String message = "Could not send message to " + TribesUtil.getHost(member) +
". Reason " + e.getMessage();
log.warn(message);
Added: webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/tribes/ObjectSerializationTest.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/tribes/ObjectSerializationTest.java?rev=594534&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/tribes/ObjectSerializationTest.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/tribes/ObjectSerializationTest.java Tue Nov 13 05:43:17 2007
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering.tribes;
+
+import junit.framework.TestCase;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.axis2.clustering.control.AckCommand;
+
+/**
+ * Checks that an AckCommand keeps its unique ID across a Java serialization round trip.
+ */
+public class ObjectSerializationTest extends TestCase {
+
+ public void testSerialization(){
+ AckCommand ackCommand = new AckCommand("uuid");
+ ackCommand.setMemberId("123456");
+
+ AckCommand ackCommand2 = (AckCommand) copy(ackCommand);
+ assertFalse(ackCommand.equals(ackCommand2));
+
+ assertEquals(ackCommand.getUniqueId(), ackCommand2.getUniqueId());
+ }
+
+ /**
+ * Returns a copy of the object, or null if the object cannot
+ * be serialized.
+ */
+ public Object copy(Object orig) {
+ Object obj = null;
+ try {
+ // Write the object out to a byte array
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bos);
+ out.writeObject(orig);
+ out.flush();
+ out.close();
+
+ // Make an input stream from the byte array and read
+ // a copy of the object back in.
+ ObjectInputStream in = new ObjectInputStream(
+ new ByteArrayInputStream(bos.toByteArray()));
+ obj = in.readObject();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ catch (ClassNotFoundException cnfe) {
+ cnfe.printStackTrace();
+ }
+ return obj;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org