You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2006/10/24 04:45:59 UTC

svn commit: r467206 [8/30] - in /tomcat: build/tc5.5.x/ connectors/trunk/ connectors/trunk/ajp/ajplib/test/ connectors/trunk/ajp/proxy/ connectors/trunk/jk/jkstatus/src/share/org/apache/jk/status/ connectors/trunk/jk/native/iis/ connectors/trunk/jk/nat...

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Mon Oct 23 19:45:46 2006
@@ -1,299 +1,299 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- */
-
-package org.apache.catalina.tribes.group.interceptors;
-
-import java.util.HashMap;
-
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-import org.apache.catalina.tribes.io.XByteBuffer;
-
-
-
-/**
- *
- * The order interceptor guarantees that messages are received in the same order they were 
- * sent.
- * This interceptor works best with the ack=true setting. <br>
- * There is no point in 
- * using this with the replicationMode="fastasynchqueue" as this mode guarantees ordering.<BR>
- * If you are using the mode ack=false replicationMode=pooled, and have a lot of concurrent threads,
- * this interceptor can really slow you down, as many messages will be completely out of order
- * and the queue might become rather large. If this is the case, then you might want to set 
- * the value OrderInterceptor.maxQueue = 25 (meaning that we will never keep more than 25 messages in our queue)
- * <br><b>Configuration Options</b><br>
- * OrderInteceptor.expire=<milliseconds> - if a message arrives out of order, how long before we act on it <b>default=3000ms</b><br>
- * OrderInteceptor.maxQueue=<max queue size> - how much can the queue grow to ensure ordering. 
- *   This setting is useful to avoid OutOfMemoryErrors<b>default=Integer.MAX_VALUE</b><br>
- * OrderInterceptor.forwardExpired=<boolean> - this flag tells the interceptor what to 
- * do when a message has expired or the queue has grown larger than the maxQueue value.
- * true means that the message is sent up the stack to the receiver that will receive and out of order message
- * false means, forget the message and reset the message counter. <b>default=true</b>
- * 
- * 
- * @author Filip Hanik
- * @version 1.0
- */
-public class OrderInterceptor extends ChannelInterceptorBase {
-    private HashMap outcounter = new HashMap();
-    private HashMap incounter = new HashMap();
-    private HashMap incoming = new HashMap();
-    private long expire = 3000;
-    private boolean forwardExpired = true;
-    private int maxQueue = Integer.MAX_VALUE;
-
-    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
-        for ( int i=0; i<destination.length; i++ ) {
-            int nr = incCounter(destination[i]);
-            //reduce byte copy
-            msg.getMessage().append(nr);
-            try {
-                getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
-            }finally {
-                msg.getMessage().trim(4);
-            }
-        }
-    }
-
-    public void messageReceived(ChannelMessage msg) {
-        int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
-        msg.getMessage().trim(4);
-        MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone());
-        if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
-    }
-    
-    public synchronized void processLeftOvers(Member member, boolean force) {
-        MessageOrder tmp = (MessageOrder)incoming.get(member);
-        if ( force ) {
-            Counter cnt = getInCounter(member);
-            cnt.setCounter(Integer.MAX_VALUE);
-        }
-        if ( tmp!= null ) processIncoming(tmp);
-    }
-    /**
-     * 
-     * @param order MessageOrder
-     * @return boolean - true if a message expired and was processed
-     */
-    public synchronized boolean processIncoming(MessageOrder order) {
-        boolean result = false;
-        Member member = order.getMessage().getAddress();
-        Counter cnt = getInCounter(member);
-        
-        MessageOrder tmp = (MessageOrder)incoming.get(member);
-        if ( tmp != null ) {
-            order = MessageOrder.add(tmp,order);
-        }
-        
-        
-        while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter())  ) {
-            //we are right on target. process orders
-            if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc();
-            else if ( order.getMsgNr() > cnt.getCounter() ) cnt.setCounter(order.getMsgNr());
-            super.messageReceived(order.getMessage());
-            order.setMessage(null);
-            order = order.next;
-        }
-        MessageOrder head = order;
-        MessageOrder prev = null;
-        tmp = order;
-        //flag to empty out the queue when it larger than maxQueue
-        boolean empty = order!=null?order.getCount()>=maxQueue:false;
-        while ( tmp != null ) {
-            //process expired messages or empty out the queue
-            if ( tmp.isExpired(expire) || empty ) {
-                //reset the head
-                if ( tmp == head ) head = tmp.next;
-                cnt.setCounter(tmp.getMsgNr()+1);
-                if ( getForwardExpired() ) super.messageReceived(tmp.getMessage());
-                tmp.setMessage(null);
-                tmp = tmp.next;
-                if ( prev != null ) prev.next = tmp;  
-                result = true;
-            } else {
-                prev = tmp;
-                tmp = tmp.next;
-            }
-        }
-        if ( head == null ) incoming.remove(member);
-        else incoming.put(member, head);
-        return result;
-    }
-    
-    public void memberAdded(Member member) {
-        //notify upwards
-        getInCounter(member);
-        getOutCounter(member);
-        super.memberAdded(member);
-    }
-
-    public void memberDisappeared(Member member) {
-        //notify upwards
-        outcounter.remove(member);
-        incounter.remove(member);
-        //clear the remaining queue
-        processLeftOvers(member,true);
-        super.memberDisappeared(member);
-    }
-    
-    public int incCounter(Member mbr) { 
-        Counter cnt = getOutCounter(mbr);
-        return cnt.inc();
-    }
-    
-    public synchronized Counter getInCounter(Member mbr) {
-        Counter cnt = (Counter)incounter.get(mbr);
-        if ( cnt == null ) {
-            cnt = new Counter();
-            cnt.inc(); //always start at 1 for incoming
-            incounter.put(mbr,cnt);
-        }
-        return cnt;
-    }
-
-    public synchronized Counter getOutCounter(Member mbr) {
-        Counter cnt = (Counter)outcounter.get(mbr);
-        if ( cnt == null ) {
-            cnt = new Counter();
-            outcounter.put(mbr,cnt);
-        }
-        return cnt;
-    }
-
-    public static class Counter {
-        private int value = 0;
-        
-        public int getCounter() {
-            return value;
-        }
-        
-        public synchronized void setCounter(int counter) {
-            this.value = counter;
-        }
-        
-        public synchronized int inc() {
-            return ++value;
-        }
-    }
-    
-    public static class MessageOrder {
-        private long received = System.currentTimeMillis();
-        private MessageOrder next;
-        private int msgNr;
-        private ChannelMessage msg = null;
-        public MessageOrder(int msgNr,ChannelMessage msg) {
-            this.msgNr = msgNr;
-            this.msg = msg;
-        }
-        
-        public boolean isExpired(long expireTime) {
-            return (System.currentTimeMillis()-received) > expireTime;
-        }
-        
-        public ChannelMessage getMessage() {
-            return msg;
-        }
-        
-        public void setMessage(ChannelMessage msg) {
-            this.msg = msg;
-        }
-        
-        public void setNext(MessageOrder order) {
-            this.next = order;
-        }
-        public MessageOrder getNext() {
-            return next;
-        }
-        
-        public int getCount() {
-            int counter = 1;
-            MessageOrder tmp = next;
-            while ( tmp != null ) {
-                counter++;
-                tmp = tmp.next;
-            }
-            return counter;
-        }
-        
-        public static MessageOrder add(MessageOrder head, MessageOrder add) {
-            if ( head == null ) return add;
-            if ( add == null ) return head;
-            if ( head == add ) return add;
-
-            if ( head.getMsgNr() > add.getMsgNr() ) {
-                add.next = head;
-                return add;
-            }
-            
-            MessageOrder iter = head;
-            MessageOrder prev = null;
-            while ( iter.getMsgNr() < add.getMsgNr() && (iter.next !=null ) ) {
-                prev = iter;
-                iter = iter.next;
-            }
-            if ( iter.getMsgNr() < add.getMsgNr() ) {
-                //add after
-                add.next = iter.next;
-                iter.next = add;
-            } else if (iter.getMsgNr() > add.getMsgNr()) {
-                //add before
-                prev.next = add;
-                add.next = iter;
-                
-            } else {
-                throw new ArithmeticException("Message added has the same counter, synchronization bug. Disable the order interceptor");
-            }
-            
-            return head;
-        }
-        
-        public int getMsgNr() {
-            return msgNr;
-        }
-        
-        
-        
-    }
-
-    public void setExpire(long expire) {
-        this.expire = expire;
-    }
-
-    public void setForwardExpired(boolean forwardExpired) {
-        this.forwardExpired = forwardExpired;
-    }
-
-    public void setMaxQueue(int maxQueue) {
-        this.maxQueue = maxQueue;
-    }
-
-    public long getExpire() {
-        return expire;
-    }
-
-    public boolean getForwardExpired() {
-        return forwardExpired;
-    }
-
-    public int getMaxQueue() {
-        return maxQueue;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.io.XByteBuffer;
+
+
+
+/**
+ *
+ * The order interceptor guarantees that messages are received in the same order they were 
+ * sent.
+ * This interceptor works best with the ack=true setting. <br>
+ * There is no point in 
+ * using this with the replicationMode="fastasynchqueue" as this mode guarantees ordering.<BR>
+ * If you are using the mode ack=false replicationMode=pooled, and have a lot of concurrent threads,
+ * this interceptor can really slow you down, as many messages will be completely out of order
+ * and the queue might become rather large. If this is the case, then you might want to set 
+ * the value OrderInterceptor.maxQueue = 25 (meaning that we will never keep more than 25 messages in our queue)
+ * <br><b>Configuration Options</b><br>
+ * OrderInterceptor.expire=<milliseconds> - if a message arrives out of order, how long before we act on it <b>default=3000ms</b><br>
+ * OrderInterceptor.maxQueue=<max queue size> - how much can the queue grow to ensure ordering. 
+ *   This setting is useful to avoid OutOfMemoryErrors. <b>default=Integer.MAX_VALUE</b><br>
+ * OrderInterceptor.forwardExpired=<boolean> - this flag tells the interceptor what to 
+ * do when a message has expired or the queue has grown larger than the maxQueue value.
+ * true means that the message is sent up the stack to the receiver that will receive an out of order message
+ * false means, forget the message and reset the message counter. <b>default=true</b>
+ * 
+ * 
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class OrderInterceptor extends ChannelInterceptorBase {
+    private HashMap outcounter = new HashMap();
+    private HashMap incounter = new HashMap();
+    private HashMap incoming = new HashMap();
+    private long expire = 3000;
+    private boolean forwardExpired = true;
+    private int maxQueue = Integer.MAX_VALUE;
+
+    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+        for ( int i=0; i<destination.length; i++ ) {
+            int nr = incCounter(destination[i]);
+            //reduce byte copy
+            msg.getMessage().append(nr);
+            try {
+                getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
+            }finally {
+                msg.getMessage().trim(4);
+            }
+        }
+    }
+
+    public void messageReceived(ChannelMessage msg) {
+        int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
+        msg.getMessage().trim(4);
+        MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone());
+        if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
+    }
+    
+    public synchronized void processLeftOvers(Member member, boolean force) {
+        MessageOrder tmp = (MessageOrder)incoming.get(member);
+        if ( force ) {
+            Counter cnt = getInCounter(member);
+            cnt.setCounter(Integer.MAX_VALUE);
+        }
+        if ( tmp!= null ) processIncoming(tmp);
+    }
+    /**
+     * 
+     * @param order MessageOrder
+     * @return boolean - true if a message expired and was processed
+     */
+    public synchronized boolean processIncoming(MessageOrder order) {
+        boolean result = false;
+        Member member = order.getMessage().getAddress();
+        Counter cnt = getInCounter(member);
+        
+        MessageOrder tmp = (MessageOrder)incoming.get(member);
+        if ( tmp != null ) {
+            order = MessageOrder.add(tmp,order);
+        }
+        
+        
+        while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter())  ) {
+            //we are right on target. process orders
+            if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc();
+            else if ( order.getMsgNr() > cnt.getCounter() ) cnt.setCounter(order.getMsgNr());
+            super.messageReceived(order.getMessage());
+            order.setMessage(null);
+            order = order.next;
+        }
+        MessageOrder head = order;
+        MessageOrder prev = null;
+        tmp = order;
+        //flag to empty out the queue when it has grown to maxQueue entries or more
+        boolean empty = order!=null?order.getCount()>=maxQueue:false;
+        while ( tmp != null ) {
+            //process expired messages or empty out the queue
+            if ( tmp.isExpired(expire) || empty ) {
+                //reset the head
+                if ( tmp == head ) head = tmp.next;
+                cnt.setCounter(tmp.getMsgNr()+1);
+                if ( getForwardExpired() ) super.messageReceived(tmp.getMessage());
+                tmp.setMessage(null);
+                tmp = tmp.next;
+                if ( prev != null ) prev.next = tmp;  
+                result = true;
+            } else {
+                prev = tmp;
+                tmp = tmp.next;
+            }
+        }
+        if ( head == null ) incoming.remove(member);
+        else incoming.put(member, head);
+        return result;
+    }
+    
+    public void memberAdded(Member member) {
+        //notify upwards
+        getInCounter(member);
+        getOutCounter(member);
+        super.memberAdded(member);
+    }
+
+    public void memberDisappeared(Member member) {
+        //notify upwards
+        outcounter.remove(member);
+        incounter.remove(member);
+        //clear the remaining queue
+        processLeftOvers(member,true);
+        super.memberDisappeared(member);
+    }
+    
+    public int incCounter(Member mbr) { 
+        Counter cnt = getOutCounter(mbr);
+        return cnt.inc();
+    }
+    
+    public synchronized Counter getInCounter(Member mbr) {
+        Counter cnt = (Counter)incounter.get(mbr);
+        if ( cnt == null ) {
+            cnt = new Counter();
+            cnt.inc(); //always start at 1 for incoming
+            incounter.put(mbr,cnt);
+        }
+        return cnt;
+    }
+
+    public synchronized Counter getOutCounter(Member mbr) {
+        Counter cnt = (Counter)outcounter.get(mbr);
+        if ( cnt == null ) {
+            cnt = new Counter();
+            outcounter.put(mbr,cnt);
+        }
+        return cnt;
+    }
+
+    public static class Counter {
+        private int value = 0;
+        
+        public int getCounter() {
+            return value;
+        }
+        
+        public synchronized void setCounter(int counter) {
+            this.value = counter;
+        }
+        
+        public synchronized int inc() {
+            return ++value;
+        }
+    }
+    
+    public static class MessageOrder {
+        private long received = System.currentTimeMillis();
+        private MessageOrder next;
+        private int msgNr;
+        private ChannelMessage msg = null;
+        public MessageOrder(int msgNr,ChannelMessage msg) {
+            this.msgNr = msgNr;
+            this.msg = msg;
+        }
+        
+        public boolean isExpired(long expireTime) {
+            return (System.currentTimeMillis()-received) > expireTime;
+        }
+        
+        public ChannelMessage getMessage() {
+            return msg;
+        }
+        
+        public void setMessage(ChannelMessage msg) {
+            this.msg = msg;
+        }
+        
+        public void setNext(MessageOrder order) {
+            this.next = order;
+        }
+        public MessageOrder getNext() {
+            return next;
+        }
+        
+        public int getCount() {
+            int counter = 1;
+            MessageOrder tmp = next;
+            while ( tmp != null ) {
+                counter++;
+                tmp = tmp.next;
+            }
+            return counter;
+        }
+        
+        public static MessageOrder add(MessageOrder head, MessageOrder add) {
+            if ( head == null ) return add;
+            if ( add == null ) return head;
+            if ( head == add ) return add;
+
+            if ( head.getMsgNr() > add.getMsgNr() ) {
+                add.next = head;
+                return add;
+            }
+            
+            MessageOrder iter = head;
+            MessageOrder prev = null;
+            while ( iter.getMsgNr() < add.getMsgNr() && (iter.next !=null ) ) {
+                prev = iter;
+                iter = iter.next;
+            }
+            if ( iter.getMsgNr() < add.getMsgNr() ) {
+                //add after
+                add.next = iter.next;
+                iter.next = add;
+            } else if (iter.getMsgNr() > add.getMsgNr()) {
+                //add before
+                prev.next = add;
+                add.next = iter;
+                
+            } else {
+                throw new ArithmeticException("Message added has the same counter, synchronization bug. Disable the order interceptor");
+            }
+            
+            return head;
+        }
+        
+        public int getMsgNr() {
+            return msgNr;
+        }
+        
+        
+        
+    }
+
+    public void setExpire(long expire) {
+        this.expire = expire;
+    }
+
+    public void setForwardExpired(boolean forwardExpired) {
+        this.forwardExpired = forwardExpired;
+    }
+
+    public void setMaxQueue(int maxQueue) {
+        this.maxQueue = maxQueue;
+    }
+
+    public long getExpire() {
+        return expire;
+    }
+
+    public boolean getForwardExpired() {
+        return forwardExpired;
+    }
+
+    public int getMaxQueue() {
+        return maxQueue;
+    }
+
+}

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Mon Oct 23 19:45:46 2006
@@ -1,289 +1,289 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- */
-package org.apache.catalina.tribes.group.interceptors;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.util.Arrays;
-import java.util.HashMap;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelException.FaultyMember;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.RemoteProcessException;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.membership.MemberImpl;
-import org.apache.catalina.tribes.membership.Membership;
-import java.net.ConnectException;
-
-/**
- * <p>Title: A perfect failure detector </p>
- *
- * <p>Description: The TcpFailureDetector is a useful interceptor
- * that adds reliability to the membership layer.</p>
- * <p>
- * If the network is busy, or the system is busy so that the membership receiver thread
- * is not getting enough time to update its table, members can be &quot;timed out&quot;
- * This failure detector will intercept the memberDisappeared message(unless its a true shutdown message)
- * and connect to the member using TCP.
- * </p>
- * <p>
- * The TcpFailureDetector works in two ways. <br>
- * 1. It intercepts memberDisappeared events
- * 2. It catches send errors 
- * </p>
- *
- * @author Filip Hanik
- * @version 1.0
- */
-public class TcpFailureDetector extends ChannelInterceptorBase {
-    
-    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( TcpFailureDetector.class );
-    
-    protected static byte[] TCP_FAIL_DETECT = new byte[] {
-        79, -89, 115, 72, 121, -126, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20,
-        125, -39, 82, 91, -21, -15, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74,
-        55, 21, -66, -121, 69, 126, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
-        85, -10, -108, -73, 58, -6, 64, 120, -111, 4, 125, -41, 114, -124, -64, -43};      
-    
-    protected boolean performConnectTest = true;
-
-    protected long connectTimeout = 1000;//1 second default
-    
-    protected boolean performSendTest = true;
-
-    protected boolean performReadTest = false;
-    
-    protected long readTestTimeout = 5000;//5 seconds
-    
-    protected Membership membership = null;
-    
-    protected HashMap removeSuspects = new HashMap();
-    
-    protected HashMap addSuspects = new HashMap();
-    
-    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
-        try {
-            super.sendMessage(destination, msg, payload);
-        }catch ( ChannelException cx ) {
-            FaultyMember[] mbrs = cx.getFaultyMembers();
-            for ( int i=0; i<mbrs.length; i++ ) {
-                if ( mbrs[i].getCause()!=null &&  
-                     (!(mbrs[i].getCause() instanceof RemoteProcessException)) ) {//RemoteProcessException's are ok
-                    this.memberDisappeared(mbrs[i].getMember());
-                }//end if
-            }//for
-            throw cx;
-        }
-    }
-
-    public void messageReceived(ChannelMessage msg) {
-        //catch incoming 
-        boolean process = true;
-        if ( okToProcess(msg.getOptions()) ) {
-            //check to see if it is a testMessage, if so, process = false
-            process = ( (msg.getMessage().getLength() != TCP_FAIL_DETECT.length) ||
-                        (!Arrays.equals(TCP_FAIL_DETECT,msg.getMessage().getBytes()) ) );
-        }//end if
-            
-        //ignore the message, it doesnt have the flag set
-        if ( process ) super.messageReceived(msg);
-        else if ( log.isDebugEnabled() ) log.debug("Received a failure detector packet:"+msg);
-    }//messageReceived
-    
-    
-    public void memberAdded(Member member) {
-        if ( membership == null ) setupMembership();
-        boolean notify = false;
-        synchronized (membership) {
-            if (removeSuspects.containsKey(member)) {
-                //previously marked suspect, system below picked up the member again
-                removeSuspects.remove(member);
-            } else if (membership.getMember( (MemberImpl) member) == null){
-                //if we add it here, then add it upwards too
-                //check to see if it is alive
-                if (memberAlive(member)) {
-                    membership.memberAlive( (MemberImpl) member);
-                    notify = true;
-                } else {
-                    addSuspects.put(member, new Long(System.currentTimeMillis()));
-                }
-            }
-        }
-        if ( notify ) super.memberAdded(member);
-    }
-
-    public void memberDisappeared(Member member) {
-        if ( membership == null ) setupMembership();
-        boolean notify = false;
-        boolean shutdown = Arrays.equals(member.getCommand(),Member.SHUTDOWN_PAYLOAD);
-        if ( !shutdown ) log.info("Received memberDisappeared["+member+"] message. Will verify.");
-        synchronized (membership) {
-            //check to see if the member really is gone
-            //if the payload is not a shutdown message
-            if (shutdown || !memberAlive(member)) {
-                //not correct, we need to maintain the map
-                membership.removeMember( (MemberImpl) member);
-                removeSuspects.remove(member);
-                notify = true;
-            } else {
-                //add the member as suspect
-                removeSuspects.put(member, new Long(System.currentTimeMillis()));
-            }
-        }
-        if ( notify ) {
-            log.info("Verification complete. Member disappeared["+member+"]");
-            super.memberDisappeared(member);
-        } else {
-            log.info("Verification complete. Member still alive["+member+"]");
-
-        }
-    }
-    
-    public boolean hasMembers() {
-        if ( membership == null ) setupMembership();
-        return membership.hasMembers();
-    }
-
-    public Member[] getMembers() {
-        if ( membership == null ) setupMembership();
-        return membership.getMembers();
-    }
-
-    public Member getMember(Member mbr) {
-        if ( membership == null ) setupMembership();
-        return membership.getMember(mbr);
-    }
-
-    public Member getLocalMember(boolean incAlive) {
-        return super.getLocalMember(incAlive);
-    }
-    
-    public void heartbeat() {
-        try {
-            if (membership == null) setupMembership();
-            synchronized (membership) {
-                //update all alive times
-                Member[] members = super.getMembers();
-                for (int i = 0; members != null && i < members.length; i++) {
-                    if (membership.memberAlive( (MemberImpl) members[i])) {
-                        //we don't have this one in our membership, check to see if he/she is alive
-                        if (memberAlive(members[i])) {
-                            log.warn("Member added, even though we werent notified:" + members[i]);
-                            super.memberAdded(members[i]);
-                        } else {
-                            membership.removeMember( (MemberImpl) members[i]);
-                        } //end if
-                    } //end if
-                } //for
-
-                //check suspect members if they are still alive,
-                //if not, simply issue the memberDisappeared message
-                MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
-                for (int i = 0; i < keys.length; i++) {
-                    MemberImpl m = (MemberImpl) keys[i];
-                    if (membership.getMember(m) != null && (!memberAlive(m))) {
-                        membership.removeMember(m);
-                        super.memberDisappeared(m);
-                        removeSuspects.remove(m);
-                        log.info("Suspect member, confirmed dead.["+m+"]");
-                    } //end if
-                }
-
-                //check add suspects members if they are alive now,
-                //if they are, simply issue the memberAdded message
-                keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
-                for (int i = 0; i < keys.length; i++) {
-                    MemberImpl m = (MemberImpl) keys[i];
-                    if ( membership.getMember(m) == null && (memberAlive(m))) {
-                        membership.memberAlive(m);
-                        super.memberAdded(m);
-                        addSuspects.remove(m);
-                        log.info("Suspect member, confirmed alive.["+m+"]");
-                    } //end if
-                }
-            }
-        }catch ( Exception x ) {
-            log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
-        } finally {
-            super.heartbeat();
-        }
-    }
-    
-    protected synchronized void setupMembership() {
-        if ( membership == null ) {
-            membership = new Membership((MemberImpl)super.getLocalMember(true));
-        }
-        
-    }
-    
-    protected boolean memberAlive(Member mbr) {
-        return memberAlive(mbr,TCP_FAIL_DETECT,performSendTest,performReadTest,readTestTimeout,connectTimeout,getOptionFlag());
-    }
-    
-    protected static boolean memberAlive(Member mbr, byte[] msgData, 
-                                         boolean sendTest, boolean readTest,
-                                         long readTimeout, long conTimeout,
-                                         int optionFlag) {
-        //could be a shutdown notification
-        if ( Arrays.equals(mbr.getCommand(),Member.SHUTDOWN_PAYLOAD) ) return false;
-        
-        Socket socket = new Socket();        
-        try {
-            InetAddress ia = InetAddress.getByAddress(mbr.getHost());
-            InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort());
-            socket.setSoTimeout((int)readTimeout);
-            socket.connect(addr, (int) conTimeout);
-            if ( sendTest ) {
-                ChannelData data = new ChannelData(true);
-                data.setAddress(mbr);
-                data.setMessage(new XByteBuffer(msgData,false));
-                data.setTimestamp(System.currentTimeMillis());
-                int options = optionFlag | Channel.SEND_OPTIONS_BYTE_MESSAGE;
-                if ( readTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK);
-                else options = (options & (~Channel.SEND_OPTIONS_USE_ACK));
-                data.setOptions(options);
-                byte[] message = XByteBuffer.createDataPackage(data);
-                socket.getOutputStream().write(message);
-                if ( readTest ) {
-                    int length = socket.getInputStream().read(message);
-                    return length > 0;
-                }
-            }//end if
-            return true;
-        } catch ( SocketTimeoutException sx) {
-            //do nothing, we couldn't connect
-        } catch ( ConnectException cx) {
-            //do nothing, we couldn't connect
-        }catch (Exception x ) {
-            log.error("Unable to perform failure detection check, assuming member down.",x);
-        } finally {
-            try {socket.close(); } catch ( Exception ignore ){}
-        }
-        return false;
-    }
-
-
-
-    
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelException.FaultyMember;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.membership.Membership;
+import java.net.ConnectException;
+
+/**
+ * <p>Title: A perfect failure detector </p>
+ *
+ * <p>Description: The TcpFailureDetector is a useful interceptor
+ * that adds reliability to the membership layer.</p>
+ * <p>
+ * If the network is busy, or the system is busy so that the membership receiver thread
+ * is not getting enough time to update its table, members can be &quot;timed out&quot;.
+ * This failure detector will intercept the memberDisappeared message (unless it is a true shutdown message)
+ * and connect to the member using TCP to verify that the member is really gone.
+ * </p>
+ * <p>
+ * The TcpFailureDetector works in two ways. <br>
+ * 1. It intercepts memberDisappeared events <br>
+ * 2. It catches send errors
+ * </p>
+ * <p>
+ * The interceptor keeps its own {@link Membership} view and answers
+ * {@link #hasMembers()}, {@link #getMembers()} and {@link #getMember(Member)} from it.
+ * Members whose reported state contradicts the TCP check are parked in
+ * {@link #removeSuspects} / {@link #addSuspects} and re-checked on every {@link #heartbeat()}.
+ * </p>
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class TcpFailureDetector extends ChannelInterceptorBase {
+    
+    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( TcpFailureDetector.class );
+    
+    /**
+     * Payload of the test packet written to a member during the send test.
+     * {@link #messageReceived(ChannelMessage)} drops incoming messages whose bytes equal this array.
+     */
+    protected static byte[] TCP_FAIL_DETECT = new byte[] {
+        79, -89, 115, 72, 121, -126, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20,
+        125, -39, 82, 91, -21, -15, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74,
+        55, 21, -66, -121, 69, 126, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
+        85, -10, -108, -73, 58, -6, 64, 120, -111, 4, 125, -41, 114, -124, -64, -43};      
+    
+    /** NOTE(review): not read by any code in this class; the connect is always performed. */
+    protected boolean performConnectTest = true;
+
+    /** Timeout handed to <code>Socket.connect</code>, in milliseconds. */
+    protected long connectTimeout = 1000;//1 second default
+    
+    /** When true, the test packet {@link #TCP_FAIL_DETECT} is written after a successful connect. */
+    protected boolean performSendTest = true;
+
+    /** When true (and the send test is enabled), an ack is requested and a reply is read back. */
+    protected boolean performReadTest = false;
+    
+    /** Socket read timeout (<code>Socket.setSoTimeout</code>) used during the check, in milliseconds. */
+    protected long readTestTimeout = 5000;//5 seconds
+    
+    /** Local membership view; created lazily by {@link #setupMembership()}. */
+    protected Membership membership = null;
+    
+    /**
+     * Members that were reported as disappeared but still answered the TCP check.
+     * Maps the member to the time (a Long, from System.currentTimeMillis()) it became suspect.
+     */
+    protected HashMap removeSuspects = new HashMap();
+    
+    /**
+     * Members that were reported as added but did not answer the TCP check.
+     * Maps the member to the time (a Long, from System.currentTimeMillis()) it became suspect.
+     */
+    protected HashMap addSuspects = new HashMap();
+    
+    /**
+     * Passes the message down the interceptor stack and, if the send fails,
+     * verifies every faulty member before re-throwing the error.
+     *
+     * @param destination the members the message is sent to
+     * @param msg the message to send
+     * @param payload passed through unchanged to the next interceptor
+     * @throws ChannelException the exception raised by the underlying send, re-thrown
+     *         after the faulty members have been checked
+     */
+    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+        try {
+            super.sendMessage(destination, msg, payload);
+        }catch ( ChannelException cx ) {
+            FaultyMember[] mbrs = cx.getFaultyMembers();
+            for ( int i=0; i<mbrs.length; i++ ) {
+                if ( mbrs[i].getCause()!=null &&  
+                     (!(mbrs[i].getCause() instanceof RemoteProcessException)) ) {//RemoteProcessException's are ok
+                    //run the normal disappearance verification for this member
+                    this.memberDisappeared(mbrs[i].getMember());
+                }//end if
+            }//for
+            throw cx;
+        }
+    }
+
+    /**
+     * Forwards incoming messages upwards, except for failure detector test packets.
+     * A message is treated as a test packet when <code>okToProcess(msg.getOptions())</code>
+     * is true and its content equals {@link #TCP_FAIL_DETECT}; such a message is only
+     * logged at debug level.
+     *
+     * @param msg the received message
+     */
+    public void messageReceived(ChannelMessage msg) {
+        //catch incoming 
+        boolean process = true;
+        if ( okToProcess(msg.getOptions()) ) {
+            //check to see if it is a testMessage, if so, process = false
+            //the length is compared first so that the byte comparison is only reached for same-sized messages
+            process = ( (msg.getMessage().getLength() != TCP_FAIL_DETECT.length) ||
+                        (!Arrays.equals(TCP_FAIL_DETECT,msg.getMessage().getBytes()) ) );
+        }//end if
+            
+        //forward everything that is not a failure detector test packet
+        if ( process ) super.messageReceived(msg);
+        else if ( log.isDebugEnabled() ) log.debug("Received a failure detector packet:"+msg);
+    }//messageReceived
+    
+    
+    /**
+     * Handles a memberAdded notification from the layer below.
+     * <ul>
+     * <li>If the member is a remove suspect, the suspect entry is cleared and
+     *     nothing is propagated upwards.</li>
+     * <li>Otherwise, if the member is not in the local membership, it is checked over TCP:
+     *     when alive it is added to the membership and propagated upwards,
+     *     when not it is recorded in {@link #addSuspects}.</li>
+     * </ul>
+     *
+     * @param member the member that was reported as added
+     */
+    public void memberAdded(Member member) {
+        if ( membership == null ) setupMembership();
+        boolean notify = false;
+        synchronized (membership) {
+            if (removeSuspects.containsKey(member)) {
+                //previously marked suspect, system below picked up the member again
+                removeSuspects.remove(member);
+            } else if (membership.getMember( (MemberImpl) member) == null){
+                //if we add it here, then add it upwards too
+                //check to see if it is alive
+                if (memberAlive(member)) {
+                    membership.memberAlive( (MemberImpl) member);
+                    notify = true;
+                } else {
+                    addSuspects.put(member, new Long(System.currentTimeMillis()));
+                }
+            }
+        }
+        //the upward notification is issued after the membership lock has been released
+        if ( notify ) super.memberAdded(member);
+    }
+
+    /**
+     * Handles a memberDisappeared notification. Unless the member's command is
+     * {@link Member#SHUTDOWN_PAYLOAD}, the member is first checked over TCP.
+     * A member that announced a shutdown or fails the check is removed from the local
+     * membership and the event is propagated upwards; a member that still answers is
+     * kept and recorded in {@link #removeSuspects}.
+     *
+     * @param member the member that was reported as disappeared
+     */
+    public void memberDisappeared(Member member) {
+        if ( membership == null ) setupMembership();
+        boolean notify = false;
+        boolean shutdown = Arrays.equals(member.getCommand(),Member.SHUTDOWN_PAYLOAD);
+        if ( !shutdown ) log.info("Received memberDisappeared["+member+"] message. Will verify.");
+        synchronized (membership) {
+            //check to see if the member really is gone
+            //if the payload is not a shutdown message
+            if (shutdown || !memberAlive(member)) {
+                //the member is gone: drop it from our membership view and clear any suspect entry
+                membership.removeMember( (MemberImpl) member);
+                removeSuspects.remove(member);
+                notify = true;
+            } else {
+                //add the member as suspect
+                removeSuspects.put(member, new Long(System.currentTimeMillis()));
+            }
+        }
+        //the upward notification is issued after the membership lock has been released
+        if ( notify ) {
+            log.info("Verification complete. Member disappeared["+member+"]");
+            super.memberDisappeared(member);
+        } else {
+            log.info("Verification complete. Member still alive["+member+"]");
+
+        }
+    }
+    
+    /**
+     * @return the result of <code>hasMembers()</code> on the local membership view
+     */
+    public boolean hasMembers() {
+        if ( membership == null ) setupMembership();
+        return membership.hasMembers();
+    }
+
+    /**
+     * @return the members held in the local membership view
+     */
+    public Member[] getMembers() {
+        if ( membership == null ) setupMembership();
+        return membership.getMembers();
+    }
+
+    /**
+     * Looks the member up in the local membership view.
+     *
+     * @param mbr the member to look up
+     * @return the result of <code>membership.getMember(mbr)</code>; other code in this
+     *         class treats <code>null</code> as "not a known member"
+     */
+    public Member getMember(Member mbr) {
+        if ( membership == null ) setupMembership();
+        return membership.getMember(mbr);
+    }
+
+    /**
+     * Delegates unchanged to the next interceptor.
+     *
+     * @param incAlive passed through to <code>super.getLocalMember</code>
+     * @return the local member as returned by the next interceptor
+     */
+    public Member getLocalMember(boolean incAlive) {
+        return super.getLocalMember(incAlive);
+    }
+    
+    /**
+     * Periodic check, performed while holding the membership lock, in three passes:
+     * <ol>
+     * <li>every member reported by the layer below is registered in the local membership;
+     *     members that turn out to be new are verified over TCP and either announced
+     *     upwards or removed again,</li>
+     * <li>remove suspects that are still in the membership and no longer answer are
+     *     removed and announced upwards as disappeared,</li>
+     * <li>add suspects that are not in the membership and now answer are added and
+     *     announced upwards.</li>
+     * </ol>
+     * Any exception is logged as a warning; <code>super.heartbeat()</code> is always invoked.
+     */
+    public void heartbeat() {
+        try {
+            if (membership == null) setupMembership();
+            synchronized (membership) {
+                //update all alive times
+                Member[] members = super.getMembers();
+                for (int i = 0; members != null && i < members.length; i++) {
+                    //NOTE(review): Membership.memberAlive presumably returns true only when the
+                    //member was not known before this call - confirm against Membership
+                    if (membership.memberAlive( (MemberImpl) members[i])) {
+                        //we don't have this one in our membership, check to see if he/she is alive
+                        if (memberAlive(members[i])) {
+                            log.warn("Member added, even though we werent notified:" + members[i]);
+                            super.memberAdded(members[i]);
+                        } else {
+                            //not reachable: undo the registration done by membership.memberAlive above
+                            membership.removeMember( (MemberImpl) members[i]);
+                        } //end if
+                    } //end if
+                } //for
+
+                //check suspect members if they are still alive,
+                //if not, simply issue the memberDisappeared message
+                //(the key set is copied into an array so entries can be removed while looping)
+                MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
+                for (int i = 0; i < keys.length; i++) {
+                    MemberImpl m = (MemberImpl) keys[i];
+                    if (membership.getMember(m) != null && (!memberAlive(m))) {
+                        membership.removeMember(m);
+                        super.memberDisappeared(m);
+                        removeSuspects.remove(m);
+                        log.info("Suspect member, confirmed dead.["+m+"]");
+                    } //end if
+                }
+
+                //check add suspects members if they are alive now,
+                //if they are, simply issue the memberAdded message
+                keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
+                for (int i = 0; i < keys.length; i++) {
+                    MemberImpl m = (MemberImpl) keys[i];
+                    if ( membership.getMember(m) == null && (memberAlive(m))) {
+                        membership.memberAlive(m);
+                        super.memberAdded(m);
+                        addSuspects.remove(m);
+                        log.info("Suspect member, confirmed alive.["+m+"]");
+                    } //end if
+                }
+            }
+        }catch ( Exception x ) {
+            log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
+        } finally {
+            //always let the rest of the interceptor stack run its heartbeat
+            super.heartbeat();
+        }
+    }
+    
+    /**
+     * Creates the local membership view, seeded with the local member, if it does not exist yet.
+     * The method is synchronized and re-checks the field, so the view is created only once.
+     */
+    protected synchronized void setupMembership() {
+        if ( membership == null ) {
+            membership = new Membership((MemberImpl)super.getLocalMember(true));
+        }
+        
+    }
+    
+    /**
+     * Checks a member using this interceptor's configured tests and timeouts.
+     *
+     * @param mbr the member to check
+     * @return the result of the static check, see
+     *         {@link #memberAlive(Member, byte[], boolean, boolean, long, long, int)}
+     */
+    protected boolean memberAlive(Member mbr) {
+        return memberAlive(mbr,TCP_FAIL_DETECT,performSendTest,performReadTest,readTestTimeout,connectTimeout,getOptionFlag());
+    }
+    
+    /**
+     * Checks whether a member is reachable by opening a TCP connection to its host and port.
+     *
+     * @param mbr the member to check
+     * @param msgData the payload of the test packet
+     * @param sendTest if true, a test packet carrying <code>msgData</code> is written after connecting
+     * @param readTest if true (and <code>sendTest</code> is true), an ack is requested and
+     *        a reply is read from the socket
+     * @param readTimeout socket read timeout in milliseconds
+     * @param conTimeout connect timeout in milliseconds
+     * @param optionFlag option bits that are combined into the options of the test packet
+     * @return <code>true</code> if the connection succeeded and, when requested, the packet
+     *         was written and at least one byte was read back; <code>false</code> if the
+     *         member's command is {@link Member#SHUTDOWN_PAYLOAD}, on a timeout, on a refused
+     *         connection, or on any other exception (which is logged as an error)
+     */
+    protected static boolean memberAlive(Member mbr, byte[] msgData, 
+                                         boolean sendTest, boolean readTest,
+                                         long readTimeout, long conTimeout,
+                                         int optionFlag) {
+        //could be a shutdown notification
+        if ( Arrays.equals(mbr.getCommand(),Member.SHUTDOWN_PAYLOAD) ) return false;
+        
+        Socket socket = new Socket();        
+        try {
+            InetAddress ia = InetAddress.getByAddress(mbr.getHost());
+            InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort());
+            socket.setSoTimeout((int)readTimeout);
+            socket.connect(addr, (int) conTimeout);
+            if ( sendTest ) {
+                ChannelData data = new ChannelData(true);
+                data.setAddress(mbr);
+                data.setMessage(new XByteBuffer(msgData,false));
+                data.setTimestamp(System.currentTimeMillis());
+                int options = optionFlag | Channel.SEND_OPTIONS_BYTE_MESSAGE;
+                //request an ack only when we are going to read a reply
+                if ( readTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK);
+                else options = (options & (~Channel.SEND_OPTIONS_USE_ACK));
+                data.setOptions(options);
+                byte[] message = XByteBuffer.createDataPackage(data);
+                socket.getOutputStream().write(message);
+                if ( readTest ) {
+                    //the outgoing buffer is reused as the receive buffer; only the number of bytes read matters
+                    int length = socket.getInputStream().read(message);
+                    return length > 0;
+                }
+            }//end if
+            return true;
+        } catch ( SocketTimeoutException sx) {
+            //do nothing, the connect or the read timed out
+        } catch ( ConnectException cx) {
+            //do nothing, we couldn't connect
+        }catch (Exception x ) {
+            log.error("Unable to perform failure detection check, assuming member down.",x);
+        } finally {
+            //best effort close, a failure here does not change the result
+            try {socket.close(); } catch ( Exception ignore ){}
+        }
+        return false;
+    }
+
+
+
+    
 }

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java Mon Oct 23 19:45:46 2006
@@ -1,120 +1,120 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- */
-
-package org.apache.catalina.tribes.group.interceptors;
-
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import java.text.DecimalFormat;
-import org.apache.catalina.tribes.membership.MemberImpl;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-
-/**
- *
- *
- * @author Filip Hanik
- * @version 1.0
- */
-public class ThroughputInterceptor extends ChannelInterceptorBase {
-    protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ThroughputInterceptor.class);
-
-    double mbTx = 0;
-    double mbAppTx = 0;
-    double mbRx = 0;
-    double timeTx = 0;
-    double lastCnt = 0;
-    AtomicLong msgTxCnt = new AtomicLong(1);
-    AtomicLong msgRxCnt = new AtomicLong(0);
-    AtomicLong msgTxErr = new AtomicLong(0);
-    int interval = 10000;
-    AtomicInteger access = new AtomicInteger(0);
-    long txStart = 0;
-    long rxStart = 0;
-    DecimalFormat df = new DecimalFormat("#0.00");
-
-
-    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
-        if ( access.addAndGet(1) == 1 ) txStart = System.currentTimeMillis();
-        long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength());
-        try {
-            super.sendMessage(destination, msg, payload);
-        }catch ( ChannelException x ) {
-            msgTxErr.addAndGet(1);
-            access.addAndGet(-1);
-            throw x;
-        } 
-        mbTx += ((double)(bytes*destination.length))/(1024d*1024d);
-        mbAppTx += ((double)(bytes))/(1024d*1024d);
-        if ( access.addAndGet(-1) == 0 ) {
-            long stop = System.currentTimeMillis();
-            timeTx += ( (double) (stop - txStart)) / 1000d;
-            if ((msgTxCnt.get() / interval) >= lastCnt) {
-                lastCnt++;
-                report(timeTx);
-            }
-        }
-        msgTxCnt.addAndGet(1);
-    }
-
-    public void messageReceived(ChannelMessage msg) {
-        if ( rxStart == 0 ) rxStart = System.currentTimeMillis();
-        long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength());
-        mbRx += ((double)bytes)/(1024d*1024d);
-        msgRxCnt.addAndGet(1);
-        if ( msgRxCnt.get() % interval == 0 ) report(timeTx);
-        super.messageReceived(msg);
-        
-    }
-    
-    public void report(double timeTx) {
-        StringBuffer buf = new StringBuffer("ThroughputInterceptor Report[\n\tTx Msg:");
-        buf.append(msgTxCnt).append(" messages\n\tSent:");
-        buf.append(df.format(mbTx));
-        buf.append(" MB (total)\n\tSent:");
-        buf.append(df.format(mbAppTx));
-        buf.append(" MB (application)\n\tTime:");
-        buf.append(df.format(timeTx));
-        buf.append(" seconds\n\tTx Speed:");
-        buf.append(df.format(mbTx/timeTx));
-        buf.append(" MB/sec (total)\n\tTxSpeed:");
-        buf.append(df.format(mbAppTx/timeTx));
-        buf.append(" MB/sec (application)\n\tError Msg:");
-        buf.append(msgTxErr).append("\n\tRx Msg:");
-        buf.append(msgRxCnt);
-        buf.append(" messages\n\tRx Speed:");
-        buf.append(df.format(mbRx/((double)((System.currentTimeMillis()-rxStart)/1000))));
-        buf.append(" MB/sec (since 1st msg)\n\tReceived:");
-        buf.append(df.format(mbRx)).append(" MB]\n");
-        if ( log.isInfoEnabled() ) log.info(buf);
-    }
-    
-    public void setInterval(int interval) {
-        this.interval = interval;
-    }
-
-    public int getInterval() {
-        return interval;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.group.interceptors;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import java.text.DecimalFormat;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+
+/**
+ * Channel interceptor that measures message throughput. It counts the messages
+ * and megabytes passing through <code>sendMessage</code> and <code>messageReceived</code>
+ * and writes a summary to the log through {@link #report(double)}.
+ * <p>
+ * NOTE(review): the <code>double</code> accumulators and the start timestamps are plain
+ * fields updated without synchronization, so the reported figures are presumably only
+ * approximate when several threads send or receive at the same time.
+ * </p>
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class ThroughputInterceptor extends ChannelInterceptorBase {
+    protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ThroughputInterceptor.class);
+
+    /** Megabytes sent, counted once per destination member. */
+    double mbTx = 0;
+    /** Megabytes sent, counted once per message regardless of the number of destinations. */
+    double mbAppTx = 0;
+    /** Megabytes received. */
+    double mbRx = 0;
+    /** Accumulated send time in seconds, measured while at least one send is in flight. */
+    double timeTx = 0;
+    /** Number of reports triggered by the send path so far. */
+    double lastCnt = 0;
+    /** Send counter; starts at 1 and is incremented after every successful send. */
+    AtomicLong msgTxCnt = new AtomicLong(1);
+    /** Number of messages received. */
+    AtomicLong msgRxCnt = new AtomicLong(0);
+    /** Number of sends that failed with a ChannelException. */
+    AtomicLong msgTxErr = new AtomicLong(0);
+    /** Number of messages between two reports; must not be 0 because it is used as a divisor. */
+    int interval = 10000;
+    /** Number of sendMessage calls currently in flight. */
+    AtomicInteger access = new AtomicInteger(0);
+    /** Time (ms) at which the number of in-flight sends last went from 0 to 1. */
+    long txStart = 0;
+    /** Time (ms) at which the first message was received; 0 until then. */
+    long rxStart = 0;
+    DecimalFormat df = new DecimalFormat("#0.00");
+
+
+    /**
+     * Sends the message through the next interceptor and records its size and the time
+     * spent sending. A report is written once the send counter has passed the next
+     * multiple of {@link #interval}.
+     *
+     * @param destination the members the message is sent to
+     * @param msg the message; must be a ChannelData instance
+     * @param payload passed through unchanged to the next interceptor
+     * @throws ChannelException re-thrown from the underlying send after the error
+     *         counter has been incremented
+     */
+    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+        //the first sender to enter starts the clock
+        if ( access.addAndGet(1) == 1 ) txStart = System.currentTimeMillis();
+        long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength());
+        try {
+            super.sendMessage(destination, msg, payload);
+        }catch ( ChannelException x ) {
+            msgTxErr.addAndGet(1);
+            access.addAndGet(-1);
+            throw x;
+        } 
+        mbTx += ((double)(bytes*destination.length))/(1024d*1024d);
+        mbAppTx += ((double)(bytes))/(1024d*1024d);
+        //the last sender to leave stops the clock and decides whether a report is due
+        if ( access.addAndGet(-1) == 0 ) {
+            long stop = System.currentTimeMillis();
+            timeTx += ( (double) (stop - txStart)) / 1000d;
+            if ((msgTxCnt.get() / interval) >= lastCnt) {
+                lastCnt++;
+                report(timeTx);
+            }
+        }
+        msgTxCnt.addAndGet(1);
+    }
+
+    /**
+     * Records the size of the received message, writes a report on every
+     * {@link #interval}-th received message and forwards the message upwards.
+     *
+     * @param msg the received message; must be a ChannelData instance
+     */
+    public void messageReceived(ChannelMessage msg) {
+        if ( rxStart == 0 ) rxStart = System.currentTimeMillis();
+        long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength());
+        mbRx += ((double)bytes)/(1024d*1024d);
+        msgRxCnt.addAndGet(1);
+        if ( msgRxCnt.get() % interval == 0 ) report(timeTx);
+        super.messageReceived(msg);
+        
+    }
+    
+    /**
+     * Writes the current counters and transfer rates to the log at info level.
+     *
+     * @param timeTx the send time, in seconds, used to compute the transmit rates
+     */
+    public void report(double timeTx) {
+        //seconds since the first received message, kept as a fraction: the previous
+        //long division truncated to whole seconds and produced a zero divisor
+        //during the first second
+        double rxSeconds = (System.currentTimeMillis() - rxStart) / 1000d;
+        StringBuffer buf = new StringBuffer("ThroughputInterceptor Report[\n\tTx Msg:");
+        buf.append(msgTxCnt).append(" messages\n\tSent:");
+        buf.append(df.format(mbTx));
+        buf.append(" MB (total)\n\tSent:");
+        buf.append(df.format(mbAppTx));
+        buf.append(" MB (application)\n\tTime:");
+        buf.append(df.format(timeTx));
+        buf.append(" seconds\n\tTx Speed:");
+        buf.append(df.format(rate(mbTx, timeTx)));
+        buf.append(" MB/sec (total)\n\tTxSpeed:");
+        buf.append(df.format(rate(mbAppTx, timeTx)));
+        buf.append(" MB/sec (application)\n\tError Msg:");
+        buf.append(msgTxErr).append("\n\tRx Msg:");
+        buf.append(msgRxCnt);
+        buf.append(" messages\n\tRx Speed:");
+        buf.append(df.format(rate(mbRx, rxSeconds)));
+        buf.append(" MB/sec (since 1st msg)\n\tReceived:");
+        buf.append(df.format(mbRx)).append(" MB]\n");
+        if ( log.isInfoEnabled() ) log.info(buf);
+    }
+
+    /**
+     * Computes a transfer rate, reporting 0 instead of Infinity/NaN when no time has elapsed.
+     *
+     * @param megabytes the amount transferred
+     * @param seconds the elapsed time
+     * @return megabytes per second, or 0 if <code>seconds</code> is not positive
+     */
+    private static double rate(double megabytes, double seconds) {
+        return seconds > 0 ? megabytes / seconds : 0d;
+    }
+    
+    /**
+     * @param interval the number of messages between two reports; 0 is not a usable
+     *        value because the interval is used as a divisor
+     */
+    public void setInterval(int interval) {
+        this.interval = interval;
+    }
+
+    /**
+     * @return the number of messages between two reports
+     */
+    public int getInterval() {
+        return interval;
+    }
+
+}

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java Mon Oct 23 19:45:46 2006
@@ -1,149 +1,149 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.group.interceptors;
-
-import java.util.HashMap;
-
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-import org.apache.catalina.tribes.util.UUIDGenerator;
-import org.apache.catalina.tribes.util.Arrays;
-import org.apache.catalina.tribes.UniqueId;
-import java.util.Map;
-
-/**
- * <p>Title: </p>
- *
- * <p>Description: </p>
- *
- * <p>Copyright: Copyright (c) 2005</p>
- *
- * <p>Company: </p>
- *
- * @author not attributable
- * @version 1.0
- */
-public class TwoPhaseCommitInterceptor extends ChannelInterceptorBase {
-
-    public static final byte[] START_DATA = new byte[] {113, 1, -58, 2, -34, -60, 75, -78, -101, -12, 32, -29, 32, 111, -40, 4};
-    public static final byte[] END_DATA = new byte[] {54, -13, 90, 110, 47, -31, 75, -24, -81, -29, 36, 52, -58, 77, -110, 56};
-    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(TwoPhaseCommitInterceptor.class);
-
-    protected HashMap messages = new HashMap();
-    protected long expire = 1000 * 60; //one minute expiration
-    protected boolean deepclone = true;
-
-    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws
-        ChannelException {
-        //todo, optimize, if destination.length==1, then we can do
-        //msg.setOptions(msg.getOptions() & (~getOptionFlag())
-        //and just send one message
-        if (okToProcess(msg.getOptions()) ) {
-            super.sendMessage(destination, msg, null);
-            ChannelMessage confirmation = null;
-            if ( deepclone ) confirmation = (ChannelMessage)msg.deepclone();
-            else confirmation = (ChannelMessage)msg.clone();
-            confirmation.getMessage().reset();
-            UUIDGenerator.randomUUID(false,confirmation.getUniqueId(),0);
-            confirmation.getMessage().append(START_DATA,0,START_DATA.length);
-            confirmation.getMessage().append(msg.getUniqueId(),0,msg.getUniqueId().length);
-            confirmation.getMessage().append(END_DATA,0,END_DATA.length);
-            super.sendMessage(destination,confirmation,payload);
-        } else {
-            //turn off two phase commit
-            //this wont work if the interceptor has 0 as a flag
-            //since there is no flag to turn off
-            //msg.setOptions(msg.getOptions() & (~getOptionFlag()));
-            super.sendMessage(destination, msg, payload);
-        }
-    }
-
-    public void messageReceived(ChannelMessage msg) {
-        if (okToProcess(msg.getOptions())) {
-            if ( msg.getMessage().getLength() == (START_DATA.length+msg.getUniqueId().length+END_DATA.length) &&
-                 Arrays.contains(msg.getMessage().getBytesDirect(),0,START_DATA,0,START_DATA.length) &&
-                 Arrays.contains(msg.getMessage().getBytesDirect(),START_DATA.length+msg.getUniqueId().length,END_DATA,0,END_DATA.length) ) {
-                UniqueId id = new UniqueId(msg.getMessage().getBytesDirect(),START_DATA.length,msg.getUniqueId().length);
-                MapEntry original = (MapEntry)messages.get(id);
-                if ( original != null ) {
-                    super.messageReceived(original.msg);
-                    messages.remove(id);
-                } else log.warn("Received a confirmation, but original message is missing. Id:"+Arrays.toString(id.getBytes()));
-            } else {
-                UniqueId id = new UniqueId(msg.getUniqueId());
-                MapEntry entry = new MapEntry((ChannelMessage)msg.deepclone(),id,System.currentTimeMillis());
-                messages.put(id,entry);
-            }
-        } else {
-            super.messageReceived(msg);
-        }
-    }
-
-    public boolean getDeepclone() {
-        return deepclone;
-    }
-
-    public long getExpire() {
-        return expire;
-    }
-
-    public void setDeepclone(boolean deepclone) {
-        this.deepclone = deepclone;
-    }
-
-    public void setExpire(long expire) {
-        this.expire = expire;
-    }
-    
-    public void heartbeat() {
-        try {
-            long now = System.currentTimeMillis();
-            Map.Entry[] entries = (Map.Entry[])messages.entrySet().toArray(new Map.Entry[messages.size()]);
-            for (int i=0; i<entries.length; i++ ) {
-                MapEntry entry = (MapEntry)entries[i].getValue();
-                if ( entry.expired(now,expire) ) {
-                    log.info("Message ["+entry.id+"] has expired. Removing.");
-                    messages.remove(entry.id);
-                }//end if
-            }
-        } catch ( Exception x ) {
-            log.warn("Unable to perform heartbeat on the TwoPhaseCommit interceptor.",x);
-        } finally {
-            super.heartbeat();
-        }
-    }
-    
-    public static class MapEntry {
-        public ChannelMessage msg;
-        public UniqueId id;
-        public long timestamp;
-        
-        public MapEntry(ChannelMessage msg, UniqueId id, long timestamp) {
-            this.msg = msg;
-            this.id = id;
-            this.timestamp = timestamp;
-        }
-        public boolean expired(long now, long expiration) {
-            return (now - timestamp ) > expiration;
-        }
-
-    }
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.UniqueId;
+import java.util.Map;
+
+/**
+ * Interceptor that holds back received messages until a matching
+ * confirmation message arrives.
+ * <p>
+ * Sending: a message accepted by <code>okToProcess</code> is forwarded and
+ * then followed by a second "confirmation" message whose body consists of
+ * {@link #START_DATA}, the unique id of the first message and
+ * {@link #END_DATA}.
+ * <p>
+ * Receiving: an accepted message with exactly that body layout is treated as
+ * a confirmation; the stored original is handed to
+ * <code>super.messageReceived</code> and forgotten. Any other accepted message
+ * is stored in {@link #messages} until it is confirmed or until
+ * {@link #heartbeat()} discards it after {@link #expire} milliseconds.
+ * <p>
+ * NOTE(review): <code>messages</code> is a plain <code>HashMap</code> that is
+ * touched by both <code>messageReceived</code> and <code>heartbeat</code>;
+ * confirm that callers serialize access.
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TwoPhaseCommitInterceptor extends ChannelInterceptorBase {
+
+    /** Marker bytes that open the body of a confirmation message. */
+    public static final byte[] START_DATA = new byte[] {113, 1, -58, 2, -34, -60, 75, -78, -101, -12, 32, -29, 32, 111, -40, 4};
+    /** Marker bytes that close the body of a confirmation message. */
+    public static final byte[] END_DATA = new byte[] {54, -13, 90, 110, 47, -31, 75, -24, -81, -29, 36, 52, -58, 77, -110, 56};
+    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(TwoPhaseCommitInterceptor.class);
+
+    /** Received messages awaiting confirmation: UniqueId -> MapEntry. */
+    protected HashMap messages = new HashMap();
+    /** Age in milliseconds after which heartbeat() drops an unconfirmed message (default: one minute). */
+    protected long expire = 1000 * 60;
+    /** Selects deepclone() over clone() when sendMessage builds the confirmation. */
+    protected boolean deepclone = true;
+
+    /**
+     * Forwards <code>msg</code> and, when the message options are accepted by
+     * <code>okToProcess</code>, follows it with a confirmation message.
+     * <p>
+     * In the two-phase case the original is sent with a <code>null</code>
+     * payload and the caller's <code>payload</code> travels with the
+     * confirmation.
+     *
+     * @param destination members to send to
+     * @param msg the message to send
+     * @param payload payload passed on with the last message that is sent
+     * @throws ChannelException if <code>super.sendMessage</code> fails
+     */
+    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws
+        ChannelException {
+        //todo, optimize, if destination.length==1, then we can do
+        //msg.setOptions(msg.getOptions() & (~getOptionFlag())
+        //and just send one message
+        if ( !okToProcess(msg.getOptions()) ) {
+            //two phase commit is off for this message, pass it through untouched.
+            //clearing the option flag instead would not work if the interceptor
+            //has 0 as a flag, since there is no flag to turn off:
+            //msg.setOptions(msg.getOptions() & (~getOptionFlag()));
+            super.sendMessage(destination, msg, payload);
+            return;
+        }
+
+        //phase one: the real message
+        super.sendMessage(destination, msg, null);
+
+        //phase two: a copy of the message whose body is replaced by
+        //START_DATA + <id of the real message> + END_DATA
+        ChannelMessage confirmation = deepclone ? (ChannelMessage)msg.deepclone() : (ChannelMessage)msg.clone();
+        confirmation.getMessage().reset();
+        //presumably writes a fresh random id into the confirmation's id array - confirm in UUIDGenerator
+        UUIDGenerator.randomUUID(false,confirmation.getUniqueId(),0);
+        confirmation.getMessage().append(START_DATA,0,START_DATA.length);
+        byte[] originalId = msg.getUniqueId();
+        confirmation.getMessage().append(originalId,0,originalId.length);
+        confirmation.getMessage().append(END_DATA,0,END_DATA.length);
+        super.sendMessage(destination,confirmation,payload);
+    }
+
+    /**
+     * Handles an incoming message.
+     * <ul>
+     * <li>Options not accepted by <code>okToProcess</code>: passed straight to
+     *     <code>super.messageReceived</code>.</li>
+     * <li>Confirmation: the stored original is passed to
+     *     <code>super.messageReceived</code> and removed from the map; a
+     *     warning is logged if no original is stored.</li>
+     * <li>Anything else: a deep clone is stored under the message's unique id
+     *     together with the current time.</li>
+     * </ul>
+     *
+     * @param msg the received message
+     */
+    public void messageReceived(ChannelMessage msg) {
+        if ( !okToProcess(msg.getOptions()) ) {
+            super.messageReceived(msg);
+            return;
+        }
+
+        if ( isConfirmation(msg) ) {
+            //the id of the confirmed message sits between the two markers
+            UniqueId confirmedId = new UniqueId(msg.getMessage().getBytesDirect(),START_DATA.length,msg.getUniqueId().length);
+            MapEntry original = (MapEntry)messages.get(confirmedId);
+            if ( original == null ) {
+                log.warn("Received a confirmation, but original message is missing. Id:"+Arrays.toString(confirmedId.getBytes()));
+            } else {
+                super.messageReceived(original.msg);
+                messages.remove(confirmedId);
+            }
+            return;
+        }
+
+        UniqueId pendingId = new UniqueId(msg.getUniqueId());
+        messages.put(pendingId,new MapEntry((ChannelMessage)msg.deepclone(),pendingId,System.currentTimeMillis()));
+    }
+
+    /**
+     * Tells whether the body of <code>msg</code> has the confirmation layout:
+     * its length equals START_DATA + unique id + END_DATA, and
+     * <code>Arrays.contains</code> reports START_DATA at offset 0 and END_DATA
+     * directly after the id.
+     */
+    private static boolean isConfirmation(ChannelMessage msg) {
+        if ( msg.getMessage().getLength() != (START_DATA.length+msg.getUniqueId().length+END_DATA.length) ) {
+            return false;
+        }
+        if ( !Arrays.contains(msg.getMessage().getBytesDirect(),0,START_DATA,0,START_DATA.length) ) {
+            return false;
+        }
+        return Arrays.contains(msg.getMessage().getBytesDirect(),START_DATA.length+msg.getUniqueId().length,END_DATA,0,END_DATA.length);
+    }
+
+    /** @return whether sendMessage uses deepclone() (true) or clone() (false) for the confirmation */
+    public boolean getDeepclone() {
+        return this.deepclone;
+    }
+
+    /** @param deepclone true to build confirmations with deepclone(), false to use clone() */
+    public void setDeepclone(boolean deepclone) {
+        this.deepclone = deepclone;
+    }
+
+    /** @return the expiration age for unconfirmed messages, in milliseconds */
+    public long getExpire() {
+        return this.expire;
+    }
+
+    /** @param expire the expiration age for unconfirmed messages, in milliseconds */
+    public void setExpire(long expire) {
+        this.expire = expire;
+    }
+
+    /**
+     * Drops every stored message whose age exceeds <code>expire</code>, then
+     * always delegates to <code>super.heartbeat()</code>. Any exception raised
+     * during the cleanup is logged as a warning and not rethrown.
+     */
+    public void heartbeat() {
+        try {
+            long now = System.currentTimeMillis();
+            //work on a snapshot so that removing from the map does not disturb the loop
+            Map.Entry[] snapshot = (Map.Entry[])messages.entrySet().toArray(new Map.Entry[messages.size()]);
+            for (int idx=0; idx<snapshot.length; idx++ ) {
+                MapEntry candidate = (MapEntry)snapshot[idx].getValue();
+                if ( !candidate.expired(now,expire) ) continue;
+                log.info("Message ["+candidate.id+"] has expired. Removing.");
+                messages.remove(candidate.id);
+            }
+        } catch ( Exception x ) {
+            log.warn("Unable to perform heartbeat on the TwoPhaseCommit interceptor.",x);
+        } finally {
+            super.heartbeat();
+        }
+    }
+
+    /**
+     * Value stored in <code>messages</code>: a held-back message, the id it is
+     * stored under and the time it was received.
+     */
+    public static class MapEntry {
+        public ChannelMessage msg;
+        public UniqueId id;
+        public long timestamp;
+
+        public MapEntry(ChannelMessage msg, UniqueId id, long timestamp) {
+            this.msg = msg;
+            this.id = id;
+            this.timestamp = timestamp;
+        }
+
+        /**
+         * @param now current time in milliseconds
+         * @param expiration maximum allowed age in milliseconds
+         * @return true when the entry is strictly older than <code>expiration</code>
+         */
+        public boolean expired(long now, long expiration) {
+            long age = now - timestamp;
+            return age > expiration;
+        }
+
+    }
+
 }

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java Mon Oct 23 19:45:46 2006
@@ -1,94 +1,94 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.io;
-
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.commons.logging.Log;
-
-/**
- *
- * @author Filip Hanik
- * 
- * @version 1.0
- */
-public class BufferPool {
-    protected static Log log = LogFactory.getLog(BufferPool.class);
-
-    public static int DEFAULT_POOL_SIZE = 100*1024*1024; //100MB
-
-
-
-    protected static BufferPool instance = null;
-    protected BufferPoolAPI pool = null;
-
-    private BufferPool(BufferPoolAPI pool) {
-        this.pool = pool;
-    }
-
-    public XByteBuffer getBuffer(int minSize, boolean discard) {
-        if ( pool != null ) return pool.getBuffer(minSize, discard);
-        else return new XByteBuffer(minSize,discard);
-    }
-
-    public void returnBuffer(XByteBuffer buffer) {
-        if ( pool != null ) pool.returnBuffer(buffer);
-    }
-
-    public void clear() {
-        if ( pool != null ) pool.clear();
-    }
-
-
-    public static BufferPool getBufferPool() {
-        if (  (instance == null) ) {
-            synchronized (BufferPool.class) {
-                if ( instance == null ) {
-                   BufferPoolAPI pool = null;
-                   Class clazz = null;
-                   try {
-                       clazz = Class.forName("org.apache.catalina.tribes.io.BufferPool15Impl");
-                       pool = (BufferPoolAPI)clazz.newInstance();
-                   } catch ( Throwable x ) {
-                       try {
-                           clazz = Class.forName("org.apache.catalina.tribes.io.BufferPool14Impl");
-                           pool = (BufferPoolAPI)clazz.newInstance();
-                       } catch ( Throwable e ) {
-                           log.warn("Unable to initilize BufferPool, not pooling XByteBuffer objects:"+x.getMessage());
-                           if ( log.isDebugEnabled() ) log.debug("Unable to initilize BufferPool, not pooling XByteBuffer objects:",x);
-                       }
-                   }
-                   pool.setMaxSize(DEFAULT_POOL_SIZE);
-                   log.info("Created a buffer pool with max size:"+DEFAULT_POOL_SIZE+" bytes of type:"+(clazz!=null?clazz.getName():"null"));
-                   instance = new BufferPool(pool);
-                }//end if
-            }//sync
-        }//end if
-        return instance;
-    }
-
-
-    public static interface BufferPoolAPI {
-        public void setMaxSize(int bytes);
-
-        public XByteBuffer getBuffer(int minSize, boolean discard);
-
-        public void returnBuffer(XByteBuffer buffer);
-
-        public void clear();
-    }    
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.commons.logging.Log;
+
+/**
+ * Process-wide access point for pooled {@link XByteBuffer} instances.
+ * <p>
+ * The singleton wraps a {@link BufferPoolAPI} implementation that is loaded
+ * reflectively by {@link #getBufferPool()}. If no implementation can be
+ * loaded the wrapper still works: {@link #getBuffer(int, boolean)} allocates a
+ * new buffer on every call and {@link #returnBuffer(XByteBuffer)} and
+ * {@link #clear()} do nothing.
+ *
+ * @author Filip Hanik
+ * 
+ * @version 1.0
+ */
+public class BufferPool {
+    protected static Log log = LogFactory.getLog(BufferPool.class);
+
+    /** Maximum size, in bytes, handed to the pool implementation when it is created. */
+    public static int DEFAULT_POOL_SIZE = 100*1024*1024; //100MB
+
+
+
+    // volatile: getBufferPool() reads this field outside the synchronized block,
+    // so the write must be safely published to other threads.
+    protected static volatile BufferPool instance = null;
+    /** The backing implementation, or null when pooling is unavailable. */
+    protected BufferPoolAPI pool = null;
+
+    private BufferPool(BufferPoolAPI pool) {
+        this.pool = pool;
+    }
+
+    /**
+     * Obtains a buffer from the backing pool, or allocates a new one when
+     * there is no backing pool.
+     *
+     * @param minSize size passed to the pool, or to the XByteBuffer constructor
+     * @param discard discard setting passed to the pool, or to the XByteBuffer constructor
+     * @return the buffer supplied by the pool, or a newly created one
+     */
+    public XByteBuffer getBuffer(int minSize, boolean discard) {
+        if ( pool != null ) return pool.getBuffer(minSize, discard);
+        else return new XByteBuffer(minSize,discard);
+    }
+
+    /**
+     * Hands a buffer back to the backing pool; a no-op without a backing pool.
+     *
+     * @param buffer the buffer to return
+     */
+    public void returnBuffer(XByteBuffer buffer) {
+        if ( pool != null ) pool.returnBuffer(buffer);
+    }
+
+    /** Clears the backing pool; a no-op without a backing pool. */
+    public void clear() {
+        if ( pool != null ) pool.clear();
+    }
+
+
+    /**
+     * Returns the shared instance, creating it on first use.
+     * <p>
+     * Creation first tries <code>BufferPool15Impl</code> and, if that cannot
+     * be loaded or instantiated, <code>BufferPool14Impl</code>. If both fail a
+     * warning is logged and the returned instance has no backing pool, so
+     * buffers are simply not pooled.
+     *
+     * @return the shared BufferPool, never null
+     */
+    public static BufferPool getBufferPool() {
+        if (  (instance == null) ) {
+            synchronized (BufferPool.class) {
+                if ( instance == null ) {
+                   BufferPoolAPI pool = null;
+                   Class clazz = null;
+                   try {
+                       clazz = Class.forName("org.apache.catalina.tribes.io.BufferPool15Impl");
+                       pool = (BufferPoolAPI)clazz.newInstance();
+                   } catch ( Throwable x ) {
+                       try {
+                           clazz = Class.forName("org.apache.catalina.tribes.io.BufferPool14Impl");
+                           pool = (BufferPoolAPI)clazz.newInstance();
+                       } catch ( Throwable e ) {
+                           log.warn("Unable to initilize BufferPool, not pooling XByteBuffer objects:"+x.getMessage());
+                           if ( log.isDebugEnabled() ) {
+                               log.debug("Unable to initilize BufferPool, not pooling XByteBuffer objects:",x);
+                               //keep the cause of the second failure as well
+                               log.debug("Fallback buffer pool implementation could not be created either:",e);
+                           }
+                       }
+                   }
+                   //pool is still null when neither implementation could be created;
+                   //configuring it unconditionally would throw a NullPointerException
+                   //and defeat the "not pooling" fallback announced above.
+                   if ( pool != null ) {
+                       pool.setMaxSize(DEFAULT_POOL_SIZE);
+                       log.info("Created a buffer pool with max size:"+DEFAULT_POOL_SIZE+" bytes of type:"+(clazz!=null?clazz.getName():"null"));
+                   }
+                   instance = new BufferPool(pool);
+                }//end if
+            }//sync
+        }//end if
+        return instance;
+    }
+
+
+    /** Contract of the pool implementations that getBufferPool() loads reflectively. */
+    public static interface BufferPoolAPI {
+        public void setMaxSize(int bytes);
+
+        public XByteBuffer getBuffer(int minSize, boolean discard);
+
+        public void returnBuffer(XByteBuffer buffer);
+
+        public void clear();
+    }    
+}

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool14Impl.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool14Impl.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool14Impl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool14Impl.java Mon Oct 23 19:45:46 2006
@@ -1,70 +1,70 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.io;
-
-import java.util.Queue;
-import java.util.LinkedList;
-
-
-/**
- *
- * @author Filip Hanik
- * @version 1.0
- */
-class BufferPool14Impl implements BufferPool.BufferPoolAPI {
-    protected int maxSize;
-    protected int size = 0;
-    protected LinkedList queue = new LinkedList();
-
-    public void setMaxSize(int bytes) {
-        this.maxSize = bytes;
-    }
-    
-    public synchronized int addAndGet(int val) {
-        size = size + (val);
-        return size;
-    }
-    
-    
-
-    public synchronized XByteBuffer getBuffer(int minSize, boolean discard) {
-        XByteBuffer buffer = (XByteBuffer)(queue.size()>0?queue.remove(0):null);
-        if ( buffer != null ) addAndGet(-buffer.getCapacity());
-        if ( buffer == null ) buffer = new XByteBuffer(minSize,discard);
-        else if ( buffer.getCapacity() <= minSize ) buffer.expand(minSize);
-        buffer.setDiscard(discard);
-        buffer.reset();
-        return buffer;
-    }
-
-    public synchronized void returnBuffer(XByteBuffer buffer) {
-        if ( (size + buffer.getCapacity()) <= maxSize ) {
-            addAndGet(buffer.getCapacity());
-            queue.add(buffer);
-        }
-    }
-
-    public synchronized void clear() {
-        queue.clear();
-        size = 0;
-    }
-
-    public int getMaxSize() {
-        return maxSize;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import java.util.Queue;
+import java.util.LinkedList;
+
+
+/**
+ * {@link BufferPool.BufferPoolAPI} implementation that keeps returned buffers
+ * in a {@link LinkedList}.
+ * <p>
+ * <code>size</code> is the summed capacity of the buffers currently queued; a
+ * returned buffer is only kept while that sum stays within
+ * <code>maxSize</code>. The operations that touch the queue are synchronized
+ * on this instance; the <code>maxSize</code> accessors are not.
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+class BufferPool14Impl implements BufferPool.BufferPoolAPI {
+    /** Upper bound for <code>size</code>. */
+    protected int maxSize;
+    /** Summed capacity of the queued buffers. */
+    protected int size = 0;
+    /** Buffers available for reuse, oldest first. */
+    protected LinkedList queue = new LinkedList();
+
+    /** @param bytes the new upper bound for the summed capacity of queued buffers */
+    public void setMaxSize(int bytes) {
+        maxSize = bytes;
+    }
+
+    /** @return the upper bound for the summed capacity of queued buffers */
+    public int getMaxSize() {
+        return maxSize;
+    }
+
+    /**
+     * Adds <code>val</code> (which may be negative) to <code>size</code>.
+     *
+     * @param val the amount to add
+     * @return the updated size
+     */
+    public synchronized int addAndGet(int val) {
+        size += val;
+        return size;
+    }
+
+    /**
+     * Takes the oldest queued buffer, or creates a new one when the queue is
+     * empty. A reused buffer whose capacity is not larger than
+     * <code>minSize</code> is expanded with <code>expand(minSize)</code>. The
+     * buffer is reset and given the requested discard setting before it is
+     * returned.
+     *
+     * @param minSize requested size
+     * @param discard discard setting to apply to the buffer
+     * @return a reset buffer
+     */
+    public synchronized XByteBuffer getBuffer(int minSize, boolean discard) {
+        XByteBuffer result = queue.isEmpty() ? null : (XByteBuffer)queue.removeFirst();
+        if ( result == null ) {
+            result = new XByteBuffer(minSize,discard);
+        } else {
+            //the buffer leaves the pool, so its capacity no longer counts
+            addAndGet(-result.getCapacity());
+            if ( result.getCapacity() <= minSize ) result.expand(minSize);
+        }
+        result.setDiscard(discard);
+        result.reset();
+        return result;
+    }
+
+    /**
+     * Queues <code>buffer</code> for reuse unless doing so would push
+     * <code>size</code> above <code>maxSize</code>, in which case the buffer
+     * is silently dropped.
+     *
+     * @param buffer the buffer to take back
+     */
+    public synchronized void returnBuffer(XByteBuffer buffer) {
+        if ( (size + buffer.getCapacity()) > maxSize ) return;
+        addAndGet(buffer.getCapacity());
+        queue.addLast(buffer);
+    }
+
+    /** Drops all queued buffers and resets <code>size</code> to zero. */
+    public synchronized void clear() {
+        size = 0;
+        queue.clear();
+    }
+
+}

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool14Impl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool14Impl.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org