You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2006/10/24 04:45:59 UTC
svn commit: r467206 [5/30] - in /tomcat: build/tc5.5.x/ connectors/trunk/
connectors/trunk/ajp/ajplib/test/ connectors/trunk/ajp/proxy/
connectors/trunk/jk/jkstatus/src/share/org/apache/jk/status/
connectors/trunk/jk/native/iis/ connectors/trunk/jk/nat...
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Mon Oct 23 19:45:46 2006
@@ -1,667 +1,667 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.group;
-
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-import org.apache.catalina.tribes.ByteMessage;
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelInterceptor;
-import org.apache.catalina.tribes.ChannelListener;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.ChannelReceiver;
-import org.apache.catalina.tribes.ChannelSender;
-import org.apache.catalina.tribes.ErrorHandler;
-import org.apache.catalina.tribes.ManagedChannel;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.MembershipService;
-import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.UniqueId;
-import org.apache.catalina.tribes.Heartbeat;
-import org.apache.catalina.tribes.io.BufferPool;
-import java.io.IOException;
-import org.apache.catalina.tribes.RemoteProcessException;
-import org.apache.catalina.tribes.util.Logs;
-import org.apache.catalina.tribes.util.Arrays;
-
-/**
- * The default implementation of a Channel.<br>
- * The GroupChannel manages the replication channel. It coordinates
- * message being sent and received with membership announcements.
- * The channel has an chain of interceptors that can modify the message or perform other logic.<br>
- * It manages a complete group, both membership and replication.
- * @author Filip Hanik
- * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
- */
-public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel {
- /**
- * Flag to determine if the channel manages its own heartbeat
- * If set to true, the channel will start a local thread for the heart beat.
- */
- protected boolean heartbeat = true;
- /**
- * If <code>heartbeat == true</code> then how often do we want this
- * heartbeat to run. default is one minute
- */
- protected long heartbeatSleeptime = 5*1000;//every 5 seconds
-
- /**
- * Internal heartbeat thread
- */
- protected HeartbeatThread hbthread = null;
-
- /**
- * The <code>ChannelCoordinator</code> coordinates the bottom layer components:<br>
- * - MembershipService<br>
- * - ChannelSender <br>
- * - ChannelReceiver<br>
- */
- protected ChannelCoordinator coordinator = new ChannelCoordinator();
-
- /**
- * The first interceptor in the inteceptor stack.
- * The interceptors are chained in a linked list, so we only need a reference to the
- * first one
- */
- protected ChannelInterceptor interceptors = null;
-
- /**
- * A list of membership listeners that subscribe to membership announcements
- */
- protected ArrayList membershipListeners = new ArrayList();
-
- /**
- * A list of channel listeners that subscribe to incoming messages
- */
- protected ArrayList channelListeners = new ArrayList();
-
- /**
- * If set to true, the GroupChannel will check to make sure that
- */
- protected boolean optionCheck = false;
-
- /**
- * Creates a GroupChannel. This constructor will also
- * add the first interceptor in the GroupChannel.<br>
- * The first interceptor is always the channel itself.
- */
- public GroupChannel() {
- addInterceptor(this);
- }
-
-
- /**
- * Adds an interceptor to the stack for message processing<br>
- * Interceptors are ordered in the way they are added.<br>
- * <code>channel.addInterceptor(A);</code><br>
- * <code>channel.addInterceptor(C);</code><br>
- * <code>channel.addInterceptor(B);</code><br>
- * Will result in a interceptor stack like this:<br>
- * <code>A -> C -> B</code><br>
- * The complete stack will look like this:<br>
- * <code>Channel -> A -> C -> B -> ChannelCoordinator</code><br>
- * @param interceptor ChannelInterceptorBase
- */
- public void addInterceptor(ChannelInterceptor interceptor) {
- if ( interceptors == null ) {
- interceptors = interceptor;
- interceptors.setNext(coordinator);
- interceptors.setPrevious(null);
- coordinator.setPrevious(interceptors);
- } else {
- ChannelInterceptor last = interceptors;
- while ( last.getNext() != coordinator ) {
- last = last.getNext();
- }
- last.setNext(interceptor);
- interceptor.setNext(coordinator);
- interceptor.setPrevious(last);
- coordinator.setPrevious(interceptor);
- }
- }
-
- /**
- * Sends a heartbeat through the interceptor stack.<br>
- * Invoke this method from the application on a periodic basis if
- * you have turned off internal heartbeats <code>channel.setHeartbeat(false)</code>
- */
- public void heartbeat() {
- super.heartbeat();
- Iterator i = membershipListeners.iterator();
- while ( i.hasNext() ) {
- Object o = i.next();
- if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
- }
- i = channelListeners.iterator();
- while ( i.hasNext() ) {
- Object o = i.next();
- if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
- }
-
- }
-
-
- /**
- * Send a message to the destinations specified
- * @param destination Member[] - destination.length > 1
- * @param msg Serializable - the message to send
- * @param options int - sender options, options can trigger guarantee levels and different interceptors to
- * react to the message see class documentation for the <code>Channel</code> object.<br>
- * @return UniqueId - the unique Id that was assigned to this message
- * @throws ChannelException - if an error occurs processing the message
- * @see org.apache.catalina.tribes.Channel
- */
- public UniqueId send(Member[] destination, Serializable msg, int options) throws ChannelException {
- return send(destination,msg,options,null);
- }
-
- /**
- *
- * @param destination Member[] - destination.length > 1
- * @param msg Serializable - the message to send
- * @param options int - sender options, options can trigger guarantee levels and different interceptors to
- * react to the message see class documentation for the <code>Channel</code> object.<br>
- * @param handler - callback object for error handling and completion notification, used when a message is
- * sent asynchronously using the <code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code> flag enabled.
- * @return UniqueId - the unique Id that was assigned to this message
- * @throws ChannelException - if an error occurs processing the message
- * @see org.apache.catalina.tribes.Channel
- */
- public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException {
- if ( msg == null ) throw new ChannelException("Cant send a NULL message");
- XByteBuffer buffer = null;
- try {
- if ( destination == null || destination.length == 0) throw new ChannelException("No destination given");
- ChannelData data = new ChannelData(true);//generates a unique Id
- data.setAddress(getLocalMember(false));
- data.setTimestamp(System.currentTimeMillis());
- byte[] b = null;
- if ( msg instanceof ByteMessage ){
- b = ((ByteMessage)msg).getMessage();
- options = options | SEND_OPTIONS_BYTE_MESSAGE;
- } else {
- b = XByteBuffer.serialize(msg);
- options = options & (~SEND_OPTIONS_BYTE_MESSAGE);
- }
- data.setOptions(options);
- //XByteBuffer buffer = new XByteBuffer(b.length+128,false);
- buffer = BufferPool.getBufferPool().getBuffer(b.length+128, false);
- buffer.append(b,0,b.length);
- data.setMessage(buffer);
- InterceptorPayload payload = null;
- if ( handler != null ) {
- payload = new InterceptorPayload();
- payload.setErrorHandler(handler);
- }
- getFirstInterceptor().sendMessage(destination, data, payload);
- if ( Logs.MESSAGES.isTraceEnabled() ) {
- Logs.MESSAGES.trace("GroupChannel - Sent msg:" + new UniqueId(data.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination));
- Logs.MESSAGES.trace("GroupChannel - Send Message:" + new UniqueId(data.getUniqueId()) + " is " +msg);
- }
-
- return new UniqueId(data.getUniqueId());
- }catch ( Exception x ) {
- if ( x instanceof ChannelException ) throw (ChannelException)x;
- throw new ChannelException(x);
- } finally {
- if ( buffer != null ) BufferPool.getBufferPool().returnBuffer(buffer);
- }
- }
-
-
- /**
- * Callback from the interceptor stack. <br>
- * When a message is received from a remote node, this method will be invoked by
- * the previous interceptor.<br>
- * This method can also be used to send a message to other components within the same application,
- * but its an extreme case, and you're probably better off doing that logic between the applications itself.
- * @param msg ChannelMessage
- */
- public void messageReceived(ChannelMessage msg) {
- if ( msg == null ) return;
- try {
- if ( Logs.MESSAGES.isTraceEnabled() ) {
- Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " from "+msg.getAddress().getName());
- }
-
- Serializable fwd = null;
- if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
- fwd = new ByteMessage(msg.getMessage().getBytes());
- } else {
- fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(),0,msg.getMessage().getLength());
- }
- if ( Logs.MESSAGES.isTraceEnabled() ) {
- Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);
- }
-
- //get the actual member with the correct alive time
- Member source = msg.getAddress();
- boolean rx = false;
- boolean delivered = false;
- for ( int i=0; i<channelListeners.size(); i++ ) {
- ChannelListener channelListener = (ChannelListener)channelListeners.get(i);
- if (channelListener != null && channelListener.accept(fwd, source)) {
- channelListener.messageReceived(fwd, source);
- delivered = true;
- //if the message was accepted by an RPC channel, that channel
- //is responsible for returning the reply, otherwise we send an absence reply
- if ( channelListener instanceof RpcChannel ) rx = true;
- }
- }//for
- if ((!rx) && (fwd instanceof RpcMessage)) {
- //if we have a message that requires a response,
- //but none was given, send back an immediate one
- sendNoRpcChannelReply((RpcMessage)fwd,source);
- }
- if ( Logs.MESSAGES.isTraceEnabled() ) {
- Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
- }
-
- } catch ( Exception x ) {
- if ( log.isDebugEnabled() ) log.error("Unable to process channel:IOException.",x);
- throw new RemoteProcessException("IOException:"+x.getMessage(),x);
- }
- }
-
- /**
- * Sends a <code>NoRpcChannelReply</code> message to a member<br>
- * This method gets invoked by the channel if a RPC message comes in
- * and no channel listener accepts the message. This avoids timeout
- * @param msg RpcMessage
- * @param destination Member - the destination for the reply
- */
- protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) {
- try {
- //avoid circular loop
- if ( msg instanceof RpcMessage.NoRpcChannelReply) return;
- RpcMessage.NoRpcChannelReply reply = new RpcMessage.NoRpcChannelReply(msg.rpcId,msg.uuid);
- send(new Member[]{destination},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS);
- } catch ( Exception x ) {
- log.error("Unable to find rpc channel, failed to send NoRpcChannelReply.",x);
- }
- }
-
- /**
- * memberAdded gets invoked by the interceptor below the channel
- * and the channel will broadcast it to the membership listeners
- * @param member Member - the new member
- */
- public void memberAdded(Member member) {
- //notify upwards
- for (int i=0; i<membershipListeners.size(); i++ ) {
- MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i);
- if (membershipListener != null) membershipListener.memberAdded(member);
- }
- }
-
- /**
- * memberDisappeared gets invoked by the interceptor below the channel
- * and the channel will broadcast it to the membership listeners
- * @param member Member - the member that left or crashed
- */
- public void memberDisappeared(Member member) {
- //notify upwards
- for (int i=0; i<membershipListeners.size(); i++ ) {
- MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i);
- if (membershipListener != null) membershipListener.memberDisappeared(member);
- }
- }
-
- /**
- * Sets up the default implementation interceptor stack
- * if no interceptors have been added
- * @throws ChannelException
- */
- protected synchronized void setupDefaultStack() throws ChannelException {
-
- if ( getFirstInterceptor() != null &&
- ((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) {
- ChannelInterceptor interceptor = null;
- Class clazz = null;
- try {
- clazz = Class.forName("org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor",
- true,GroupChannel.class.getClassLoader());
- clazz.newInstance();
- } catch ( Throwable x ) {
- clazz = MessageDispatchInterceptor.class;
- }//catch
- try {
- interceptor = (ChannelInterceptor) clazz.newInstance();
- } catch (Exception x) {
- throw new ChannelException("Unable to add MessageDispatchInterceptor to interceptor chain.",x);
- }
- this.addInterceptor(interceptor);
- }
- }
-
- /**
- * Validates the option flags that each interceptor is using and reports
- * an error if two interceptor share the same flag.
- * @throws ChannelException
- */
- protected void checkOptionFlags() throws ChannelException {
- StringBuffer conflicts = new StringBuffer();
- ChannelInterceptor first = interceptors;
- while ( first != null ) {
- int flag = first.getOptionFlag();
- if ( flag != 0 ) {
- ChannelInterceptor next = first.getNext();
- while ( next != null ) {
- int nflag = next.getOptionFlag();
- if (nflag!=0 && (((flag & nflag) == flag ) || ((flag & nflag) == nflag)) ) {
- conflicts.append("[");
- conflicts.append(first.getClass().getName());
- conflicts.append(":");
- conflicts.append(flag);
- conflicts.append(" == ");
- conflicts.append(next.getClass().getName());
- conflicts.append(":");
- conflicts.append(nflag);
- conflicts.append("] ");
- }//end if
- next = next.getNext();
- }//while
- }//end if
- first = first.getNext();
- }//while
- if ( conflicts.length() > 0 ) throw new ChannelException("Interceptor option flag conflict: "+conflicts.toString());
-
- }
-
- /**
- * Starts the channel
- * @param svc int - what service to start
- * @throws ChannelException
- * @see org.apache.catalina.tribes.Channel#start(int)
- */
- public synchronized void start(int svc) throws ChannelException {
- setupDefaultStack();
- if (optionCheck) checkOptionFlags();
- super.start(svc);
- if ( hbthread == null && heartbeat ) {
- hbthread = new HeartbeatThread(this,heartbeatSleeptime);
- hbthread.start();
- }
- }
-
- /**
- * Stops the channel
- * @param svc int
- * @throws ChannelException
- * @see org.apache.catalina.tribes.Channel#stop(int)
- */
- public synchronized void stop(int svc) throws ChannelException {
- if (hbthread != null) {
- hbthread.stopHeartbeat();
- hbthread = null;
- }
- super.stop(svc);
- }
-
- /**
- * Returns the first interceptor of the stack. Useful for traversal.
- * @return ChannelInterceptor
- */
- public ChannelInterceptor getFirstInterceptor() {
- if (interceptors != null) return interceptors;
- else return coordinator;
- }
-
- /**
- * Returns the channel receiver component
- * @return ChannelReceiver
- */
- public ChannelReceiver getChannelReceiver() {
- return coordinator.getClusterReceiver();
- }
-
- /**
- * Returns the channel sender component
- * @return ChannelSender
- */
- public ChannelSender getChannelSender() {
- return coordinator.getClusterSender();
- }
-
- /**
- * Returns the membership service component
- * @return MembershipService
- */
- public MembershipService getMembershipService() {
- return coordinator.getMembershipService();
- }
-
- /**
- * Sets the channel receiver component
- * @param clusterReceiver ChannelReceiver
- */
- public void setChannelReceiver(ChannelReceiver clusterReceiver) {
- coordinator.setClusterReceiver(clusterReceiver);
- }
-
- /**
- * Sets the channel sender component
- * @param clusterSender ChannelSender
- */
- public void setChannelSender(ChannelSender clusterSender) {
- coordinator.setClusterSender(clusterSender);
- }
-
- /**
- * Sets the membership component
- * @param membershipService MembershipService
- */
- public void setMembershipService(MembershipService membershipService) {
- coordinator.setMembershipService(membershipService);
- }
-
- /**
- * Adds a membership listener to the channel.<br>
- * Membership listeners are uniquely identified using the equals(Object) method
- * @param membershipListener MembershipListener
- */
- public void addMembershipListener(MembershipListener membershipListener) {
- if (!this.membershipListeners.contains(membershipListener) )
- this.membershipListeners.add(membershipListener);
- }
-
- /**
- * Removes a membership listener from the channel.<br>
- * Membership listeners are uniquely identified using the equals(Object) method
- * @param membershipListener MembershipListener
- */
-
- public void removeMembershipListener(MembershipListener membershipListener) {
- membershipListeners.remove(membershipListener);
- }
-
- /**
- * Adds a channel listener to the channel.<br>
- * Channel listeners are uniquely identified using the equals(Object) method
- * @param channelListener ChannelListener
- */
- public void addChannelListener(ChannelListener channelListener) {
- if (!this.channelListeners.contains(channelListener) ) {
- this.channelListeners.add(channelListener);
- } else {
- throw new IllegalArgumentException("Listener already exists:"+channelListener);
- }
- }
-
- /**
- *
- * Removes a channel listener from the channel.<br>
- * Channel listeners are uniquely identified using the equals(Object) method
- * @param channelListener ChannelListener
- */
- public void removeChannelListener(ChannelListener channelListener) {
- channelListeners.remove(channelListener);
- }
-
- /**
- * Returns an iterator of all the interceptors in this stack
- * @return Iterator
- */
- public Iterator getInterceptors() {
- return new InterceptorIterator(this.getNext(),this.coordinator);
- }
-
- /**
- * Enables/disables the option check<br>
- * Setting this to true, will make the GroupChannel perform a conflict check
- * on the interceptors. If two interceptors are using the same option flag
- * and throw an error upon start.
- * @param optionCheck boolean
- */
- public void setOptionCheck(boolean optionCheck) {
- this.optionCheck = optionCheck;
- }
-
- /**
- * Configure local heartbeat sleep time<br>
- * Only used when <code>getHeartbeat()==true</code>
- * @param heartbeatSleeptime long - time in milliseconds to sleep between heartbeats
- */
- public void setHeartbeatSleeptime(long heartbeatSleeptime) {
- this.heartbeatSleeptime = heartbeatSleeptime;
- }
-
- /**
- * Enables or disables local heartbeat.
- * if <code>setHeartbeat(true)</code> is invoked then the channel will start an internal
- * thread to invoke <code>Channel.heartbeat()</code> every <code>getHeartbeatSleeptime</code> milliseconds
- * @param heartbeat boolean
- */
- public void setHeartbeat(boolean heartbeat) {
- this.heartbeat = heartbeat;
- }
-
- /**
- * @see #setOptionCheck(boolean)
- * @return boolean
- */
- public boolean getOptionCheck() {
- return optionCheck;
- }
-
- /**
- * @see #setHeartbeat(boolean)
- * @return boolean
- */
- public boolean getHeartbeat() {
- return heartbeat;
- }
-
- /**
- * Returns the sleep time in milliseconds that the internal heartbeat will
- * sleep in between invokations of <code>Channel.heartbeat()</code>
- * @return long
- */
- public long getHeartbeatSleeptime() {
- return heartbeatSleeptime;
- }
-
- /**
- *
- * <p>Title: Interceptor Iterator</p>
- *
- * <p>Description: An iterator to loop through the interceptors in a channel</p>
- *
- * @version 1.0
- */
- public static class InterceptorIterator implements Iterator {
- private ChannelInterceptor end;
- private ChannelInterceptor start;
- public InterceptorIterator(ChannelInterceptor start, ChannelInterceptor end) {
- this.end = end;
- this.start = start;
- }
-
- public boolean hasNext() {
- return start!=null && start != end;
- }
-
- public Object next() {
- Object result = null;
- if ( hasNext() ) {
- result = start;
- start = start.getNext();
- }
- return result;
- }
-
- public void remove() {
- //empty operation
- }
- }
-
- /**
- *
- * <p>Title: Internal heartbeat thread</p>
- *
- * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class
- * is created</p>
- *
- * @version 1.0
- */
- public static class HeartbeatThread extends Thread {
- protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(HeartbeatThread.class);
- protected static int counter = 1;
- protected static synchronized int inc() {
- return counter++;
- }
-
- protected boolean doRun = true;
- protected GroupChannel channel;
- protected long sleepTime;
- public HeartbeatThread(GroupChannel channel, long sleepTime) {
- super();
- this.setPriority(MIN_PRIORITY);
- setName("GroupChannel-Heartbeat-"+inc());
- setDaemon(true);
- this.channel = channel;
- this.sleepTime = sleepTime;
- }
- public void stopHeartbeat() {
- doRun = false;
- interrupt();
- }
-
- public void run() {
- while (doRun) {
- try {
- Thread.sleep(sleepTime);
- channel.heartbeat();
- } catch ( InterruptedException x ) {
- interrupted();
- } catch ( Exception x ) {
- log.error("Unable to send heartbeat through Tribes interceptor stack. Will try to sleep again.",x);
- }//catch
- }//while
- }//run
- }//HeartbeatThread
-
-
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ChannelReceiver;
+import org.apache.catalina.tribes.ChannelSender;
+import org.apache.catalina.tribes.ErrorHandler;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.MembershipService;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.UniqueId;
+import org.apache.catalina.tribes.Heartbeat;
+import org.apache.catalina.tribes.io.BufferPool;
+import java.io.IOException;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.util.Logs;
+import org.apache.catalina.tribes.util.Arrays;
+
+/**
+ * The default implementation of a Channel.<br>
+ * The GroupChannel manages the replication channel. It coordinates
+ * message being sent and received with membership announcements.
+ * The channel has an chain of interceptors that can modify the message or perform other logic.<br>
+ * It manages a complete group, both membership and replication.
+ * @author Filip Hanik
+ * @version $Revision$, $Date$
+ */
+public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel {
+ /**
+ * Flag to determine if the channel manages its own heartbeat
+ * If set to true, the channel will start a local thread for the heart beat.
+ */
+ protected boolean heartbeat = true;
+ /**
+ * If <code>heartbeat == true</code> then how often do we want this
+ * heartbeat to run. default is one minute
+ */
+ protected long heartbeatSleeptime = 5*1000;//every 5 seconds
+
+ /**
+ * Internal heartbeat thread
+ */
+ protected HeartbeatThread hbthread = null;
+
+ /**
+ * The <code>ChannelCoordinator</code> coordinates the bottom layer components:<br>
+ * - MembershipService<br>
+ * - ChannelSender <br>
+ * - ChannelReceiver<br>
+ */
+ protected ChannelCoordinator coordinator = new ChannelCoordinator();
+
+ /**
+ * The first interceptor in the inteceptor stack.
+ * The interceptors are chained in a linked list, so we only need a reference to the
+ * first one
+ */
+ protected ChannelInterceptor interceptors = null;
+
+ /**
+ * A list of membership listeners that subscribe to membership announcements
+ */
+ protected ArrayList membershipListeners = new ArrayList();
+
+ /**
+ * A list of channel listeners that subscribe to incoming messages
+ */
+ protected ArrayList channelListeners = new ArrayList();
+
+ /**
+ * If set to true, the GroupChannel will check to make sure that
+ */
+ protected boolean optionCheck = false;
+
+ /**
+ * Creates a GroupChannel. This constructor will also
+ * add the first interceptor in the GroupChannel.<br>
+ * The first interceptor is always the channel itself.
+ */
+ public GroupChannel() {
+ addInterceptor(this);
+ }
+
+
+ /**
+ * Adds an interceptor to the stack for message processing<br>
+ * Interceptors are ordered in the way they are added.<br>
+ * <code>channel.addInterceptor(A);</code><br>
+ * <code>channel.addInterceptor(C);</code><br>
+ * <code>channel.addInterceptor(B);</code><br>
+ * Will result in a interceptor stack like this:<br>
+ * <code>A -> C -> B</code><br>
+ * The complete stack will look like this:<br>
+ * <code>Channel -> A -> C -> B -> ChannelCoordinator</code><br>
+ * @param interceptor ChannelInterceptorBase
+ */
+ public void addInterceptor(ChannelInterceptor interceptor) {
+ if ( interceptors == null ) {
+ interceptors = interceptor;
+ interceptors.setNext(coordinator);
+ interceptors.setPrevious(null);
+ coordinator.setPrevious(interceptors);
+ } else {
+ ChannelInterceptor last = interceptors;
+ while ( last.getNext() != coordinator ) {
+ last = last.getNext();
+ }
+ last.setNext(interceptor);
+ interceptor.setNext(coordinator);
+ interceptor.setPrevious(last);
+ coordinator.setPrevious(interceptor);
+ }
+ }
+
+ /**
+ * Sends a heartbeat through the interceptor stack.<br>
+ * Invoke this method from the application on a periodic basis if
+ * you have turned off internal heartbeats <code>channel.setHeartbeat(false)</code>
+ */
+ public void heartbeat() {
+ super.heartbeat();
+ Iterator i = membershipListeners.iterator();
+ while ( i.hasNext() ) {
+ Object o = i.next();
+ if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
+ }
+ i = channelListeners.iterator();
+ while ( i.hasNext() ) {
+ Object o = i.next();
+ if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
+ }
+
+ }
+
+
+ /**
+ * Send a message to the destinations specified
+ * @param destination Member[] - destination.length > 1
+ * @param msg Serializable - the message to send
+ * @param options int - sender options, options can trigger guarantee levels and different interceptors to
+ * react to the message see class documentation for the <code>Channel</code> object.<br>
+ * @return UniqueId - the unique Id that was assigned to this message
+ * @throws ChannelException - if an error occurs processing the message
+ * @see org.apache.catalina.tribes.Channel
+ */
+ public UniqueId send(Member[] destination, Serializable msg, int options) throws ChannelException {
+ return send(destination,msg,options,null);
+ }
+
+ /**
+ *
+ * @param destination Member[] - destination.length > 1
+ * @param msg Serializable - the message to send
+ * @param options int - sender options, options can trigger guarantee levels and different interceptors to
+ * react to the message see class documentation for the <code>Channel</code> object.<br>
+ * @param handler - callback object for error handling and completion notification, used when a message is
+ * sent asynchronously using the <code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code> flag enabled.
+ * @return UniqueId - the unique Id that was assigned to this message
+ * @throws ChannelException - if an error occurs processing the message
+ * @see org.apache.catalina.tribes.Channel
+ */
+ public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException {
+ if ( msg == null ) throw new ChannelException("Cant send a NULL message");
+ XByteBuffer buffer = null;
+ try {
+ if ( destination == null || destination.length == 0) throw new ChannelException("No destination given");
+ ChannelData data = new ChannelData(true);//generates a unique Id
+ data.setAddress(getLocalMember(false));
+ data.setTimestamp(System.currentTimeMillis());
+ byte[] b = null;
+ if ( msg instanceof ByteMessage ){
+ b = ((ByteMessage)msg).getMessage();
+ options = options | SEND_OPTIONS_BYTE_MESSAGE;
+ } else {
+ b = XByteBuffer.serialize(msg);
+ options = options & (~SEND_OPTIONS_BYTE_MESSAGE);
+ }
+ data.setOptions(options);
+ //XByteBuffer buffer = new XByteBuffer(b.length+128,false);
+ buffer = BufferPool.getBufferPool().getBuffer(b.length+128, false);
+ buffer.append(b,0,b.length);
+ data.setMessage(buffer);
+ InterceptorPayload payload = null;
+ if ( handler != null ) {
+ payload = new InterceptorPayload();
+ payload.setErrorHandler(handler);
+ }
+ getFirstInterceptor().sendMessage(destination, data, payload);
+ if ( Logs.MESSAGES.isTraceEnabled() ) {
+ Logs.MESSAGES.trace("GroupChannel - Sent msg:" + new UniqueId(data.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination));
+ Logs.MESSAGES.trace("GroupChannel - Send Message:" + new UniqueId(data.getUniqueId()) + " is " +msg);
+ }
+
+ return new UniqueId(data.getUniqueId());
+ }catch ( Exception x ) {
+ if ( x instanceof ChannelException ) throw (ChannelException)x;
+ throw new ChannelException(x);
+ } finally {
+ if ( buffer != null ) BufferPool.getBufferPool().returnBuffer(buffer);
+ }
+ }
+
+
+ /**
+ * Callback from the interceptor stack. <br>
+ * When a message is received from a remote node, this method will be invoked by
+ * the previous interceptor.<br>
+ * This method can also be used to send a message to other components within the same application,
+ * but its an extreme case, and you're probably better off doing that logic between the applications itself.
+ * @param msg ChannelMessage
+ */
+ public void messageReceived(ChannelMessage msg) {
+ if ( msg == null ) return;
+ try {
+ if ( Logs.MESSAGES.isTraceEnabled() ) {
+ Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " from "+msg.getAddress().getName());
+ }
+
+ Serializable fwd = null;
+ if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
+ fwd = new ByteMessage(msg.getMessage().getBytes());
+ } else {
+ fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(),0,msg.getMessage().getLength());
+ }
+ if ( Logs.MESSAGES.isTraceEnabled() ) {
+ Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);
+ }
+
+ //get the actual member with the correct alive time
+ Member source = msg.getAddress();
+ boolean rx = false;
+ boolean delivered = false;
+ for ( int i=0; i<channelListeners.size(); i++ ) {
+ ChannelListener channelListener = (ChannelListener)channelListeners.get(i);
+ if (channelListener != null && channelListener.accept(fwd, source)) {
+ channelListener.messageReceived(fwd, source);
+ delivered = true;
+ //if the message was accepted by an RPC channel, that channel
+ //is responsible for returning the reply, otherwise we send an absence reply
+ if ( channelListener instanceof RpcChannel ) rx = true;
+ }
+ }//for
+ if ((!rx) && (fwd instanceof RpcMessage)) {
+ //if we have a message that requires a response,
+ //but none was given, send back an immediate one
+ sendNoRpcChannelReply((RpcMessage)fwd,source);
+ }
+ if ( Logs.MESSAGES.isTraceEnabled() ) {
+ Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
+ }
+
+ } catch ( Exception x ) {
+ if ( log.isDebugEnabled() ) log.error("Unable to process channel:IOException.",x);
+ throw new RemoteProcessException("IOException:"+x.getMessage(),x);
+ }
+ }
+
+ /**
+ * Sends a <code>NoRpcChannelReply</code> message to a member<br>
+ * This method gets invoked by the channel if a RPC message comes in
+ * and no channel listener accepts the message. This avoids timeout
+ * @param msg RpcMessage
+ * @param destination Member - the destination for the reply
+ */
+ protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) {
+ try {
+ //avoid circular loop
+ if ( msg instanceof RpcMessage.NoRpcChannelReply) return;
+ RpcMessage.NoRpcChannelReply reply = new RpcMessage.NoRpcChannelReply(msg.rpcId,msg.uuid);
+ send(new Member[]{destination},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS);
+ } catch ( Exception x ) {
+ log.error("Unable to find rpc channel, failed to send NoRpcChannelReply.",x);
+ }
+ }
+
+ /**
+ * memberAdded gets invoked by the interceptor below the channel
+ * and the channel will broadcast it to the membership listeners
+ * @param member Member - the new member
+ */
+ public void memberAdded(Member member) {
+ //notify upwards
+ for (int i=0; i<membershipListeners.size(); i++ ) {
+ MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i);
+ if (membershipListener != null) membershipListener.memberAdded(member);
+ }
+ }
+
+ /**
+ * memberDisappeared gets invoked by the interceptor below the channel
+ * and the channel will broadcast it to the membership listeners
+ * @param member Member - the member that left or crashed
+ */
+ public void memberDisappeared(Member member) {
+ //notify upwards
+ for (int i=0; i<membershipListeners.size(); i++ ) {
+ MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i);
+ if (membershipListener != null) membershipListener.memberDisappeared(member);
+ }
+ }
+
+ /**
+ * Sets up the default implementation interceptor stack
+ * if no interceptors have been added
+ * @throws ChannelException
+ */
+ protected synchronized void setupDefaultStack() throws ChannelException {
+
+ if ( getFirstInterceptor() != null &&
+ ((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) {
+ ChannelInterceptor interceptor = null;
+ Class clazz = null;
+ try {
+ clazz = Class.forName("org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor",
+ true,GroupChannel.class.getClassLoader());
+ clazz.newInstance();
+ } catch ( Throwable x ) {
+ clazz = MessageDispatchInterceptor.class;
+ }//catch
+ try {
+ interceptor = (ChannelInterceptor) clazz.newInstance();
+ } catch (Exception x) {
+ throw new ChannelException("Unable to add MessageDispatchInterceptor to interceptor chain.",x);
+ }
+ this.addInterceptor(interceptor);
+ }
+ }
+
+ /**
+ * Validates the option flags that each interceptor is using and reports
+ * an error if two interceptor share the same flag.
+ * @throws ChannelException
+ */
+ protected void checkOptionFlags() throws ChannelException {
+ StringBuffer conflicts = new StringBuffer();
+ ChannelInterceptor first = interceptors;
+ while ( first != null ) {
+ int flag = first.getOptionFlag();
+ if ( flag != 0 ) {
+ ChannelInterceptor next = first.getNext();
+ while ( next != null ) {
+ int nflag = next.getOptionFlag();
+ if (nflag!=0 && (((flag & nflag) == flag ) || ((flag & nflag) == nflag)) ) {
+ conflicts.append("[");
+ conflicts.append(first.getClass().getName());
+ conflicts.append(":");
+ conflicts.append(flag);
+ conflicts.append(" == ");
+ conflicts.append(next.getClass().getName());
+ conflicts.append(":");
+ conflicts.append(nflag);
+ conflicts.append("] ");
+ }//end if
+ next = next.getNext();
+ }//while
+ }//end if
+ first = first.getNext();
+ }//while
+ if ( conflicts.length() > 0 ) throw new ChannelException("Interceptor option flag conflict: "+conflicts.toString());
+
+ }
+
+ /**
+ * Starts the channel
+ * @param svc int - what service to start
+ * @throws ChannelException
+ * @see org.apache.catalina.tribes.Channel#start(int)
+ */
+ public synchronized void start(int svc) throws ChannelException {
+ setupDefaultStack();
+ if (optionCheck) checkOptionFlags();
+ super.start(svc);
+ if ( hbthread == null && heartbeat ) {
+ hbthread = new HeartbeatThread(this,heartbeatSleeptime);
+ hbthread.start();
+ }
+ }
+
+ /**
+ * Stops the channel
+ * @param svc int
+ * @throws ChannelException
+ * @see org.apache.catalina.tribes.Channel#stop(int)
+ */
+ public synchronized void stop(int svc) throws ChannelException {
+ if (hbthread != null) {
+ hbthread.stopHeartbeat();
+ hbthread = null;
+ }
+ super.stop(svc);
+ }
+
+ /**
+ * Returns the first interceptor of the stack. Useful for traversal.
+ * @return ChannelInterceptor
+ */
+ public ChannelInterceptor getFirstInterceptor() {
+ if (interceptors != null) return interceptors;
+ else return coordinator;
+ }
+
+ /**
+ * Returns the channel receiver component
+ * @return ChannelReceiver
+ */
+ public ChannelReceiver getChannelReceiver() {
+ return coordinator.getClusterReceiver();
+ }
+
+ /**
+ * Returns the channel sender component
+ * @return ChannelSender
+ */
+ public ChannelSender getChannelSender() {
+ return coordinator.getClusterSender();
+ }
+
+ /**
+ * Returns the membership service component
+ * @return MembershipService
+ */
+ public MembershipService getMembershipService() {
+ return coordinator.getMembershipService();
+ }
+
+ /**
+ * Sets the channel receiver component
+ * @param clusterReceiver ChannelReceiver
+ */
+ public void setChannelReceiver(ChannelReceiver clusterReceiver) {
+ coordinator.setClusterReceiver(clusterReceiver);
+ }
+
+ /**
+ * Sets the channel sender component
+ * @param clusterSender ChannelSender
+ */
+ public void setChannelSender(ChannelSender clusterSender) {
+ coordinator.setClusterSender(clusterSender);
+ }
+
+ /**
+ * Sets the membership component
+ * @param membershipService MembershipService
+ */
+ public void setMembershipService(MembershipService membershipService) {
+ coordinator.setMembershipService(membershipService);
+ }
+
+ /**
+ * Adds a membership listener to the channel.<br>
+ * Membership listeners are uniquely identified using the equals(Object) method
+ * @param membershipListener MembershipListener
+ */
+ public void addMembershipListener(MembershipListener membershipListener) {
+ if (!this.membershipListeners.contains(membershipListener) )
+ this.membershipListeners.add(membershipListener);
+ }
+
+ /**
+ * Removes a membership listener from the channel.<br>
+ * Membership listeners are uniquely identified using the equals(Object) method
+ * @param membershipListener MembershipListener
+ */
+
+ public void removeMembershipListener(MembershipListener membershipListener) {
+ membershipListeners.remove(membershipListener);
+ }
+
+ /**
+ * Adds a channel listener to the channel.<br>
+ * Channel listeners are uniquely identified using the equals(Object) method
+ * @param channelListener ChannelListener
+ */
+ public void addChannelListener(ChannelListener channelListener) {
+ if (!this.channelListeners.contains(channelListener) ) {
+ this.channelListeners.add(channelListener);
+ } else {
+ throw new IllegalArgumentException("Listener already exists:"+channelListener);
+ }
+ }
+
+ /**
+ *
+ * Removes a channel listener from the channel.<br>
+ * Channel listeners are uniquely identified using the equals(Object) method
+ * @param channelListener ChannelListener
+ */
+ public void removeChannelListener(ChannelListener channelListener) {
+ channelListeners.remove(channelListener);
+ }
+
+ /**
+ * Returns an iterator of all the interceptors in this stack
+ * @return Iterator
+ */
+ public Iterator getInterceptors() {
+ return new InterceptorIterator(this.getNext(),this.coordinator);
+ }
+
+ /**
+ * Enables/disables the option check<br>
+ * Setting this to true, will make the GroupChannel perform a conflict check
+ * on the interceptors. If two interceptors are using the same option flag
+ * and throw an error upon start.
+ * @param optionCheck boolean
+ */
+ public void setOptionCheck(boolean optionCheck) {
+ this.optionCheck = optionCheck;
+ }
+
+ /**
+ * Configure local heartbeat sleep time<br>
+ * Only used when <code>getHeartbeat()==true</code>
+ * @param heartbeatSleeptime long - time in milliseconds to sleep between heartbeats
+ */
+ public void setHeartbeatSleeptime(long heartbeatSleeptime) {
+ this.heartbeatSleeptime = heartbeatSleeptime;
+ }
+
+ /**
+ * Enables or disables local heartbeat.
+ * if <code>setHeartbeat(true)</code> is invoked then the channel will start an internal
+ * thread to invoke <code>Channel.heartbeat()</code> every <code>getHeartbeatSleeptime</code> milliseconds
+ * @param heartbeat boolean
+ */
+ public void setHeartbeat(boolean heartbeat) {
+ this.heartbeat = heartbeat;
+ }
+
+ /**
+ * @see #setOptionCheck(boolean)
+ * @return boolean
+ */
+ public boolean getOptionCheck() {
+ return optionCheck;
+ }
+
+ /**
+ * @see #setHeartbeat(boolean)
+ * @return boolean
+ */
+ public boolean getHeartbeat() {
+ return heartbeat;
+ }
+
+ /**
+ * Returns the sleep time in milliseconds that the internal heartbeat will
+ * sleep in between invokations of <code>Channel.heartbeat()</code>
+ * @return long
+ */
+ public long getHeartbeatSleeptime() {
+ return heartbeatSleeptime;
+ }
+
+ /**
+ *
+ * <p>Title: Interceptor Iterator</p>
+ *
+ * <p>Description: An iterator to loop through the interceptors in a channel</p>
+ *
+ * @version 1.0
+ */
+ public static class InterceptorIterator implements Iterator {
+ private ChannelInterceptor end;
+ private ChannelInterceptor start;
+ public InterceptorIterator(ChannelInterceptor start, ChannelInterceptor end) {
+ this.end = end;
+ this.start = start;
+ }
+
+ public boolean hasNext() {
+ return start!=null && start != end;
+ }
+
+ public Object next() {
+ Object result = null;
+ if ( hasNext() ) {
+ result = start;
+ start = start.getNext();
+ }
+ return result;
+ }
+
+ public void remove() {
+ //empty operation
+ }
+ }
+
+ /**
+ *
+ * <p>Title: Internal heartbeat thread</p>
+ *
+ * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class
+ * is created</p>
+ *
+ * @version 1.0
+ */
+ public static class HeartbeatThread extends Thread {
+ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(HeartbeatThread.class);
+ protected static int counter = 1;
+ protected static synchronized int inc() {
+ return counter++;
+ }
+
+ protected boolean doRun = true;
+ protected GroupChannel channel;
+ protected long sleepTime;
+ public HeartbeatThread(GroupChannel channel, long sleepTime) {
+ super();
+ this.setPriority(MIN_PRIORITY);
+ setName("GroupChannel-Heartbeat-"+inc());
+ setDaemon(true);
+ this.channel = channel;
+ this.sleepTime = sleepTime;
+ }
+ public void stopHeartbeat() {
+ doRun = false;
+ interrupt();
+ }
+
+ public void run() {
+ while (doRun) {
+ try {
+ Thread.sleep(sleepTime);
+ channel.heartbeat();
+ } catch ( InterruptedException x ) {
+ interrupted();
+ } catch ( Exception x ) {
+ log.error("Unable to send heartbeat through Tribes interceptor stack. Will try to sleep again.",x);
+ }//catch
+ }//while
+ }//run
+ }//HeartbeatThread
+
+
+
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/InterceptorPayload.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/InterceptorPayload.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/InterceptorPayload.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/InterceptorPayload.java Mon Oct 23 19:45:46 2006
@@ -1,35 +1,35 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.group;
-
-import org.apache.catalina.tribes.ErrorHandler;
-
-/**
- * @author Filip Hanik
- * @version 1.0
- */
-public class InterceptorPayload {
- private ErrorHandler errorHandler;
-
- public ErrorHandler getErrorHandler() {
- return errorHandler;
- }
-
- public void setErrorHandler(ErrorHandler errorHandler) {
- this.errorHandler = errorHandler;
- }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+import org.apache.catalina.tribes.ErrorHandler;
+
+/**
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class InterceptorPayload {
+ private ErrorHandler errorHandler;
+
+ public ErrorHandler getErrorHandler() {
+ return errorHandler;
+ }
+
+ public void setErrorHandler(ErrorHandler errorHandler) {
+ this.errorHandler = errorHandler;
+ }
}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/InterceptorPayload.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/InterceptorPayload.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/Response.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/Response.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/Response.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/Response.java Mon Oct 23 19:45:46 2006
@@ -1,54 +1,54 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.group;
-
-import java.io.Serializable;
-
-import org.apache.catalina.tribes.Member;
-
-/**
- * A response object holds a message from a responding partner.
- * @author Filip Hanik
- * @version 1.0
- */
-public class Response {
- private Member source;
- private Serializable message;
- public Response() {
- }
-
- public Response(Member source, Serializable message) {
- this.source = source;
- this.message = message;
- }
-
- public void setSource(Member source) {
- this.source = source;
- }
-
- public void setMessage(Serializable message) {
- this.message = message;
- }
-
- public Member getSource() {
- return source;
- }
-
- public Serializable getMessage() {
- return message;
- }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+import java.io.Serializable;
+
+import org.apache.catalina.tribes.Member;
+
+/**
+ * A response object holds a message from a responding partner.
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class Response {
+ private Member source;
+ private Serializable message;
+ public Response() {
+ }
+
+ public Response(Member source, Serializable message) {
+ this.source = source;
+ this.message = message;
+ }
+
+ public void setSource(Member source) {
+ this.source = source;
+ }
+
+ public void setMessage(Serializable message) {
+ this.message = message;
+ }
+
+ public Member getSource() {
+ return source;
+ }
+
+ public Serializable getMessage() {
+ return message;
+ }
}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/Response.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/Response.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcCallback.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcCallback.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcCallback.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcCallback.java Mon Oct 23 19:45:46 2006
@@ -1,46 +1,46 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.group;
-
-import java.io.Serializable;
-
-import org.apache.catalina.tribes.Member;
-
-/**
- * The RpcCallback interface is an interface for the Tribes channel to request a
- * response object to a request that came in.
- * @author not attributable
- * @version 1.0
- */
-public interface RpcCallback {
-
- /**
- *
- * @param msg Serializable
- * @return Serializable - null if no reply should be sent
- */
- public Serializable replyRequest(Serializable msg, Member sender);
-
- /**
- * If the reply has already been sent to the requesting thread,
- * the rpc callback can handle any data that comes in after the fact.
- * @param msg Serializable
- * @param sender Member
- */
- public void leftOver(Serializable msg, Member sender);
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+import java.io.Serializable;
+
+import org.apache.catalina.tribes.Member;
+
+/**
+ * The RpcCallback interface is an interface for the Tribes channel to request a
+ * response object to a request that came in.
+ * @author not attributable
+ * @version 1.0
+ */
+public interface RpcCallback {
+
+ /**
+ *
+ * @param msg Serializable
+ * @return Serializable - null if no reply should be sent
+ */
+ public Serializable replyRequest(Serializable msg, Member sender);
+
+ /**
+ * If the reply has already been sent to the requesting thread,
+ * the rpc callback can handle any data that comes in after the fact.
+ * @param msg Serializable
+ * @param sender Member
+ */
+ public void leftOver(Serializable msg, Member sender);
+
}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcCallback.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcCallback.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java Mon Oct 23 19:45:46 2006
@@ -1,262 +1,262 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.group;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelListener;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.util.UUIDGenerator;
-
-/**
- * A channel to handle RPC messaging
- * @author Filip Hanik
- */
-public class RpcChannel implements ChannelListener{
- protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(RpcChannel.class);
-
- public static final int FIRST_REPLY = 1;
- public static final int MAJORITY_REPLY = 2;
- public static final int ALL_REPLY = 3;
- public static final int NO_REPLY = 4;
-
- private Channel channel;
- private RpcCallback callback;
- private byte[] rpcId;
-
- private HashMap responseMap = new HashMap();
-
- /**
- * Create an RPC channel. You can have several RPC channels attached to a group
- * all separated out by the uniqueness
- * @param rpcId - the unique Id for this RPC group
- * @param channel Channel
- * @param callback RpcCallback
- */
- public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {
- this.channel = channel;
- this.callback = callback;
- this.rpcId = rpcId;
- channel.addChannelListener(this);
- }
-
-
- /**
- * Send a message and wait for the response.
- * @param destination Member[] - the destination for the message, and the members you request a reply from
- * @param message Serializable - the message you are sending out
- * @param options int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY
- * @param timeout long - timeout in milliseconds, if no reply is received within this time null is returned
- * @return Response[] - an array of response objects.
- * @throws ChannelException
- */
- public Response[] send(Member[] destination,
- Serializable message,
- int rpcOptions,
- int channelOptions,
- long timeout) throws ChannelException {
-
- if ( destination==null || destination.length == 0 ) return new Response[0];
-
- //avoid dead lock
- channelOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
-
- RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false));
- RpcCollector collector = new RpcCollector(key,rpcOptions,destination.length,timeout);
- try {
- synchronized (collector) {
- if ( rpcOptions != NO_REPLY ) responseMap.put(key, collector);
- RpcMessage rmsg = new RpcMessage(rpcId, key.id, message);
- channel.send(destination, rmsg, channelOptions);
- if ( rpcOptions != NO_REPLY ) collector.wait(timeout);
- }
- } catch ( InterruptedException ix ) {
- Thread.currentThread().interrupted();
- //throw new ChannelException(ix);
- }finally {
- responseMap.remove(key);
- }
- return collector.getResponses();
- }
-
- public void messageReceived(Serializable msg, Member sender) {
- RpcMessage rmsg = (RpcMessage)msg;
- RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
- if ( rmsg.reply ) {
- RpcCollector collector = (RpcCollector)responseMap.get(key);
- if (collector == null) {
- callback.leftOver(rmsg.message, sender);
- } else {
- synchronized (collector) {
- //make sure it hasn't been removed
- if ( responseMap.containsKey(key) ) {
- if ( (rmsg instanceof RpcMessage.NoRpcChannelReply) )
- collector.destcnt--;
- else
- collector.addResponse(rmsg.message, sender);
- if (collector.isComplete()) collector.notifyAll();
- } else {
- if (! (rmsg instanceof RpcMessage.NoRpcChannelReply) )
- callback.leftOver(rmsg.message, sender);
- }
- }//synchronized
- }//end if
- } else{
- Serializable reply = callback.replyRequest(rmsg.message,sender);
- rmsg.reply = true;
- rmsg.message = reply;
- try {
- channel.send(new Member[] {sender}, rmsg,0);
- }catch ( Exception x ) {
- log.error("Unable to send back reply in RpcChannel.",x);
- }
- }//end if
- }
-
- public void breakdown() {
- channel.removeChannelListener(this);
- }
-
- public void finalize() {
- breakdown();
- }
-
- public boolean accept(Serializable msg, Member sender) {
- if ( msg instanceof RpcMessage ) {
- RpcMessage rmsg = (RpcMessage)msg;
- return Arrays.equals(rmsg.rpcId,rpcId);
- }else return false;
- }
-
- public Channel getChannel() {
- return channel;
- }
-
- public RpcCallback getCallback() {
- return callback;
- }
-
- public byte[] getRpcId() {
- return rpcId;
- }
-
- public void setChannel(Channel channel) {
- this.channel = channel;
- }
-
- public void setCallback(RpcCallback callback) {
- this.callback = callback;
- }
-
- public void setRpcId(byte[] rpcId) {
- this.rpcId = rpcId;
- }
-
-
-
- /**
- *
- * Class that holds all response.
- * @author not attributable
- * @version 1.0
- */
- public static class RpcCollector {
- public ArrayList responses = new ArrayList();
- public RpcCollectorKey key;
- public int options;
- public int destcnt;
- public long timeout;
-
- public RpcCollector(RpcCollectorKey key, int options, int destcnt, long timeout) {
- this.key = key;
- this.options = options;
- this.destcnt = destcnt;
- this.timeout = timeout;
- }
-
- public void addResponse(Serializable message, Member sender){
- Response resp = new Response(sender,message);
- responses.add(resp);
- }
-
- public boolean isComplete() {
- if ( destcnt <= 0 ) return true;
- switch (options) {
- case ALL_REPLY:
- return destcnt == responses.size();
- case MAJORITY_REPLY:
- {
- float perc = ((float)responses.size()) / ((float)destcnt);
- return perc >= 0.50f;
- }
- case FIRST_REPLY:
- return responses.size()>0;
- default:
- return false;
- }
- }
-
- public int hashCode() {
- return key.hashCode();
- }
-
- public boolean equals(Object o) {
- if ( o instanceof RpcCollector ) {
- RpcCollector r = (RpcCollector)o;
- return r.key.equals(this.key);
- } else return false;
- }
-
- public Response[] getResponses() {
- return (Response[])responses.toArray(new Response[responses.size()]);
- }
- }
-
- public static class RpcCollectorKey {
- byte[] id;
- public RpcCollectorKey(byte[] id) {
- this.id = id;
- }
-
- public int hashCode() {
- return id[0]+id[1]+id[2]+id[3];
- }
-
- public boolean equals(Object o) {
- if ( o instanceof RpcCollectorKey ) {
- RpcCollectorKey r = (RpcCollectorKey)o;
- return Arrays.equals(id,r.id);
- } else return false;
- }
-
- }
-
- protected static String bToS(byte[] data) {
- StringBuffer buf = new StringBuffer(4*16);
- buf.append("{");
- for (int i=0; data!=null && i<data.length; i++ ) buf.append(String.valueOf(data[i])).append(" ");
- buf.append("}");
- return buf.toString();
- }
-
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+
+/**
+ * A channel to handle RPC messaging
+ * @author Filip Hanik
+ */
+public class RpcChannel implements ChannelListener{
+ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(RpcChannel.class);
+
+ public static final int FIRST_REPLY = 1;
+ public static final int MAJORITY_REPLY = 2;
+ public static final int ALL_REPLY = 3;
+ public static final int NO_REPLY = 4;
+
+ private Channel channel;
+ private RpcCallback callback;
+ private byte[] rpcId;
+
+ private HashMap responseMap = new HashMap();
+
+ /**
+ * Create an RPC channel. You can have several RPC channels attached to a group
+ * all separated out by the uniqueness
+ * @param rpcId - the unique Id for this RPC group
+ * @param channel Channel
+ * @param callback RpcCallback
+ */
+ public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {
+ this.channel = channel;
+ this.callback = callback;
+ this.rpcId = rpcId;
+ channel.addChannelListener(this);
+ }
+
+
+ /**
+ * Send a message and wait for the response.
+ * @param destination Member[] - the destination for the message, and the members you request a reply from
+ * @param message Serializable - the message you are sending out
+ * @param options int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY
+ * @param timeout long - timeout in milliseconds, if no reply is received within this time null is returned
+ * @return Response[] - an array of response objects.
+ * @throws ChannelException
+ */
+ public Response[] send(Member[] destination,
+ Serializable message,
+ int rpcOptions,
+ int channelOptions,
+ long timeout) throws ChannelException {
+
+ if ( destination==null || destination.length == 0 ) return new Response[0];
+
+ //avoid dead lock
+ channelOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
+
+ RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false));
+ RpcCollector collector = new RpcCollector(key,rpcOptions,destination.length,timeout);
+ try {
+ synchronized (collector) {
+ if ( rpcOptions != NO_REPLY ) responseMap.put(key, collector);
+ RpcMessage rmsg = new RpcMessage(rpcId, key.id, message);
+ channel.send(destination, rmsg, channelOptions);
+ if ( rpcOptions != NO_REPLY ) collector.wait(timeout);
+ }
+ } catch ( InterruptedException ix ) {
+ Thread.currentThread().interrupted();
+ //throw new ChannelException(ix);
+ }finally {
+ responseMap.remove(key);
+ }
+ return collector.getResponses();
+ }
+
+ public void messageReceived(Serializable msg, Member sender) {
+ RpcMessage rmsg = (RpcMessage)msg;
+ RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
+ if ( rmsg.reply ) {
+ RpcCollector collector = (RpcCollector)responseMap.get(key);
+ if (collector == null) {
+ callback.leftOver(rmsg.message, sender);
+ } else {
+ synchronized (collector) {
+ //make sure it hasn't been removed
+ if ( responseMap.containsKey(key) ) {
+ if ( (rmsg instanceof RpcMessage.NoRpcChannelReply) )
+ collector.destcnt--;
+ else
+ collector.addResponse(rmsg.message, sender);
+ if (collector.isComplete()) collector.notifyAll();
+ } else {
+ if (! (rmsg instanceof RpcMessage.NoRpcChannelReply) )
+ callback.leftOver(rmsg.message, sender);
+ }
+ }//synchronized
+ }//end if
+ } else{
+ Serializable reply = callback.replyRequest(rmsg.message,sender);
+ rmsg.reply = true;
+ rmsg.message = reply;
+ try {
+ channel.send(new Member[] {sender}, rmsg,0);
+ }catch ( Exception x ) {
+ log.error("Unable to send back reply in RpcChannel.",x);
+ }
+ }//end if
+ }
+
+ public void breakdown() {
+ channel.removeChannelListener(this);
+ }
+
+ public void finalize() {
+ breakdown();
+ }
+
+ public boolean accept(Serializable msg, Member sender) {
+ if ( msg instanceof RpcMessage ) {
+ RpcMessage rmsg = (RpcMessage)msg;
+ return Arrays.equals(rmsg.rpcId,rpcId);
+ }else return false;
+ }
+
+ public Channel getChannel() {
+ return channel;
+ }
+
+ public RpcCallback getCallback() {
+ return callback;
+ }
+
+ public byte[] getRpcId() {
+ return rpcId;
+ }
+
+ public void setChannel(Channel channel) {
+ this.channel = channel;
+ }
+
+ public void setCallback(RpcCallback callback) {
+ this.callback = callback;
+ }
+
+ public void setRpcId(byte[] rpcId) {
+ this.rpcId = rpcId;
+ }
+
+
+
+ /**
+ *
+ * Class that holds all response.
+ * @author not attributable
+ * @version 1.0
+ */
+ public static class RpcCollector {
+ public ArrayList responses = new ArrayList();
+ public RpcCollectorKey key;
+ public int options;
+ public int destcnt;
+ public long timeout;
+
+ public RpcCollector(RpcCollectorKey key, int options, int destcnt, long timeout) {
+ this.key = key;
+ this.options = options;
+ this.destcnt = destcnt;
+ this.timeout = timeout;
+ }
+
+ public void addResponse(Serializable message, Member sender){
+ Response resp = new Response(sender,message);
+ responses.add(resp);
+ }
+
+ public boolean isComplete() {
+ if ( destcnt <= 0 ) return true;
+ switch (options) {
+ case ALL_REPLY:
+ return destcnt == responses.size();
+ case MAJORITY_REPLY:
+ {
+ float perc = ((float)responses.size()) / ((float)destcnt);
+ return perc >= 0.50f;
+ }
+ case FIRST_REPLY:
+ return responses.size()>0;
+ default:
+ return false;
+ }
+ }
+
+ public int hashCode() {
+ return key.hashCode();
+ }
+
+ public boolean equals(Object o) {
+ if ( o instanceof RpcCollector ) {
+ RpcCollector r = (RpcCollector)o;
+ return r.key.equals(this.key);
+ } else return false;
+ }
+
+ public Response[] getResponses() {
+ return (Response[])responses.toArray(new Response[responses.size()]);
+ }
+ }
+
+ public static class RpcCollectorKey {
+ byte[] id;
+ public RpcCollectorKey(byte[] id) {
+ this.id = id;
+ }
+
+ public int hashCode() {
+ return id[0]+id[1]+id[2]+id[3];
+ }
+
+ public boolean equals(Object o) {
+ if ( o instanceof RpcCollectorKey ) {
+ RpcCollectorKey r = (RpcCollectorKey)o;
+ return Arrays.equals(id,r.id);
+ } else return false;
+ }
+
+ }
+
+ protected static String bToS(byte[] data) {
+ StringBuffer buf = new StringBuffer(4*16);
+ buf.append("{");
+ for (int i=0; data!=null && i<data.length; i++ ) buf.append(String.valueOf(data[i])).append(" ");
+ buf.append("}");
+ return buf.toString();
+ }
+
+
}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org