You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2006/10/24 04:45:59 UTC
svn commit: r467206 [8/30] - in /tomcat: build/tc5.5.x/ connectors/trunk/
connectors/trunk/ajp/ajplib/test/ connectors/trunk/ajp/proxy/
connectors/trunk/jk/jkstatus/src/share/org/apache/jk/status/
connectors/trunk/jk/native/iis/ connectors/trunk/jk/nat...
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Mon Oct 23 19:45:46 2006
@@ -1,299 +1,299 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- */
-
-package org.apache.catalina.tribes.group.interceptors;
-
-import java.util.HashMap;
-
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-import org.apache.catalina.tribes.io.XByteBuffer;
-
-
-
-/**
- *
- * The order interceptor guarantees that messages are received in the same order they were
- * sent.
- * This interceptor works best with the ack=true setting. <br>
- * There is no point in
- * using this with the replicationMode="fastasynchqueue" as this mode guarantees ordering.<BR>
- * If you are using the mode ack=false replicationMode=pooled, and have a lot of concurrent threads,
- * this interceptor can really slow you down, as many messages will be completely out of order
- * and the queue might become rather large. If this is the case, then you might want to set
- * the value OrderInterceptor.maxQueue = 25 (meaning that we will never keep more than 25 messages in our queue)
- * <br><b>Configuration Options</b><br>
- * OrderInteceptor.expire=<milliseconds> - if a message arrives out of order, how long before we act on it <b>default=3000ms</b><br>
- * OrderInteceptor.maxQueue=<max queue size> - how much can the queue grow to ensure ordering.
- * This setting is useful to avoid OutOfMemoryErrors<b>default=Integer.MAX_VALUE</b><br>
- * OrderInterceptor.forwardExpired=<boolean> - this flag tells the interceptor what to
- * do when a message has expired or the queue has grown larger than the maxQueue value.
- * true means that the message is sent up the stack to the receiver that will receive and out of order message
- * false means, forget the message and reset the message counter. <b>default=true</b>
- *
- *
- * @author Filip Hanik
- * @version 1.0
- */
-public class OrderInterceptor extends ChannelInterceptorBase {
- private HashMap outcounter = new HashMap();
- private HashMap incounter = new HashMap();
- private HashMap incoming = new HashMap();
- private long expire = 3000;
- private boolean forwardExpired = true;
- private int maxQueue = Integer.MAX_VALUE;
-
- public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
- for ( int i=0; i<destination.length; i++ ) {
- int nr = incCounter(destination[i]);
- //reduce byte copy
- msg.getMessage().append(nr);
- try {
- getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
- }finally {
- msg.getMessage().trim(4);
- }
- }
- }
-
- public void messageReceived(ChannelMessage msg) {
- int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
- msg.getMessage().trim(4);
- MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone());
- if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
- }
-
- public synchronized void processLeftOvers(Member member, boolean force) {
- MessageOrder tmp = (MessageOrder)incoming.get(member);
- if ( force ) {
- Counter cnt = getInCounter(member);
- cnt.setCounter(Integer.MAX_VALUE);
- }
- if ( tmp!= null ) processIncoming(tmp);
- }
- /**
- *
- * @param order MessageOrder
- * @return boolean - true if a message expired and was processed
- */
- public synchronized boolean processIncoming(MessageOrder order) {
- boolean result = false;
- Member member = order.getMessage().getAddress();
- Counter cnt = getInCounter(member);
-
- MessageOrder tmp = (MessageOrder)incoming.get(member);
- if ( tmp != null ) {
- order = MessageOrder.add(tmp,order);
- }
-
-
- while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter()) ) {
- //we are right on target. process orders
- if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc();
- else if ( order.getMsgNr() > cnt.getCounter() ) cnt.setCounter(order.getMsgNr());
- super.messageReceived(order.getMessage());
- order.setMessage(null);
- order = order.next;
- }
- MessageOrder head = order;
- MessageOrder prev = null;
- tmp = order;
- //flag to empty out the queue when it larger than maxQueue
- boolean empty = order!=null?order.getCount()>=maxQueue:false;
- while ( tmp != null ) {
- //process expired messages or empty out the queue
- if ( tmp.isExpired(expire) || empty ) {
- //reset the head
- if ( tmp == head ) head = tmp.next;
- cnt.setCounter(tmp.getMsgNr()+1);
- if ( getForwardExpired() ) super.messageReceived(tmp.getMessage());
- tmp.setMessage(null);
- tmp = tmp.next;
- if ( prev != null ) prev.next = tmp;
- result = true;
- } else {
- prev = tmp;
- tmp = tmp.next;
- }
- }
- if ( head == null ) incoming.remove(member);
- else incoming.put(member, head);
- return result;
- }
-
- public void memberAdded(Member member) {
- //notify upwards
- getInCounter(member);
- getOutCounter(member);
- super.memberAdded(member);
- }
-
- public void memberDisappeared(Member member) {
- //notify upwards
- outcounter.remove(member);
- incounter.remove(member);
- //clear the remaining queue
- processLeftOvers(member,true);
- super.memberDisappeared(member);
- }
-
- public int incCounter(Member mbr) {
- Counter cnt = getOutCounter(mbr);
- return cnt.inc();
- }
-
- public synchronized Counter getInCounter(Member mbr) {
- Counter cnt = (Counter)incounter.get(mbr);
- if ( cnt == null ) {
- cnt = new Counter();
- cnt.inc(); //always start at 1 for incoming
- incounter.put(mbr,cnt);
- }
- return cnt;
- }
-
- public synchronized Counter getOutCounter(Member mbr) {
- Counter cnt = (Counter)outcounter.get(mbr);
- if ( cnt == null ) {
- cnt = new Counter();
- outcounter.put(mbr,cnt);
- }
- return cnt;
- }
-
- public static class Counter {
- private int value = 0;
-
- public int getCounter() {
- return value;
- }
-
- public synchronized void setCounter(int counter) {
- this.value = counter;
- }
-
- public synchronized int inc() {
- return ++value;
- }
- }
-
- public static class MessageOrder {
- private long received = System.currentTimeMillis();
- private MessageOrder next;
- private int msgNr;
- private ChannelMessage msg = null;
- public MessageOrder(int msgNr,ChannelMessage msg) {
- this.msgNr = msgNr;
- this.msg = msg;
- }
-
- public boolean isExpired(long expireTime) {
- return (System.currentTimeMillis()-received) > expireTime;
- }
-
- public ChannelMessage getMessage() {
- return msg;
- }
-
- public void setMessage(ChannelMessage msg) {
- this.msg = msg;
- }
-
- public void setNext(MessageOrder order) {
- this.next = order;
- }
- public MessageOrder getNext() {
- return next;
- }
-
- public int getCount() {
- int counter = 1;
- MessageOrder tmp = next;
- while ( tmp != null ) {
- counter++;
- tmp = tmp.next;
- }
- return counter;
- }
-
- public static MessageOrder add(MessageOrder head, MessageOrder add) {
- if ( head == null ) return add;
- if ( add == null ) return head;
- if ( head == add ) return add;
-
- if ( head.getMsgNr() > add.getMsgNr() ) {
- add.next = head;
- return add;
- }
-
- MessageOrder iter = head;
- MessageOrder prev = null;
- while ( iter.getMsgNr() < add.getMsgNr() && (iter.next !=null ) ) {
- prev = iter;
- iter = iter.next;
- }
- if ( iter.getMsgNr() < add.getMsgNr() ) {
- //add after
- add.next = iter.next;
- iter.next = add;
- } else if (iter.getMsgNr() > add.getMsgNr()) {
- //add before
- prev.next = add;
- add.next = iter;
-
- } else {
- throw new ArithmeticException("Message added has the same counter, synchronization bug. Disable the order interceptor");
- }
-
- return head;
- }
-
- public int getMsgNr() {
- return msgNr;
- }
-
-
-
- }
-
- public void setExpire(long expire) {
- this.expire = expire;
- }
-
- public void setForwardExpired(boolean forwardExpired) {
- this.forwardExpired = forwardExpired;
- }
-
- public void setMaxQueue(int maxQueue) {
- this.maxQueue = maxQueue;
- }
-
- public long getExpire() {
- return expire;
- }
-
- public boolean getForwardExpired() {
- return forwardExpired;
- }
-
- public int getMaxQueue() {
- return maxQueue;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.io.XByteBuffer;
+
+
+
+/**
+ *
+ * The order interceptor guarantees that messages are received in the same order they were
+ * sent.
+ * This interceptor works best with the ack=true setting. <br>
+ * There is no point in
+ * using this with the replicationMode="fastasynchqueue" as this mode guarantees ordering.<BR>
+ * If you are using the mode ack=false replicationMode=pooled, and have a lot of concurrent threads,
+ * this interceptor can really slow you down, as many messages will be completely out of order
+ * and the queue might become rather large. If this is the case, then you might want to set
+ * the value OrderInterceptor.maxQueue = 25 (meaning that we will never keep more than 25 messages in our queue)
+ * <br><b>Configuration Options</b><br>
+ * OrderInteceptor.expire=<milliseconds> - if a message arrives out of order, how long before we act on it <b>default=3000ms</b><br>
+ * OrderInteceptor.maxQueue=<max queue size> - how much can the queue grow to ensure ordering.
+ * This setting is useful to avoid OutOfMemoryErrors<b>default=Integer.MAX_VALUE</b><br>
+ * OrderInterceptor.forwardExpired=<boolean> - this flag tells the interceptor what to
+ * do when a message has expired or the queue has grown larger than the maxQueue value.
+ * true means that the message is sent up the stack to the receiver that will receive and out of order message
+ * false means, forget the message and reset the message counter. <b>default=true</b>
+ *
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class OrderInterceptor extends ChannelInterceptorBase {
+ private HashMap outcounter = new HashMap();
+ private HashMap incounter = new HashMap();
+ private HashMap incoming = new HashMap();
+ private long expire = 3000;
+ private boolean forwardExpired = true;
+ private int maxQueue = Integer.MAX_VALUE;
+
+ public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+ for ( int i=0; i<destination.length; i++ ) {
+ int nr = incCounter(destination[i]);
+ //reduce byte copy
+ msg.getMessage().append(nr);
+ try {
+ getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
+ }finally {
+ msg.getMessage().trim(4);
+ }
+ }
+ }
+
+ public void messageReceived(ChannelMessage msg) {
+ int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
+ msg.getMessage().trim(4);
+ MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone());
+ if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
+ }
+
+ public synchronized void processLeftOvers(Member member, boolean force) {
+ MessageOrder tmp = (MessageOrder)incoming.get(member);
+ if ( force ) {
+ Counter cnt = getInCounter(member);
+ cnt.setCounter(Integer.MAX_VALUE);
+ }
+ if ( tmp!= null ) processIncoming(tmp);
+ }
+ /**
+ *
+ * @param order MessageOrder
+ * @return boolean - true if a message expired and was processed
+ */
+ public synchronized boolean processIncoming(MessageOrder order) {
+ boolean result = false;
+ Member member = order.getMessage().getAddress();
+ Counter cnt = getInCounter(member);
+
+ MessageOrder tmp = (MessageOrder)incoming.get(member);
+ if ( tmp != null ) {
+ order = MessageOrder.add(tmp,order);
+ }
+
+
+ while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter()) ) {
+ //we are right on target. process orders
+ if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc();
+ else if ( order.getMsgNr() > cnt.getCounter() ) cnt.setCounter(order.getMsgNr());
+ super.messageReceived(order.getMessage());
+ order.setMessage(null);
+ order = order.next;
+ }
+ MessageOrder head = order;
+ MessageOrder prev = null;
+ tmp = order;
+ //flag to empty out the queue when it larger than maxQueue
+ boolean empty = order!=null?order.getCount()>=maxQueue:false;
+ while ( tmp != null ) {
+ //process expired messages or empty out the queue
+ if ( tmp.isExpired(expire) || empty ) {
+ //reset the head
+ if ( tmp == head ) head = tmp.next;
+ cnt.setCounter(tmp.getMsgNr()+1);
+ if ( getForwardExpired() ) super.messageReceived(tmp.getMessage());
+ tmp.setMessage(null);
+ tmp = tmp.next;
+ if ( prev != null ) prev.next = tmp;
+ result = true;
+ } else {
+ prev = tmp;
+ tmp = tmp.next;
+ }
+ }
+ if ( head == null ) incoming.remove(member);
+ else incoming.put(member, head);
+ return result;
+ }
+
+ public void memberAdded(Member member) {
+ //notify upwards
+ getInCounter(member);
+ getOutCounter(member);
+ super.memberAdded(member);
+ }
+
+ public void memberDisappeared(Member member) {
+ //notify upwards
+ outcounter.remove(member);
+ incounter.remove(member);
+ //clear the remaining queue
+ processLeftOvers(member,true);
+ super.memberDisappeared(member);
+ }
+
+ public int incCounter(Member mbr) {
+ Counter cnt = getOutCounter(mbr);
+ return cnt.inc();
+ }
+
+ public synchronized Counter getInCounter(Member mbr) {
+ Counter cnt = (Counter)incounter.get(mbr);
+ if ( cnt == null ) {
+ cnt = new Counter();
+ cnt.inc(); //always start at 1 for incoming
+ incounter.put(mbr,cnt);
+ }
+ return cnt;
+ }
+
+ public synchronized Counter getOutCounter(Member mbr) {
+ Counter cnt = (Counter)outcounter.get(mbr);
+ if ( cnt == null ) {
+ cnt = new Counter();
+ outcounter.put(mbr,cnt);
+ }
+ return cnt;
+ }
+
+ public static class Counter {
+ private int value = 0;
+
+ public int getCounter() {
+ return value;
+ }
+
+ public synchronized void setCounter(int counter) {
+ this.value = counter;
+ }
+
+ public synchronized int inc() {
+ return ++value;
+ }
+ }
+
+ public static class MessageOrder {
+ private long received = System.currentTimeMillis();
+ private MessageOrder next;
+ private int msgNr;
+ private ChannelMessage msg = null;
+ public MessageOrder(int msgNr,ChannelMessage msg) {
+ this.msgNr = msgNr;
+ this.msg = msg;
+ }
+
+ public boolean isExpired(long expireTime) {
+ return (System.currentTimeMillis()-received) > expireTime;
+ }
+
+ public ChannelMessage getMessage() {
+ return msg;
+ }
+
+ public void setMessage(ChannelMessage msg) {
+ this.msg = msg;
+ }
+
+ public void setNext(MessageOrder order) {
+ this.next = order;
+ }
+ public MessageOrder getNext() {
+ return next;
+ }
+
+ public int getCount() {
+ int counter = 1;
+ MessageOrder tmp = next;
+ while ( tmp != null ) {
+ counter++;
+ tmp = tmp.next;
+ }
+ return counter;
+ }
+
+ public static MessageOrder add(MessageOrder head, MessageOrder add) {
+ if ( head == null ) return add;
+ if ( add == null ) return head;
+ if ( head == add ) return add;
+
+ if ( head.getMsgNr() > add.getMsgNr() ) {
+ add.next = head;
+ return add;
+ }
+
+ MessageOrder iter = head;
+ MessageOrder prev = null;
+ while ( iter.getMsgNr() < add.getMsgNr() && (iter.next !=null ) ) {
+ prev = iter;
+ iter = iter.next;
+ }
+ if ( iter.getMsgNr() < add.getMsgNr() ) {
+ //add after
+ add.next = iter.next;
+ iter.next = add;
+ } else if (iter.getMsgNr() > add.getMsgNr()) {
+ //add before
+ prev.next = add;
+ add.next = iter;
+
+ } else {
+ throw new ArithmeticException("Message added has the same counter, synchronization bug. Disable the order interceptor");
+ }
+
+ return head;
+ }
+
+ public int getMsgNr() {
+ return msgNr;
+ }
+
+
+
+ }
+
+ public void setExpire(long expire) {
+ this.expire = expire;
+ }
+
+ public void setForwardExpired(boolean forwardExpired) {
+ this.forwardExpired = forwardExpired;
+ }
+
+ public void setMaxQueue(int maxQueue) {
+ this.maxQueue = maxQueue;
+ }
+
+ public long getExpire() {
+ return expire;
+ }
+
+ public boolean getForwardExpired() {
+ return forwardExpired;
+ }
+
+ public int getMaxQueue() {
+ return maxQueue;
+ }
+
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Mon Oct 23 19:45:46 2006
@@ -1,289 +1,289 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- */
-package org.apache.catalina.tribes.group.interceptors;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.util.Arrays;
-import java.util.HashMap;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelException.FaultyMember;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.RemoteProcessException;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.membership.MemberImpl;
-import org.apache.catalina.tribes.membership.Membership;
-import java.net.ConnectException;
-
-/**
- * <p>Title: A perfect failure detector </p>
- *
- * <p>Description: The TcpFailureDetector is a useful interceptor
- * that adds reliability to the membership layer.</p>
- * <p>
- * If the network is busy, or the system is busy so that the membership receiver thread
- * is not getting enough time to update its table, members can be "timed out"
- * This failure detector will intercept the memberDisappeared message(unless its a true shutdown message)
- * and connect to the member using TCP.
- * </p>
- * <p>
- * The TcpFailureDetector works in two ways. <br>
- * 1. It intercepts memberDisappeared events
- * 2. It catches send errors
- * </p>
- *
- * @author Filip Hanik
- * @version 1.0
- */
-public class TcpFailureDetector extends ChannelInterceptorBase {
-
- private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( TcpFailureDetector.class );
-
- protected static byte[] TCP_FAIL_DETECT = new byte[] {
- 79, -89, 115, 72, 121, -126, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20,
- 125, -39, 82, 91, -21, -15, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74,
- 55, 21, -66, -121, 69, 126, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
- 85, -10, -108, -73, 58, -6, 64, 120, -111, 4, 125, -41, 114, -124, -64, -43};
-
- protected boolean performConnectTest = true;
-
- protected long connectTimeout = 1000;//1 second default
-
- protected boolean performSendTest = true;
-
- protected boolean performReadTest = false;
-
- protected long readTestTimeout = 5000;//5 seconds
-
- protected Membership membership = null;
-
- protected HashMap removeSuspects = new HashMap();
-
- protected HashMap addSuspects = new HashMap();
-
- public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
- try {
- super.sendMessage(destination, msg, payload);
- }catch ( ChannelException cx ) {
- FaultyMember[] mbrs = cx.getFaultyMembers();
- for ( int i=0; i<mbrs.length; i++ ) {
- if ( mbrs[i].getCause()!=null &&
- (!(mbrs[i].getCause() instanceof RemoteProcessException)) ) {//RemoteProcessException's are ok
- this.memberDisappeared(mbrs[i].getMember());
- }//end if
- }//for
- throw cx;
- }
- }
-
- public void messageReceived(ChannelMessage msg) {
- //catch incoming
- boolean process = true;
- if ( okToProcess(msg.getOptions()) ) {
- //check to see if it is a testMessage, if so, process = false
- process = ( (msg.getMessage().getLength() != TCP_FAIL_DETECT.length) ||
- (!Arrays.equals(TCP_FAIL_DETECT,msg.getMessage().getBytes()) ) );
- }//end if
-
- //ignore the message, it doesnt have the flag set
- if ( process ) super.messageReceived(msg);
- else if ( log.isDebugEnabled() ) log.debug("Received a failure detector packet:"+msg);
- }//messageReceived
-
-
- public void memberAdded(Member member) {
- if ( membership == null ) setupMembership();
- boolean notify = false;
- synchronized (membership) {
- if (removeSuspects.containsKey(member)) {
- //previously marked suspect, system below picked up the member again
- removeSuspects.remove(member);
- } else if (membership.getMember( (MemberImpl) member) == null){
- //if we add it here, then add it upwards too
- //check to see if it is alive
- if (memberAlive(member)) {
- membership.memberAlive( (MemberImpl) member);
- notify = true;
- } else {
- addSuspects.put(member, new Long(System.currentTimeMillis()));
- }
- }
- }
- if ( notify ) super.memberAdded(member);
- }
-
- public void memberDisappeared(Member member) {
- if ( membership == null ) setupMembership();
- boolean notify = false;
- boolean shutdown = Arrays.equals(member.getCommand(),Member.SHUTDOWN_PAYLOAD);
- if ( !shutdown ) log.info("Received memberDisappeared["+member+"] message. Will verify.");
- synchronized (membership) {
- //check to see if the member really is gone
- //if the payload is not a shutdown message
- if (shutdown || !memberAlive(member)) {
- //not correct, we need to maintain the map
- membership.removeMember( (MemberImpl) member);
- removeSuspects.remove(member);
- notify = true;
- } else {
- //add the member as suspect
- removeSuspects.put(member, new Long(System.currentTimeMillis()));
- }
- }
- if ( notify ) {
- log.info("Verification complete. Member disappeared["+member+"]");
- super.memberDisappeared(member);
- } else {
- log.info("Verification complete. Member still alive["+member+"]");
-
- }
- }
-
- public boolean hasMembers() {
- if ( membership == null ) setupMembership();
- return membership.hasMembers();
- }
-
- public Member[] getMembers() {
- if ( membership == null ) setupMembership();
- return membership.getMembers();
- }
-
- public Member getMember(Member mbr) {
- if ( membership == null ) setupMembership();
- return membership.getMember(mbr);
- }
-
- public Member getLocalMember(boolean incAlive) {
- return super.getLocalMember(incAlive);
- }
-
- public void heartbeat() {
- try {
- if (membership == null) setupMembership();
- synchronized (membership) {
- //update all alive times
- Member[] members = super.getMembers();
- for (int i = 0; members != null && i < members.length; i++) {
- if (membership.memberAlive( (MemberImpl) members[i])) {
- //we don't have this one in our membership, check to see if he/she is alive
- if (memberAlive(members[i])) {
- log.warn("Member added, even though we werent notified:" + members[i]);
- super.memberAdded(members[i]);
- } else {
- membership.removeMember( (MemberImpl) members[i]);
- } //end if
- } //end if
- } //for
-
- //check suspect members if they are still alive,
- //if not, simply issue the memberDisappeared message
- MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
- for (int i = 0; i < keys.length; i++) {
- MemberImpl m = (MemberImpl) keys[i];
- if (membership.getMember(m) != null && (!memberAlive(m))) {
- membership.removeMember(m);
- super.memberDisappeared(m);
- removeSuspects.remove(m);
- log.info("Suspect member, confirmed dead.["+m+"]");
- } //end if
- }
-
- //check add suspects members if they are alive now,
- //if they are, simply issue the memberAdded message
- keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
- for (int i = 0; i < keys.length; i++) {
- MemberImpl m = (MemberImpl) keys[i];
- if ( membership.getMember(m) == null && (memberAlive(m))) {
- membership.memberAlive(m);
- super.memberAdded(m);
- addSuspects.remove(m);
- log.info("Suspect member, confirmed alive.["+m+"]");
- } //end if
- }
- }
- }catch ( Exception x ) {
- log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
- } finally {
- super.heartbeat();
- }
- }
-
- protected synchronized void setupMembership() {
- if ( membership == null ) {
- membership = new Membership((MemberImpl)super.getLocalMember(true));
- }
-
- }
-
- protected boolean memberAlive(Member mbr) {
- return memberAlive(mbr,TCP_FAIL_DETECT,performSendTest,performReadTest,readTestTimeout,connectTimeout,getOptionFlag());
- }
-
- protected static boolean memberAlive(Member mbr, byte[] msgData,
- boolean sendTest, boolean readTest,
- long readTimeout, long conTimeout,
- int optionFlag) {
- //could be a shutdown notification
- if ( Arrays.equals(mbr.getCommand(),Member.SHUTDOWN_PAYLOAD) ) return false;
-
- Socket socket = new Socket();
- try {
- InetAddress ia = InetAddress.getByAddress(mbr.getHost());
- InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort());
- socket.setSoTimeout((int)readTimeout);
- socket.connect(addr, (int) conTimeout);
- if ( sendTest ) {
- ChannelData data = new ChannelData(true);
- data.setAddress(mbr);
- data.setMessage(new XByteBuffer(msgData,false));
- data.setTimestamp(System.currentTimeMillis());
- int options = optionFlag | Channel.SEND_OPTIONS_BYTE_MESSAGE;
- if ( readTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK);
- else options = (options & (~Channel.SEND_OPTIONS_USE_ACK));
- data.setOptions(options);
- byte[] message = XByteBuffer.createDataPackage(data);
- socket.getOutputStream().write(message);
- if ( readTest ) {
- int length = socket.getInputStream().read(message);
- return length > 0;
- }
- }//end if
- return true;
- } catch ( SocketTimeoutException sx) {
- //do nothing, we couldn't connect
- } catch ( ConnectException cx) {
- //do nothing, we couldn't connect
- }catch (Exception x ) {
- log.error("Unable to perform failure detection check, assuming member down.",x);
- } finally {
- try {socket.close(); } catch ( Exception ignore ){}
- }
- return false;
- }
-
-
-
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelException.FaultyMember;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.membership.Membership;
+import java.net.ConnectException;
+
+/**
+ * <p>Title: A perfect failure detector </p>
+ *
+ * <p>Description: The TcpFailureDetector is a useful interceptor
+ * that adds reliability to the membership layer.</p>
+ * <p>
+ * If the network is busy, or the system is busy so that the membership receiver thread
+ * is not getting enough time to update its table, members can be "timed out"
+ * This failure detector will intercept the memberDisappeared message(unless its a true shutdown message)
+ * and connect to the member using TCP.
+ * </p>
+ * <p>
+ * The TcpFailureDetector works in two ways. <br>
+ * 1. It intercepts memberDisappeared events
+ * 2. It catches send errors
+ * </p>
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class TcpFailureDetector extends ChannelInterceptorBase {
+
+ private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( TcpFailureDetector.class );
+
+ protected static byte[] TCP_FAIL_DETECT = new byte[] {
+ 79, -89, 115, 72, 121, -126, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20,
+ 125, -39, 82, 91, -21, -15, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74,
+ 55, 21, -66, -121, 69, 126, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
+ 85, -10, -108, -73, 58, -6, 64, 120, -111, 4, 125, -41, 114, -124, -64, -43};
+
+ protected boolean performConnectTest = true;
+
+ protected long connectTimeout = 1000;//1 second default
+
+ protected boolean performSendTest = true;
+
+ protected boolean performReadTest = false;
+
+ protected long readTestTimeout = 5000;//5 seconds
+
+ protected Membership membership = null;
+
+ protected HashMap removeSuspects = new HashMap();
+
+ protected HashMap addSuspects = new HashMap();
+
+ public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+ try {
+ super.sendMessage(destination, msg, payload);
+ }catch ( ChannelException cx ) {
+ FaultyMember[] mbrs = cx.getFaultyMembers();
+ for ( int i=0; i<mbrs.length; i++ ) {
+ if ( mbrs[i].getCause()!=null &&
+ (!(mbrs[i].getCause() instanceof RemoteProcessException)) ) {//RemoteProcessException's are ok
+ this.memberDisappeared(mbrs[i].getMember());
+ }//end if
+ }//for
+ throw cx;
+ }
+ }
+
+ public void messageReceived(ChannelMessage msg) {
+ //catch incoming
+ boolean process = true;
+ if ( okToProcess(msg.getOptions()) ) {
+ //check to see if it is a testMessage, if so, process = false
+ process = ( (msg.getMessage().getLength() != TCP_FAIL_DETECT.length) ||
+ (!Arrays.equals(TCP_FAIL_DETECT,msg.getMessage().getBytes()) ) );
+ }//end if
+
+ //ignore the message, it doesnt have the flag set
+ if ( process ) super.messageReceived(msg);
+ else if ( log.isDebugEnabled() ) log.debug("Received a failure detector packet:"+msg);
+ }//messageReceived
+
+
+ public void memberAdded(Member member) {
+ if ( membership == null ) setupMembership();
+ boolean notify = false;
+ synchronized (membership) {
+ if (removeSuspects.containsKey(member)) {
+ //previously marked suspect, system below picked up the member again
+ removeSuspects.remove(member);
+ } else if (membership.getMember( (MemberImpl) member) == null){
+ //if we add it here, then add it upwards too
+ //check to see if it is alive
+ if (memberAlive(member)) {
+ membership.memberAlive( (MemberImpl) member);
+ notify = true;
+ } else {
+ addSuspects.put(member, new Long(System.currentTimeMillis()));
+ }
+ }
+ }
+ if ( notify ) super.memberAdded(member);
+ }
+
+ public void memberDisappeared(Member member) {
+ if ( membership == null ) setupMembership();
+ boolean notify = false;
+ boolean shutdown = Arrays.equals(member.getCommand(),Member.SHUTDOWN_PAYLOAD);
+ if ( !shutdown ) log.info("Received memberDisappeared["+member+"] message. Will verify.");
+ synchronized (membership) {
+ //check to see if the member really is gone
+ //if the payload is not a shutdown message
+ if (shutdown || !memberAlive(member)) {
+ //not correct, we need to maintain the map
+ membership.removeMember( (MemberImpl) member);
+ removeSuspects.remove(member);
+ notify = true;
+ } else {
+ //add the member as suspect
+ removeSuspects.put(member, new Long(System.currentTimeMillis()));
+ }
+ }
+ if ( notify ) {
+ log.info("Verification complete. Member disappeared["+member+"]");
+ super.memberDisappeared(member);
+ } else {
+ log.info("Verification complete. Member still alive["+member+"]");
+
+ }
+ }
+
+ public boolean hasMembers() {
+ if ( membership == null ) setupMembership();
+ return membership.hasMembers();
+ }
+
+ public Member[] getMembers() {
+ if ( membership == null ) setupMembership();
+ return membership.getMembers();
+ }
+
+ public Member getMember(Member mbr) {
+ if ( membership == null ) setupMembership();
+ return membership.getMember(mbr);
+ }
+
+ public Member getLocalMember(boolean incAlive) {
+ return super.getLocalMember(incAlive);
+ }
+
+ public void heartbeat() {
+ try {
+ if (membership == null) setupMembership();
+ synchronized (membership) {
+ //update all alive times
+ Member[] members = super.getMembers();
+ for (int i = 0; members != null && i < members.length; i++) {
+ if (membership.memberAlive( (MemberImpl) members[i])) {
+ //we don't have this one in our membership, check to see if he/she is alive
+ if (memberAlive(members[i])) {
+ log.warn("Member added, even though we werent notified:" + members[i]);
+ super.memberAdded(members[i]);
+ } else {
+ membership.removeMember( (MemberImpl) members[i]);
+ } //end if
+ } //end if
+ } //for
+
+ //check suspect members if they are still alive,
+ //if not, simply issue the memberDisappeared message
+ MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
+ for (int i = 0; i < keys.length; i++) {
+ MemberImpl m = (MemberImpl) keys[i];
+ if (membership.getMember(m) != null && (!memberAlive(m))) {
+ membership.removeMember(m);
+ super.memberDisappeared(m);
+ removeSuspects.remove(m);
+ log.info("Suspect member, confirmed dead.["+m+"]");
+ } //end if
+ }
+
+ //check add suspects members if they are alive now,
+ //if they are, simply issue the memberAdded message
+ keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
+ for (int i = 0; i < keys.length; i++) {
+ MemberImpl m = (MemberImpl) keys[i];
+ if ( membership.getMember(m) == null && (memberAlive(m))) {
+ membership.memberAlive(m);
+ super.memberAdded(m);
+ addSuspects.remove(m);
+ log.info("Suspect member, confirmed alive.["+m+"]");
+ } //end if
+ }
+ }
+ }catch ( Exception x ) {
+ log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
+ } finally {
+ super.heartbeat();
+ }
+ }
+
+ protected synchronized void setupMembership() {
+ if ( membership == null ) {
+ membership = new Membership((MemberImpl)super.getLocalMember(true));
+ }
+
+ }
+
+ protected boolean memberAlive(Member mbr) {
+ return memberAlive(mbr,TCP_FAIL_DETECT,performSendTest,performReadTest,readTestTimeout,connectTimeout,getOptionFlag());
+ }
+
+ protected static boolean memberAlive(Member mbr, byte[] msgData,
+ boolean sendTest, boolean readTest,
+ long readTimeout, long conTimeout,
+ int optionFlag) {
+ //could be a shutdown notification
+ if ( Arrays.equals(mbr.getCommand(),Member.SHUTDOWN_PAYLOAD) ) return false;
+
+ Socket socket = new Socket();
+ try {
+ InetAddress ia = InetAddress.getByAddress(mbr.getHost());
+ InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort());
+ socket.setSoTimeout((int)readTimeout);
+ socket.connect(addr, (int) conTimeout);
+ if ( sendTest ) {
+ ChannelData data = new ChannelData(true);
+ data.setAddress(mbr);
+ data.setMessage(new XByteBuffer(msgData,false));
+ data.setTimestamp(System.currentTimeMillis());
+ int options = optionFlag | Channel.SEND_OPTIONS_BYTE_MESSAGE;
+ if ( readTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK);
+ else options = (options & (~Channel.SEND_OPTIONS_USE_ACK));
+ data.setOptions(options);
+ byte[] message = XByteBuffer.createDataPackage(data);
+ socket.getOutputStream().write(message);
+ if ( readTest ) {
+ int length = socket.getInputStream().read(message);
+ return length > 0;
+ }
+ }//end if
+ return true;
+ } catch ( SocketTimeoutException sx) {
+ //do nothing, we couldn't connect
+ } catch ( ConnectException cx) {
+ //do nothing, we couldn't connect
+ }catch (Exception x ) {
+ log.error("Unable to perform failure detection check, assuming member down.",x);
+ } finally {
+ try {socket.close(); } catch ( Exception ignore ){}
+ }
+ return false;
+ }
+
+
+
+
}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java Mon Oct 23 19:45:46 2006
@@ -1,120 +1,120 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- */
-
-package org.apache.catalina.tribes.group.interceptors;
-
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import java.text.DecimalFormat;
-import org.apache.catalina.tribes.membership.MemberImpl;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-
-/**
- *
- *
- * @author Filip Hanik
- * @version 1.0
- */
-public class ThroughputInterceptor extends ChannelInterceptorBase {
- protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ThroughputInterceptor.class);
-
- double mbTx = 0;
- double mbAppTx = 0;
- double mbRx = 0;
- double timeTx = 0;
- double lastCnt = 0;
- AtomicLong msgTxCnt = new AtomicLong(1);
- AtomicLong msgRxCnt = new AtomicLong(0);
- AtomicLong msgTxErr = new AtomicLong(0);
- int interval = 10000;
- AtomicInteger access = new AtomicInteger(0);
- long txStart = 0;
- long rxStart = 0;
- DecimalFormat df = new DecimalFormat("#0.00");
-
-
- public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
- if ( access.addAndGet(1) == 1 ) txStart = System.currentTimeMillis();
- long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength());
- try {
- super.sendMessage(destination, msg, payload);
- }catch ( ChannelException x ) {
- msgTxErr.addAndGet(1);
- access.addAndGet(-1);
- throw x;
- }
- mbTx += ((double)(bytes*destination.length))/(1024d*1024d);
- mbAppTx += ((double)(bytes))/(1024d*1024d);
- if ( access.addAndGet(-1) == 0 ) {
- long stop = System.currentTimeMillis();
- timeTx += ( (double) (stop - txStart)) / 1000d;
- if ((msgTxCnt.get() / interval) >= lastCnt) {
- lastCnt++;
- report(timeTx);
- }
- }
- msgTxCnt.addAndGet(1);
- }
-
- public void messageReceived(ChannelMessage msg) {
- if ( rxStart == 0 ) rxStart = System.currentTimeMillis();
- long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength());
- mbRx += ((double)bytes)/(1024d*1024d);
- msgRxCnt.addAndGet(1);
- if ( msgRxCnt.get() % interval == 0 ) report(timeTx);
- super.messageReceived(msg);
-
- }
-
- public void report(double timeTx) {
- StringBuffer buf = new StringBuffer("ThroughputInterceptor Report[\n\tTx Msg:");
- buf.append(msgTxCnt).append(" messages\n\tSent:");
- buf.append(df.format(mbTx));
- buf.append(" MB (total)\n\tSent:");
- buf.append(df.format(mbAppTx));
- buf.append(" MB (application)\n\tTime:");
- buf.append(df.format(timeTx));
- buf.append(" seconds\n\tTx Speed:");
- buf.append(df.format(mbTx/timeTx));
- buf.append(" MB/sec (total)\n\tTxSpeed:");
- buf.append(df.format(mbAppTx/timeTx));
- buf.append(" MB/sec (application)\n\tError Msg:");
- buf.append(msgTxErr).append("\n\tRx Msg:");
- buf.append(msgRxCnt);
- buf.append(" messages\n\tRx Speed:");
- buf.append(df.format(mbRx/((double)((System.currentTimeMillis()-rxStart)/1000))));
- buf.append(" MB/sec (since 1st msg)\n\tReceived:");
- buf.append(df.format(mbRx)).append(" MB]\n");
- if ( log.isInfoEnabled() ) log.info(buf);
- }
-
- public void setInterval(int interval) {
- this.interval = interval;
- }
-
- public int getInterval() {
- return interval;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+
+package org.apache.catalina.tribes.group.interceptors;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import java.text.DecimalFormat;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+
+/**
+ *
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class ThroughputInterceptor extends ChannelInterceptorBase {
+ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ThroughputInterceptor.class);
+
+ double mbTx = 0;
+ double mbAppTx = 0;
+ double mbRx = 0;
+ double timeTx = 0;
+ double lastCnt = 0;
+ AtomicLong msgTxCnt = new AtomicLong(1);
+ AtomicLong msgRxCnt = new AtomicLong(0);
+ AtomicLong msgTxErr = new AtomicLong(0);
+ int interval = 10000;
+ AtomicInteger access = new AtomicInteger(0);
+ long txStart = 0;
+ long rxStart = 0;
+ DecimalFormat df = new DecimalFormat("#0.00");
+
+
+ public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+ if ( access.addAndGet(1) == 1 ) txStart = System.currentTimeMillis();
+ long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength());
+ try {
+ super.sendMessage(destination, msg, payload);
+ }catch ( ChannelException x ) {
+ msgTxErr.addAndGet(1);
+ access.addAndGet(-1);
+ throw x;
+ }
+ mbTx += ((double)(bytes*destination.length))/(1024d*1024d);
+ mbAppTx += ((double)(bytes))/(1024d*1024d);
+ if ( access.addAndGet(-1) == 0 ) {
+ long stop = System.currentTimeMillis();
+ timeTx += ( (double) (stop - txStart)) / 1000d;
+ if ((msgTxCnt.get() / interval) >= lastCnt) {
+ lastCnt++;
+ report(timeTx);
+ }
+ }
+ msgTxCnt.addAndGet(1);
+ }
+
+ public void messageReceived(ChannelMessage msg) {
+ if ( rxStart == 0 ) rxStart = System.currentTimeMillis();
+ long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength());
+ mbRx += ((double)bytes)/(1024d*1024d);
+ msgRxCnt.addAndGet(1);
+ if ( msgRxCnt.get() % interval == 0 ) report(timeTx);
+ super.messageReceived(msg);
+
+ }
+
+ public void report(double timeTx) {
+ StringBuffer buf = new StringBuffer("ThroughputInterceptor Report[\n\tTx Msg:");
+ buf.append(msgTxCnt).append(" messages\n\tSent:");
+ buf.append(df.format(mbTx));
+ buf.append(" MB (total)\n\tSent:");
+ buf.append(df.format(mbAppTx));
+ buf.append(" MB (application)\n\tTime:");
+ buf.append(df.format(timeTx));
+ buf.append(" seconds\n\tTx Speed:");
+ buf.append(df.format(mbTx/timeTx));
+ buf.append(" MB/sec (total)\n\tTxSpeed:");
+ buf.append(df.format(mbAppTx/timeTx));
+ buf.append(" MB/sec (application)\n\tError Msg:");
+ buf.append(msgTxErr).append("\n\tRx Msg:");
+ buf.append(msgRxCnt);
+ buf.append(" messages\n\tRx Speed:");
+ buf.append(df.format(mbRx/((double)((System.currentTimeMillis()-rxStart)/1000))));
+ buf.append(" MB/sec (since 1st msg)\n\tReceived:");
+ buf.append(df.format(mbRx)).append(" MB]\n");
+ if ( log.isInfoEnabled() ) log.info(buf);
+ }
+
+ public void setInterval(int interval) {
+ this.interval = interval;
+ }
+
+ public int getInterval() {
+ return interval;
+ }
+
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java Mon Oct 23 19:45:46 2006
@@ -1,149 +1,149 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.group.interceptors;
-
-import java.util.HashMap;
-
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-import org.apache.catalina.tribes.util.UUIDGenerator;
-import org.apache.catalina.tribes.util.Arrays;
-import org.apache.catalina.tribes.UniqueId;
-import java.util.Map;
-
-/**
- * <p>Title: </p>
- *
- * <p>Description: </p>
- *
- * <p>Copyright: Copyright (c) 2005</p>
- *
- * <p>Company: </p>
- *
- * @author not attributable
- * @version 1.0
- */
-public class TwoPhaseCommitInterceptor extends ChannelInterceptorBase {
-
- public static final byte[] START_DATA = new byte[] {113, 1, -58, 2, -34, -60, 75, -78, -101, -12, 32, -29, 32, 111, -40, 4};
- public static final byte[] END_DATA = new byte[] {54, -13, 90, 110, 47, -31, 75, -24, -81, -29, 36, 52, -58, 77, -110, 56};
- private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(TwoPhaseCommitInterceptor.class);
-
- protected HashMap messages = new HashMap();
- protected long expire = 1000 * 60; //one minute expiration
- protected boolean deepclone = true;
-
- public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws
- ChannelException {
- //todo, optimize, if destination.length==1, then we can do
- //msg.setOptions(msg.getOptions() & (~getOptionFlag())
- //and just send one message
- if (okToProcess(msg.getOptions()) ) {
- super.sendMessage(destination, msg, null);
- ChannelMessage confirmation = null;
- if ( deepclone ) confirmation = (ChannelMessage)msg.deepclone();
- else confirmation = (ChannelMessage)msg.clone();
- confirmation.getMessage().reset();
- UUIDGenerator.randomUUID(false,confirmation.getUniqueId(),0);
- confirmation.getMessage().append(START_DATA,0,START_DATA.length);
- confirmation.getMessage().append(msg.getUniqueId(),0,msg.getUniqueId().length);
- confirmation.getMessage().append(END_DATA,0,END_DATA.length);
- super.sendMessage(destination,confirmation,payload);
- } else {
- //turn off two phase commit
- //this wont work if the interceptor has 0 as a flag
- //since there is no flag to turn off
- //msg.setOptions(msg.getOptions() & (~getOptionFlag()));
- super.sendMessage(destination, msg, payload);
- }
- }
-
- public void messageReceived(ChannelMessage msg) {
- if (okToProcess(msg.getOptions())) {
- if ( msg.getMessage().getLength() == (START_DATA.length+msg.getUniqueId().length+END_DATA.length) &&
- Arrays.contains(msg.getMessage().getBytesDirect(),0,START_DATA,0,START_DATA.length) &&
- Arrays.contains(msg.getMessage().getBytesDirect(),START_DATA.length+msg.getUniqueId().length,END_DATA,0,END_DATA.length) ) {
- UniqueId id = new UniqueId(msg.getMessage().getBytesDirect(),START_DATA.length,msg.getUniqueId().length);
- MapEntry original = (MapEntry)messages.get(id);
- if ( original != null ) {
- super.messageReceived(original.msg);
- messages.remove(id);
- } else log.warn("Received a confirmation, but original message is missing. Id:"+Arrays.toString(id.getBytes()));
- } else {
- UniqueId id = new UniqueId(msg.getUniqueId());
- MapEntry entry = new MapEntry((ChannelMessage)msg.deepclone(),id,System.currentTimeMillis());
- messages.put(id,entry);
- }
- } else {
- super.messageReceived(msg);
- }
- }
-
- public boolean getDeepclone() {
- return deepclone;
- }
-
- public long getExpire() {
- return expire;
- }
-
- public void setDeepclone(boolean deepclone) {
- this.deepclone = deepclone;
- }
-
- public void setExpire(long expire) {
- this.expire = expire;
- }
-
- public void heartbeat() {
- try {
- long now = System.currentTimeMillis();
- Map.Entry[] entries = (Map.Entry[])messages.entrySet().toArray(new Map.Entry[messages.size()]);
- for (int i=0; i<entries.length; i++ ) {
- MapEntry entry = (MapEntry)entries[i].getValue();
- if ( entry.expired(now,expire) ) {
- log.info("Message ["+entry.id+"] has expired. Removing.");
- messages.remove(entry.id);
- }//end if
- }
- } catch ( Exception x ) {
- log.warn("Unable to perform heartbeat on the TwoPhaseCommit interceptor.",x);
- } finally {
- super.heartbeat();
- }
- }
-
- public static class MapEntry {
- public ChannelMessage msg;
- public UniqueId id;
- public long timestamp;
-
- public MapEntry(ChannelMessage msg, UniqueId id, long timestamp) {
- this.msg = msg;
- this.id = id;
- this.timestamp = timestamp;
- }
- public boolean expired(long now, long expiration) {
- return (now - timestamp ) > expiration;
- }
-
- }
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.UniqueId;
+import java.util.Map;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TwoPhaseCommitInterceptor extends ChannelInterceptorBase {
+
+ public static final byte[] START_DATA = new byte[] {113, 1, -58, 2, -34, -60, 75, -78, -101, -12, 32, -29, 32, 111, -40, 4};
+ public static final byte[] END_DATA = new byte[] {54, -13, 90, 110, 47, -31, 75, -24, -81, -29, 36, 52, -58, 77, -110, 56};
+ private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(TwoPhaseCommitInterceptor.class);
+
+ protected HashMap messages = new HashMap();
+ protected long expire = 1000 * 60; //one minute expiration
+ protected boolean deepclone = true;
+
+ public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws
+ ChannelException {
+ //todo, optimize, if destination.length==1, then we can do
+ //msg.setOptions(msg.getOptions() & (~getOptionFlag())
+ //and just send one message
+ if (okToProcess(msg.getOptions()) ) {
+ super.sendMessage(destination, msg, null);
+ ChannelMessage confirmation = null;
+ if ( deepclone ) confirmation = (ChannelMessage)msg.deepclone();
+ else confirmation = (ChannelMessage)msg.clone();
+ confirmation.getMessage().reset();
+ UUIDGenerator.randomUUID(false,confirmation.getUniqueId(),0);
+ confirmation.getMessage().append(START_DATA,0,START_DATA.length);
+ confirmation.getMessage().append(msg.getUniqueId(),0,msg.getUniqueId().length);
+ confirmation.getMessage().append(END_DATA,0,END_DATA.length);
+ super.sendMessage(destination,confirmation,payload);
+ } else {
+ //turn off two phase commit
+ //this wont work if the interceptor has 0 as a flag
+ //since there is no flag to turn off
+ //msg.setOptions(msg.getOptions() & (~getOptionFlag()));
+ super.sendMessage(destination, msg, payload);
+ }
+ }
+
+ public void messageReceived(ChannelMessage msg) {
+ if (okToProcess(msg.getOptions())) {
+ if ( msg.getMessage().getLength() == (START_DATA.length+msg.getUniqueId().length+END_DATA.length) &&
+ Arrays.contains(msg.getMessage().getBytesDirect(),0,START_DATA,0,START_DATA.length) &&
+ Arrays.contains(msg.getMessage().getBytesDirect(),START_DATA.length+msg.getUniqueId().length,END_DATA,0,END_DATA.length) ) {
+ UniqueId id = new UniqueId(msg.getMessage().getBytesDirect(),START_DATA.length,msg.getUniqueId().length);
+ MapEntry original = (MapEntry)messages.get(id);
+ if ( original != null ) {
+ super.messageReceived(original.msg);
+ messages.remove(id);
+ } else log.warn("Received a confirmation, but original message is missing. Id:"+Arrays.toString(id.getBytes()));
+ } else {
+ UniqueId id = new UniqueId(msg.getUniqueId());
+ MapEntry entry = new MapEntry((ChannelMessage)msg.deepclone(),id,System.currentTimeMillis());
+ messages.put(id,entry);
+ }
+ } else {
+ super.messageReceived(msg);
+ }
+ }
+
+ public boolean getDeepclone() {
+ return deepclone;
+ }
+
+ public long getExpire() {
+ return expire;
+ }
+
+ public void setDeepclone(boolean deepclone) {
+ this.deepclone = deepclone;
+ }
+
+ public void setExpire(long expire) {
+ this.expire = expire;
+ }
+
+ public void heartbeat() {
+ try {
+ long now = System.currentTimeMillis();
+ Map.Entry[] entries = (Map.Entry[])messages.entrySet().toArray(new Map.Entry[messages.size()]);
+ for (int i=0; i<entries.length; i++ ) {
+ MapEntry entry = (MapEntry)entries[i].getValue();
+ if ( entry.expired(now,expire) ) {
+ log.info("Message ["+entry.id+"] has expired. Removing.");
+ messages.remove(entry.id);
+ }//end if
+ }
+ } catch ( Exception x ) {
+ log.warn("Unable to perform heartbeat on the TwoPhaseCommit interceptor.",x);
+ } finally {
+ super.heartbeat();
+ }
+ }
+
+ public static class MapEntry {
+ public ChannelMessage msg;
+ public UniqueId id;
+ public long timestamp;
+
+ public MapEntry(ChannelMessage msg, UniqueId id, long timestamp) {
+ this.msg = msg;
+ this.id = id;
+ this.timestamp = timestamp;
+ }
+ public boolean expired(long now, long expiration) {
+ return (now - timestamp ) > expiration;
+ }
+
+ }
+
}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java Mon Oct 23 19:45:46 2006
@@ -1,94 +1,94 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.io;
-
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.commons.logging.Log;
-
-/**
- *
- * @author Filip Hanik
- *
- * @version 1.0
- */
-public class BufferPool {
- protected static Log log = LogFactory.getLog(BufferPool.class);
-
- public static int DEFAULT_POOL_SIZE = 100*1024*1024; //100MB
-
-
-
- protected static BufferPool instance = null;
- protected BufferPoolAPI pool = null;
-
- private BufferPool(BufferPoolAPI pool) {
- this.pool = pool;
- }
-
- public XByteBuffer getBuffer(int minSize, boolean discard) {
- if ( pool != null ) return pool.getBuffer(minSize, discard);
- else return new XByteBuffer(minSize,discard);
- }
-
- public void returnBuffer(XByteBuffer buffer) {
- if ( pool != null ) pool.returnBuffer(buffer);
- }
-
- public void clear() {
- if ( pool != null ) pool.clear();
- }
-
-
- public static BufferPool getBufferPool() {
- if ( (instance == null) ) {
- synchronized (BufferPool.class) {
- if ( instance == null ) {
- BufferPoolAPI pool = null;
- Class clazz = null;
- try {
- clazz = Class.forName("org.apache.catalina.tribes.io.BufferPool15Impl");
- pool = (BufferPoolAPI)clazz.newInstance();
- } catch ( Throwable x ) {
- try {
- clazz = Class.forName("org.apache.catalina.tribes.io.BufferPool14Impl");
- pool = (BufferPoolAPI)clazz.newInstance();
- } catch ( Throwable e ) {
- log.warn("Unable to initilize BufferPool, not pooling XByteBuffer objects:"+x.getMessage());
- if ( log.isDebugEnabled() ) log.debug("Unable to initilize BufferPool, not pooling XByteBuffer objects:",x);
- }
- }
- pool.setMaxSize(DEFAULT_POOL_SIZE);
- log.info("Created a buffer pool with max size:"+DEFAULT_POOL_SIZE+" bytes of type:"+(clazz!=null?clazz.getName():"null"));
- instance = new BufferPool(pool);
- }//end if
- }//sync
- }//end if
- return instance;
- }
-
-
- public static interface BufferPoolAPI {
- public void setMaxSize(int bytes);
-
- public XByteBuffer getBuffer(int minSize, boolean discard);
-
- public void returnBuffer(XByteBuffer buffer);
-
- public void clear();
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.commons.logging.Log;
+
+/**
+ *
+ * @author Filip Hanik
+ *
+ * @version 1.0
+ */
+public class BufferPool {
+ protected static Log log = LogFactory.getLog(BufferPool.class);
+
+ public static int DEFAULT_POOL_SIZE = 100*1024*1024; //100MB
+
+
+
+ protected static BufferPool instance = null;
+ protected BufferPoolAPI pool = null;
+
+ private BufferPool(BufferPoolAPI pool) {
+ this.pool = pool;
+ }
+
+ public XByteBuffer getBuffer(int minSize, boolean discard) {
+ if ( pool != null ) return pool.getBuffer(minSize, discard);
+ else return new XByteBuffer(minSize,discard);
+ }
+
+ public void returnBuffer(XByteBuffer buffer) {
+ if ( pool != null ) pool.returnBuffer(buffer);
+ }
+
+ public void clear() {
+ if ( pool != null ) pool.clear();
+ }
+
+
+ public static BufferPool getBufferPool() {
+ if ( (instance == null) ) {
+ synchronized (BufferPool.class) {
+ if ( instance == null ) {
+ BufferPoolAPI pool = null;
+ Class clazz = null;
+ try {
+ clazz = Class.forName("org.apache.catalina.tribes.io.BufferPool15Impl");
+ pool = (BufferPoolAPI)clazz.newInstance();
+ } catch ( Throwable x ) {
+ try {
+ clazz = Class.forName("org.apache.catalina.tribes.io.BufferPool14Impl");
+ pool = (BufferPoolAPI)clazz.newInstance();
+ } catch ( Throwable e ) {
+ log.warn("Unable to initilize BufferPool, not pooling XByteBuffer objects:"+x.getMessage());
+ if ( log.isDebugEnabled() ) log.debug("Unable to initilize BufferPool, not pooling XByteBuffer objects:",x);
+ }
+ }
+ pool.setMaxSize(DEFAULT_POOL_SIZE);
+ log.info("Created a buffer pool with max size:"+DEFAULT_POOL_SIZE+" bytes of type:"+(clazz!=null?clazz.getName():"null"));
+ instance = new BufferPool(pool);
+ }//end if
+ }//sync
+ }//end if
+ return instance;
+ }
+
+
+ public static interface BufferPoolAPI {
+ public void setMaxSize(int bytes);
+
+ public XByteBuffer getBuffer(int minSize, boolean discard);
+
+ public void returnBuffer(XByteBuffer buffer);
+
+ public void clear();
+ }
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool14Impl.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool14Impl.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool14Impl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool14Impl.java Mon Oct 23 19:45:46 2006
@@ -1,70 +1,70 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.io;
-
-import java.util.Queue;
-import java.util.LinkedList;
-
-
-/**
- *
- * @author Filip Hanik
- * @version 1.0
- */
-class BufferPool14Impl implements BufferPool.BufferPoolAPI {
- protected int maxSize;
- protected int size = 0;
- protected LinkedList queue = new LinkedList();
-
- public void setMaxSize(int bytes) {
- this.maxSize = bytes;
- }
-
- public synchronized int addAndGet(int val) {
- size = size + (val);
- return size;
- }
-
-
-
- public synchronized XByteBuffer getBuffer(int minSize, boolean discard) {
- XByteBuffer buffer = (XByteBuffer)(queue.size()>0?queue.remove(0):null);
- if ( buffer != null ) addAndGet(-buffer.getCapacity());
- if ( buffer == null ) buffer = new XByteBuffer(minSize,discard);
- else if ( buffer.getCapacity() <= minSize ) buffer.expand(minSize);
- buffer.setDiscard(discard);
- buffer.reset();
- return buffer;
- }
-
- public synchronized void returnBuffer(XByteBuffer buffer) {
- if ( (size + buffer.getCapacity()) <= maxSize ) {
- addAndGet(buffer.getCapacity());
- queue.add(buffer);
- }
- }
-
- public synchronized void clear() {
- queue.clear();
- size = 0;
- }
-
- public int getMaxSize() {
- return maxSize;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import java.util.Queue;
+import java.util.LinkedList;
+
+
+/**
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+class BufferPool14Impl implements BufferPool.BufferPoolAPI {
+ protected int maxSize;
+ protected int size = 0;
+ protected LinkedList queue = new LinkedList();
+
+ public void setMaxSize(int bytes) {
+ this.maxSize = bytes;
+ }
+
+ public synchronized int addAndGet(int val) {
+ size = size + (val);
+ return size;
+ }
+
+
+
+ public synchronized XByteBuffer getBuffer(int minSize, boolean discard) {
+ XByteBuffer buffer = (XByteBuffer)(queue.size()>0?queue.remove(0):null);
+ if ( buffer != null ) addAndGet(-buffer.getCapacity());
+ if ( buffer == null ) buffer = new XByteBuffer(minSize,discard);
+ else if ( buffer.getCapacity() <= minSize ) buffer.expand(minSize);
+ buffer.setDiscard(discard);
+ buffer.reset();
+ return buffer;
+ }
+
+ public synchronized void returnBuffer(XByteBuffer buffer) {
+ if ( (size + buffer.getCapacity()) <= maxSize ) {
+ addAndGet(buffer.getCapacity());
+ queue.add(buffer);
+ }
+ }
+
+ public synchronized void clear() {
+ queue.clear();
+ size = 0;
+ }
+
+ public int getMaxSize() {
+ return maxSize;
+ }
+
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool14Impl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool14Impl.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org