You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2006/10/24 04:45:59 UTC
svn commit: r467206 [10/30] - in /tomcat: build/tc5.5.x/ connectors/trunk/
connectors/trunk/ajp/ajplib/test/ connectors/trunk/ajp/proxy/
connectors/trunk/jk/jkstatus/src/share/org/apache/jk/status/
connectors/trunk/jk/native/iis/ connectors/trunk/jk/na...
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Mon Oct 23 19:45:46 2006
@@ -1,612 +1,612 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.io;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-
-/**
- * The XByteBuffer provides a dual functionality.
- * One, it stores message bytes and automatically extends the byte buffer if needed.<BR>
- * Two, it can encode and decode packages so that they can be defined and identified
- * as they come in on a socket.
- * <br>
- * <b>THIS CLASS IS NOT THREAD SAFE</B><BR>
- * <br/>
- * Transfer package:
- * <ul>
- * <li><b>START_DATA/b> - 7 bytes - <i>FLT2002</i></li>
- * <li><b>SIZE</b> - 4 bytes - size of the data package</li>
- * <li><b>DATA</b> - should be as many bytes as the prev SIZE</li>
- * <li><b>END_DATA</b> - 7 bytes - <i>TLF2003</i></lI>
- * </ul>
- * @author Filip Hanik
- * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $
- */
-public class XByteBuffer
-{
-
- public static org.apache.commons.logging.Log log =
- org.apache.commons.logging.LogFactory.getLog( XByteBuffer.class );
-
- /**
- * This is a package header, 7 bytes (FLT2002)
- */
- public static final byte[] START_DATA = {70,76,84,50,48,48,50};
-
- /**
- * This is the package footer, 7 bytes (TLF2003)
- */
- public static final byte[] END_DATA = {84,76,70,50,48,48,51};
-
- /**
- * Default size on the initial byte buffer
- */
- private static final int DEF_SIZE = 2048;
-
- /**
- * Default size to extend the buffer with
- */
- private static final int DEF_EXT = 1024;
-
- /**
- * Variable to hold the data
- */
- protected byte[] buf = null;
-
- /**
- * Current length of data in the buffer
- */
- protected int bufSize = 0;
-
- /**
- * Flag for discarding invalid packages
- * If this flag is set to true, and append(byte[],...) is called,
- * the data added will be inspected, and if it doesn't start with
- * <code>START_DATA</code> it will be thrown away.
- *
- */
- protected boolean discard = true;
-
- /**
- * Constructs a new XByteBuffer
- * @param size - the initial size of the byte buffer
- * @todo use a pool of byte[] for performance
- */
- public XByteBuffer(int size, boolean discard) {
- buf = new byte[size];
- this.discard = discard;
- }
-
- public XByteBuffer(byte[] data,boolean discard) {
- this(data,data.length+128,discard);
- }
-
- public XByteBuffer(byte[] data, int size,boolean discard) {
- int length = Math.max(data.length,size);
- buf = new byte[length];
- System.arraycopy(data,0,buf,0,data.length);
- bufSize = data.length;
- this.discard = discard;
- }
-
- public int getLength() {
- return bufSize;
- }
-
- public void setLength(int size) {
- if ( size > buf.length ) throw new ArrayIndexOutOfBoundsException("Size is larger than existing buffer.");
- bufSize = size;
- }
-
- public void trim(int length) {
- if ( (bufSize - length) < 0 )
- throw new ArrayIndexOutOfBoundsException("Can't trim more bytes than are available. length:"+bufSize+" trim:"+length);
- bufSize -= length;
- }
-
- public void reset() {
- bufSize = 0;
- }
-
- public byte[] getBytesDirect() {
- return this.buf;
- }
-
- /**
- * Returns the bytes in the buffer, in its exact length
- */
- public byte[] getBytes() {
- byte[] b = new byte[bufSize];
- System.arraycopy(buf,0,b,0,bufSize);
- return b;
- }
-
- /**
- * Resets the buffer
- */
- public void clear() {
- bufSize = 0;
- }
-
- /**
- * Appends the data to the buffer. If the data is incorrectly formatted, ie, the data should always start with the
- * header, false will be returned and the data will be discarded.
- * @param b - bytes to be appended
- * @param off - the offset to extract data from
- * @param len - the number of bytes to append.
- * @return true if the data was appended correctly. Returns false if the package is incorrect, ie missing header or something, or the length of data is 0
- */
- public boolean append(ByteBuffer b, int len) {
- int newcount = bufSize + len;
- if (newcount > buf.length) {
- expand(newcount);
- }
- b.get(buf,bufSize,len);
-
- bufSize = newcount;
-
- if ( discard ) {
- if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {
- bufSize = 0;
- log.error("Discarded the package, invalid header");
- return false;
- }
- }
- return true;
-
- }
-
- public boolean append(byte i) {
- int newcount = bufSize + 1;
- if (newcount > buf.length) {
- expand(newcount);
- }
- buf[bufSize] = i;
- bufSize = newcount;
- return true;
- }
-
-
- public boolean append(boolean i) {
- int newcount = bufSize + 1;
- if (newcount > buf.length) {
- expand(newcount);
- }
- XByteBuffer.toBytes(i,buf,bufSize);
- bufSize = newcount;
- return true;
- }
-
- public boolean append(long i) {
- int newcount = bufSize + 8;
- if (newcount > buf.length) {
- expand(newcount);
- }
- XByteBuffer.toBytes(i,buf,bufSize);
- bufSize = newcount;
- return true;
- }
-
- public boolean append(int i) {
- int newcount = bufSize + 4;
- if (newcount > buf.length) {
- expand(newcount);
- }
- XByteBuffer.toBytes(i,buf,bufSize);
- bufSize = newcount;
- return true;
- }
-
- public boolean append(byte[] b, int off, int len) {
- if ((off < 0) || (off > b.length) || (len < 0) ||
- ((off + len) > b.length) || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return false;
- }
-
- int newcount = bufSize + len;
- if (newcount > buf.length) {
- expand(newcount);
- }
- System.arraycopy(b, off, buf, bufSize, len);
- bufSize = newcount;
-
- if ( discard ) {
- if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {
- bufSize = 0;
- log.error("Discarded the package, invalid header");
- return false;
- }
- }
- return true;
- }
-
- public void expand(int newcount) {
- //don't change the allocation strategy
- byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
- System.arraycopy(buf, 0, newbuf, 0, bufSize);
- buf = newbuf;
- }
-
- public int getCapacity() {
- return buf.length;
- }
-
-
- /**
- * Internal mechanism to make a check if a complete package exists
- * within the buffer
- * @return - true if a complete package (header,compress,size,data,footer) exists within the buffer
- */
- public int countPackages() {
- return countPackages(false);
- }
-
- public int countPackages(boolean first)
- {
- int cnt = 0;
- int pos = START_DATA.length;
- int start = 0;
-
- while ( start < bufSize ) {
- //first check start header
- int index = XByteBuffer.firstIndexOf(buf,start,START_DATA);
- //if the header (START_DATA) isn't the first thing or
- //the buffer isn't even 14 bytes
- if ( index != start || ((bufSize-start)<14) ) break;
- //next 4 bytes are compress flag not needed for count packages
- //then get the size 4 bytes
- int size = toInt(buf, pos);
- //now the total buffer has to be long enough to hold
- //START_DATA.length+4+size+END_DATA.length
- pos = start + START_DATA.length + 4 + size;
- if ( (pos + END_DATA.length) > bufSize) break;
- //and finally check the footer of the package END_DATA
- int newpos = firstIndexOf(buf, pos, END_DATA);
- //mismatch, there is no package
- if (newpos != pos) break;
- //increase the packet count
- cnt++;
- //reset the values
- start = pos + END_DATA.length;
- pos = start + START_DATA.length;
- //we only want to verify that we have at least one package
- if ( first ) break;
- }
- return cnt;
- }
-
- /**
- * Method to check if a package exists in this byte buffer.
- * @return - true if a complete package (header,options,size,data,footer) exists within the buffer
- */
- public boolean doesPackageExist() {
- return (countPackages(true)>0);
- }
-
- /**
- * Extracts the message bytes from a package.
- * If no package exists, a IllegalStateException will be thrown.
- * @param clearFromBuffer - if true, the package will be removed from the byte buffer
- * @return - returns the actual message bytes (header, compress,size and footer not included).
- */
- public XByteBuffer extractDataPackage(boolean clearFromBuffer) {
- int psize = countPackages(true);
- if (psize == 0) {
- throw new java.lang.IllegalStateException("No package exists in XByteBuffer");
- }
- int size = toInt(buf, START_DATA.length);
- XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false);
- xbuf.setLength(size);
- System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size);
- if (clearFromBuffer) {
- int totalsize = START_DATA.length + 4 + size + END_DATA.length;
- bufSize = bufSize - totalsize;
- System.arraycopy(buf, totalsize, buf, 0, bufSize);
- }
- return xbuf;
-
- }
-
- public ChannelData extractPackage(boolean clearFromBuffer) throws java.io.IOException {
- XByteBuffer xbuf = extractDataPackage(clearFromBuffer);
- ChannelData cdata = ChannelData.getDataFromPackage(xbuf);
- return cdata;
- }
-
- /**
- * Creates a complete data package
- * @param indata - the message data to be contained within the package
- * @param compressed - compression flag for the indata buffer
- * @return - a full package (header,size,data,footer)
- *
- */
- public static byte[] createDataPackage(ChannelData cdata) {
-// return createDataPackage(cdata.getDataPackage());
- //avoid one extra byte array creation
- int dlength = cdata.getDataPackageLength();
- int length = getDataPackageLength(dlength);
- byte[] data = new byte[length];
- int offset = 0;
- System.arraycopy(START_DATA, 0, data, offset, START_DATA.length);
- offset += START_DATA.length;
- toBytes(dlength,data, START_DATA.length);
- offset += 4;
- cdata.getDataPackage(data,offset);
- offset += dlength;
- System.arraycopy(END_DATA, 0, data, offset, END_DATA.length);
- offset += END_DATA.length;
- return data;
- }
-
- public static byte[] createDataPackage(byte[] data, int doff, int dlength, byte[] buffer, int bufoff) {
- if ( (buffer.length-bufoff) > getDataPackageLength(dlength) ) {
- throw new ArrayIndexOutOfBoundsException("Unable to create data package, buffer is too small.");
- }
- System.arraycopy(START_DATA, 0, buffer, bufoff, START_DATA.length);
- toBytes(data.length,buffer, bufoff+START_DATA.length);
- System.arraycopy(data, doff, buffer, bufoff+START_DATA.length + 4, dlength);
- System.arraycopy(END_DATA, 0, buffer, bufoff+START_DATA.length + 4 + data.length, END_DATA.length);
- return buffer;
- }
-
-
- public static int getDataPackageLength(int datalength) {
- int length =
- START_DATA.length + //header length
- 4 + //data length indicator
- datalength + //actual data length
- END_DATA.length; //footer length
- return length;
-
- }
-
- public static byte[] createDataPackage(byte[] data) {
- int length = getDataPackageLength(data.length);
- byte[] result = new byte[length];
- return createDataPackage(data,0,data.length,result,0);
- }
-
-
-
-// public static void fillDataPackage(byte[] data, int doff, int dlength, XByteBuffer buf) {
-// int pkglen = getDataPackageLength(dlength);
-// if ( buf.getCapacity() < pkglen ) buf.expand(pkglen);
-// createDataPackage(data,doff,dlength,buf.getBytesDirect(),buf.getLength());
-// }
-
- /**
- * Convert four bytes to an int
- * @param b - the byte array containing the four bytes
- * @param off - the offset
- * @return the integer value constructed from the four bytes
- * @exception java.lang.ArrayIndexOutOfBoundsException
- */
- public static int toInt(byte[] b,int off){
- return ( ( (int) b[off+3]) & 0xFF) +
- ( ( ( (int) b[off+2]) & 0xFF) << 8) +
- ( ( ( (int) b[off+1]) & 0xFF) << 16) +
- ( ( ( (int) b[off+0]) & 0xFF) << 24);
- }
-
- /**
- * Convert eight bytes to a long
- * @param b - the byte array containing the four bytes
- * @param off - the offset
- * @return the long value constructed from the eight bytes
- * @exception java.lang.ArrayIndexOutOfBoundsException
- */
- public static long toLong(byte[] b,int off){
- return ( ( (long) b[off+7]) & 0xFF) +
- ( ( ( (long) b[off+6]) & 0xFF) << 8) +
- ( ( ( (long) b[off+5]) & 0xFF) << 16) +
- ( ( ( (long) b[off+4]) & 0xFF) << 24) +
- ( ( ( (long) b[off+3]) & 0xFF) << 32) +
- ( ( ( (long) b[off+2]) & 0xFF) << 40) +
- ( ( ( (long) b[off+1]) & 0xFF) << 48) +
- ( ( ( (long) b[off+0]) & 0xFF) << 56);
- }
-
-
- /**
- * Converts an integer to four bytes
- * @param n - the integer
- * @return - four bytes in an array
- * @deprecated use toBytes(boolean,byte[],int)
- */
- public static byte[] toBytes(boolean bool) {
- byte[] b = new byte[1] ;
- return toBytes(bool,b,0);
-
- }
-
- public static byte[] toBytes(boolean bool, byte[] data, int offset) {
- data[offset] = (byte)(bool?1:0);
- return data;
- }
-
- /**
- *
- * @param <any> long
- * @return use
- */
- public static boolean toBoolean(byte[] b, int offset) {
- return b[offset] != 0;
- }
-
-
- /**
- * Converts an integer to four bytes
- * @param n - the integer
- * @return - four bytes in an array
- * @deprecated use toBytes(int,byte[],int)
- */
- public static byte[] toBytes(int n) {
- return toBytes(n,new byte[4],0);
- }
-
- public static byte[] toBytes(int n,byte[] b, int offset) {
- b[offset+3] = (byte) (n);
- n >>>= 8;
- b[offset+2] = (byte) (n);
- n >>>= 8;
- b[offset+1] = (byte) (n);
- n >>>= 8;
- b[offset+0] = (byte) (n);
- return b;
- }
-
- /**
- * Converts an long to eight bytes
- * @param n - the long
- * @return - eight bytes in an array
- * @deprecated use toBytes(long,byte[],int)
- */
- public static byte[] toBytes(long n) {
- return toBytes(n,new byte[8],0);
- }
- public static byte[] toBytes(long n, byte[] b, int offset) {
- b[offset+7] = (byte) (n);
- n >>>= 8;
- b[offset+6] = (byte) (n);
- n >>>= 8;
- b[offset+5] = (byte) (n);
- n >>>= 8;
- b[offset+4] = (byte) (n);
- n >>>= 8;
- b[offset+3] = (byte) (n);
- n >>>= 8;
- b[offset+2] = (byte) (n);
- n >>>= 8;
- b[offset+1] = (byte) (n);
- n >>>= 8;
- b[offset+0] = (byte) (n);
- return b;
- }
-
- /**
- * Similar to a String.IndexOf, but uses pure bytes
- * @param src - the source bytes to be searched
- * @param srcOff - offset on the source buffer
- * @param find - the string to be found within src
- * @return - the index of the first matching byte. -1 if the find array is not found
- */
- public static int firstIndexOf(byte[] src, int srcOff, byte[] find){
- int result = -1;
- if (find.length > src.length) return result;
- if (find.length == 0 || src.length == 0) return result;
- if (srcOff >= src.length ) throw new java.lang.ArrayIndexOutOfBoundsException();
- boolean found = false;
- int srclen = src.length;
- int findlen = find.length;
- byte first = find[0];
- int pos = srcOff;
- while (!found) {
- //find the first byte
- while (pos < srclen){
- if (first == src[pos])
- break;
- pos++;
- }
- if (pos >= srclen)
- return -1;
-
- //we found the first character
- //match the rest of the bytes - they have to match
- if ( (srclen - pos) < findlen)
- return -1;
- //assume it does exist
- found = true;
- for (int i = 1; ( (i < findlen) && found); i++)
- found = found && (find[i] == src[pos + i]);
- if (found)
- result = pos;
- else if ( (srclen - pos) < findlen)
- return -1; //no more matches possible
- else
- pos++;
- }
- return result;
- }
-
-
- public static Serializable deserialize(byte[] data)
- throws IOException, ClassNotFoundException, ClassCastException {
- return deserialize(data,0,data.length);
- }
-
- public static Serializable deserialize(byte[] data, int offset, int length)
- throws IOException, ClassNotFoundException, ClassCastException {
- return deserialize(data,offset,length,null);
- }
- public static int invokecount = 0;
- public static Serializable deserialize(byte[] data, int offset, int length, ClassLoader[] cls)
- throws IOException, ClassNotFoundException, ClassCastException {
- synchronized (XByteBuffer.class) { invokecount++;}
- Object message = null;
- if ( cls == null ) cls = new ClassLoader[0];
- if (data != null) {
- InputStream instream = new ByteArrayInputStream(data,offset,length);
- ObjectInputStream stream = null;
- stream = (cls.length>0)? new ReplicationStream(instream,cls):new ObjectInputStream(instream);
- message = stream.readObject();
- instream.close();
- stream.close();
- }
- if ( message == null ) {
- return null;
- } else if (message instanceof Serializable)
- return (Serializable) message;
- else {
- throw new ClassCastException("Message has the wrong class. It should implement Serializable, instead it is:"+message.getClass().getName());
- }
- }
-
- /**
- * Serializes a message into cluster data
- * @param msg ClusterMessage
- * @param compress boolean
- * @return
- * @throws IOException
- */
- public static byte[] serialize(Serializable msg) throws IOException {
- ByteArrayOutputStream outs = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(outs);
- out.writeObject(msg);
- out.flush();
- byte[] data = outs.toByteArray();
- return data;
- }
-
- public void setDiscard(boolean discard) {
- this.discard = discard;
- }
-
- public boolean getDiscard() {
- return discard;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * The XByteBuffer provides a dual functionality.
+ * One, it stores message bytes and automatically extends the byte buffer if needed.<BR>
+ * Two, it can encode and decode packages so that they can be defined and identified
+ * as they come in on a socket.
+ * <br>
+ * <b>THIS CLASS IS NOT THREAD SAFE</B><BR>
+ * <br/>
+ * Transfer package:
+ * <ul>
+ * <li><b>START_DATA/b> - 7 bytes - <i>FLT2002</i></li>
+ * <li><b>SIZE</b> - 4 bytes - size of the data package</li>
+ * <li><b>DATA</b> - should be as many bytes as the prev SIZE</li>
+ * <li><b>END_DATA</b> - 7 bytes - <i>TLF2003</i></lI>
+ * </ul>
+ * @author Filip Hanik
+ * @version $Revision$, $Date$
+ */
+public class XByteBuffer
+{
+
+ public static org.apache.commons.logging.Log log =
+ org.apache.commons.logging.LogFactory.getLog( XByteBuffer.class );
+
+ /**
+ * This is a package header, 7 bytes (FLT2002)
+ */
+ public static final byte[] START_DATA = {70,76,84,50,48,48,50};
+
+ /**
+ * This is the package footer, 7 bytes (TLF2003)
+ */
+ public static final byte[] END_DATA = {84,76,70,50,48,48,51};
+
+ /**
+ * Default size on the initial byte buffer
+ */
+ private static final int DEF_SIZE = 2048;
+
+ /**
+ * Default size to extend the buffer with
+ */
+ private static final int DEF_EXT = 1024;
+
+ /**
+ * Variable to hold the data
+ */
+ protected byte[] buf = null;
+
+ /**
+ * Current length of data in the buffer
+ */
+ protected int bufSize = 0;
+
+ /**
+ * Flag for discarding invalid packages
+ * If this flag is set to true, and append(byte[],...) is called,
+ * the data added will be inspected, and if it doesn't start with
+ * <code>START_DATA</code> it will be thrown away.
+ *
+ */
+ protected boolean discard = true;
+
+ /**
+ * Constructs a new XByteBuffer
+ * @param size - the initial size of the byte buffer
+ * @todo use a pool of byte[] for performance
+ */
+ public XByteBuffer(int size, boolean discard) {
+ buf = new byte[size];
+ this.discard = discard;
+ }
+
+ public XByteBuffer(byte[] data,boolean discard) {
+ this(data,data.length+128,discard);
+ }
+
+ public XByteBuffer(byte[] data, int size,boolean discard) {
+ int length = Math.max(data.length,size);
+ buf = new byte[length];
+ System.arraycopy(data,0,buf,0,data.length);
+ bufSize = data.length;
+ this.discard = discard;
+ }
+
+ public int getLength() {
+ return bufSize;
+ }
+
+ public void setLength(int size) {
+ if ( size > buf.length ) throw new ArrayIndexOutOfBoundsException("Size is larger than existing buffer.");
+ bufSize = size;
+ }
+
+ public void trim(int length) {
+ if ( (bufSize - length) < 0 )
+ throw new ArrayIndexOutOfBoundsException("Can't trim more bytes than are available. length:"+bufSize+" trim:"+length);
+ bufSize -= length;
+ }
+
+ public void reset() {
+ bufSize = 0;
+ }
+
+ public byte[] getBytesDirect() {
+ return this.buf;
+ }
+
+ /**
+ * Returns the bytes in the buffer, in its exact length
+ */
+ public byte[] getBytes() {
+ byte[] b = new byte[bufSize];
+ System.arraycopy(buf,0,b,0,bufSize);
+ return b;
+ }
+
+ /**
+ * Resets the buffer
+ */
+ public void clear() {
+ bufSize = 0;
+ }
+
+ /**
+ * Appends the data to the buffer. If the data is incorrectly formatted, ie, the data should always start with the
+ * header, false will be returned and the data will be discarded.
+ * @param b - bytes to be appended
+ * @param off - the offset to extract data from
+ * @param len - the number of bytes to append.
+ * @return true if the data was appended correctly. Returns false if the package is incorrect, ie missing header or something, or the length of data is 0
+ */
+ public boolean append(ByteBuffer b, int len) {
+ int newcount = bufSize + len;
+ if (newcount > buf.length) {
+ expand(newcount);
+ }
+ b.get(buf,bufSize,len);
+
+ bufSize = newcount;
+
+ if ( discard ) {
+ if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {
+ bufSize = 0;
+ log.error("Discarded the package, invalid header");
+ return false;
+ }
+ }
+ return true;
+
+ }
+
+ public boolean append(byte i) {
+ int newcount = bufSize + 1;
+ if (newcount > buf.length) {
+ expand(newcount);
+ }
+ buf[bufSize] = i;
+ bufSize = newcount;
+ return true;
+ }
+
+
+ public boolean append(boolean i) {
+ int newcount = bufSize + 1;
+ if (newcount > buf.length) {
+ expand(newcount);
+ }
+ XByteBuffer.toBytes(i,buf,bufSize);
+ bufSize = newcount;
+ return true;
+ }
+
+ public boolean append(long i) {
+ int newcount = bufSize + 8;
+ if (newcount > buf.length) {
+ expand(newcount);
+ }
+ XByteBuffer.toBytes(i,buf,bufSize);
+ bufSize = newcount;
+ return true;
+ }
+
+ public boolean append(int i) {
+ int newcount = bufSize + 4;
+ if (newcount > buf.length) {
+ expand(newcount);
+ }
+ XByteBuffer.toBytes(i,buf,bufSize);
+ bufSize = newcount;
+ return true;
+ }
+
+ public boolean append(byte[] b, int off, int len) {
+ if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return false;
+ }
+
+ int newcount = bufSize + len;
+ if (newcount > buf.length) {
+ expand(newcount);
+ }
+ System.arraycopy(b, off, buf, bufSize, len);
+ bufSize = newcount;
+
+ if ( discard ) {
+ if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {
+ bufSize = 0;
+ log.error("Discarded the package, invalid header");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void expand(int newcount) {
+ //don't change the allocation strategy
+ byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+ System.arraycopy(buf, 0, newbuf, 0, bufSize);
+ buf = newbuf;
+ }
+
+ public int getCapacity() {
+ return buf.length;
+ }
+
+
+ /**
+ * Internal mechanism to make a check if a complete package exists
+ * within the buffer
+ * @return - true if a complete package (header,compress,size,data,footer) exists within the buffer
+ */
+ public int countPackages() {
+ return countPackages(false);
+ }
+
+ public int countPackages(boolean first)
+ {
+ int cnt = 0;
+ int pos = START_DATA.length;
+ int start = 0;
+
+ while ( start < bufSize ) {
+ //first check start header
+ int index = XByteBuffer.firstIndexOf(buf,start,START_DATA);
+ //if the header (START_DATA) isn't the first thing or
+ //the buffer isn't even 14 bytes
+ if ( index != start || ((bufSize-start)<14) ) break;
+ //next 4 bytes are compress flag not needed for count packages
+ //then get the size 4 bytes
+ int size = toInt(buf, pos);
+ //now the total buffer has to be long enough to hold
+ //START_DATA.length+4+size+END_DATA.length
+ pos = start + START_DATA.length + 4 + size;
+ if ( (pos + END_DATA.length) > bufSize) break;
+ //and finally check the footer of the package END_DATA
+ int newpos = firstIndexOf(buf, pos, END_DATA);
+ //mismatch, there is no package
+ if (newpos != pos) break;
+ //increase the packet count
+ cnt++;
+ //reset the values
+ start = pos + END_DATA.length;
+ pos = start + START_DATA.length;
+ //we only want to verify that we have at least one package
+ if ( first ) break;
+ }
+ return cnt;
+ }
+
+ /**
+ * Method to check if a package exists in this byte buffer.
+ * @return - true if a complete package (header,options,size,data,footer) exists within the buffer
+ */
+ public boolean doesPackageExist() {
+ return (countPackages(true)>0);
+ }
+
+ /**
+ * Extracts the message bytes from a package.
+ * If no package exists, a IllegalStateException will be thrown.
+ * @param clearFromBuffer - if true, the package will be removed from the byte buffer
+ * @return - returns the actual message bytes (header, compress,size and footer not included).
+ */
+ public XByteBuffer extractDataPackage(boolean clearFromBuffer) {
+ int psize = countPackages(true);
+ if (psize == 0) {
+ throw new java.lang.IllegalStateException("No package exists in XByteBuffer");
+ }
+ int size = toInt(buf, START_DATA.length);
+ XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false);
+ xbuf.setLength(size);
+ System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size);
+ if (clearFromBuffer) {
+ int totalsize = START_DATA.length + 4 + size + END_DATA.length;
+ bufSize = bufSize - totalsize;
+ System.arraycopy(buf, totalsize, buf, 0, bufSize);
+ }
+ return xbuf;
+
+ }
+
+ public ChannelData extractPackage(boolean clearFromBuffer) throws java.io.IOException {
+ XByteBuffer xbuf = extractDataPackage(clearFromBuffer);
+ ChannelData cdata = ChannelData.getDataFromPackage(xbuf);
+ return cdata;
+ }
+
+ /**
+ * Creates a complete data package
+ * @param indata - the message data to be contained within the package
+ * @param compressed - compression flag for the indata buffer
+ * @return - a full package (header,size,data,footer)
+ *
+ */
+ public static byte[] createDataPackage(ChannelData cdata) {
+// return createDataPackage(cdata.getDataPackage());
+ //avoid one extra byte array creation
+ int dlength = cdata.getDataPackageLength();
+ int length = getDataPackageLength(dlength);
+ byte[] data = new byte[length];
+ int offset = 0;
+ System.arraycopy(START_DATA, 0, data, offset, START_DATA.length);
+ offset += START_DATA.length;
+ toBytes(dlength,data, START_DATA.length);
+ offset += 4;
+ cdata.getDataPackage(data,offset);
+ offset += dlength;
+ System.arraycopy(END_DATA, 0, data, offset, END_DATA.length);
+ offset += END_DATA.length;
+ return data;
+ }
+
+ public static byte[] createDataPackage(byte[] data, int doff, int dlength, byte[] buffer, int bufoff) {
+ if ( (buffer.length-bufoff) > getDataPackageLength(dlength) ) {
+ throw new ArrayIndexOutOfBoundsException("Unable to create data package, buffer is too small.");
+ }
+ System.arraycopy(START_DATA, 0, buffer, bufoff, START_DATA.length);
+ toBytes(data.length,buffer, bufoff+START_DATA.length);
+ System.arraycopy(data, doff, buffer, bufoff+START_DATA.length + 4, dlength);
+ System.arraycopy(END_DATA, 0, buffer, bufoff+START_DATA.length + 4 + data.length, END_DATA.length);
+ return buffer;
+ }
+
+
+ public static int getDataPackageLength(int datalength) {
+ int length =
+ START_DATA.length + //header length
+ 4 + //data length indicator
+ datalength + //actual data length
+ END_DATA.length; //footer length
+ return length;
+
+ }
+
+ public static byte[] createDataPackage(byte[] data) {
+ int length = getDataPackageLength(data.length);
+ byte[] result = new byte[length];
+ return createDataPackage(data,0,data.length,result,0);
+ }
+
+
+
+// public static void fillDataPackage(byte[] data, int doff, int dlength, XByteBuffer buf) {
+// int pkglen = getDataPackageLength(dlength);
+// if ( buf.getCapacity() < pkglen ) buf.expand(pkglen);
+// createDataPackage(data,doff,dlength,buf.getBytesDirect(),buf.getLength());
+// }
+
+ /**
+ * Convert four bytes to an int
+ * @param b - the byte array containing the four bytes
+ * @param off - the offset
+ * @return the integer value constructed from the four bytes
+ * @exception java.lang.ArrayIndexOutOfBoundsException
+ */
+ public static int toInt(byte[] b,int off){
+ return ( ( (int) b[off+3]) & 0xFF) +
+ ( ( ( (int) b[off+2]) & 0xFF) << 8) +
+ ( ( ( (int) b[off+1]) & 0xFF) << 16) +
+ ( ( ( (int) b[off+0]) & 0xFF) << 24);
+ }
+
+ /**
+ * Convert eight bytes to a long
+ * @param b - the byte array containing the four bytes
+ * @param off - the offset
+ * @return the long value constructed from the eight bytes
+ * @exception java.lang.ArrayIndexOutOfBoundsException
+ */
+ public static long toLong(byte[] b,int off){
+ return ( ( (long) b[off+7]) & 0xFF) +
+ ( ( ( (long) b[off+6]) & 0xFF) << 8) +
+ ( ( ( (long) b[off+5]) & 0xFF) << 16) +
+ ( ( ( (long) b[off+4]) & 0xFF) << 24) +
+ ( ( ( (long) b[off+3]) & 0xFF) << 32) +
+ ( ( ( (long) b[off+2]) & 0xFF) << 40) +
+ ( ( ( (long) b[off+1]) & 0xFF) << 48) +
+ ( ( ( (long) b[off+0]) & 0xFF) << 56);
+ }
+
+
+ /**
+ * Converts an integer to four bytes
+ * @param n - the integer
+ * @return - four bytes in an array
+ * @deprecated use toBytes(boolean,byte[],int)
+ */
+ public static byte[] toBytes(boolean bool) {
+ byte[] b = new byte[1] ;
+ return toBytes(bool,b,0);
+
+ }
+
+ public static byte[] toBytes(boolean bool, byte[] data, int offset) {
+ data[offset] = (byte)(bool?1:0);
+ return data;
+ }
+
+ /**
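+ * Converts the byte at b[offset] to a boolean
+ * @param b - the byte array containing the byte
+ * @return true if the byte is non-zero, false otherwise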
+ */
+ public static boolean toBoolean(byte[] b, int offset) {
+ return b[offset] != 0;
+ }
+
+
+ /**
+ * Converts an integer to four bytes
+ * @param n - the integer
+ * @return - four bytes in an array
+ * @deprecated use toBytes(int,byte[],int)
+ */
+ public static byte[] toBytes(int n) {
+ return toBytes(n,new byte[4],0);
+ }
+
+ public static byte[] toBytes(int n,byte[] b, int offset) {
+ b[offset+3] = (byte) (n);
+ n >>>= 8;
+ b[offset+2] = (byte) (n);
+ n >>>= 8;
+ b[offset+1] = (byte) (n);
+ n >>>= 8;
+ b[offset+0] = (byte) (n);
+ return b;
+ }
+
+ /**
+ * Converts a long to eight bytes
+ * @param n - the long
+ * @return - eight bytes in an array
+ * @deprecated use toBytes(long,byte[],int)
+ */
+ public static byte[] toBytes(long n) {
+ return toBytes(n,new byte[8],0);
+ }
+ public static byte[] toBytes(long n, byte[] b, int offset) {
+ b[offset+7] = (byte) (n);
+ n >>>= 8;
+ b[offset+6] = (byte) (n);
+ n >>>= 8;
+ b[offset+5] = (byte) (n);
+ n >>>= 8;
+ b[offset+4] = (byte) (n);
+ n >>>= 8;
+ b[offset+3] = (byte) (n);
+ n >>>= 8;
+ b[offset+2] = (byte) (n);
+ n >>>= 8;
+ b[offset+1] = (byte) (n);
+ n >>>= 8;
+ b[offset+0] = (byte) (n);
+ return b;
+ }
+
+ /**
+ * Similar to String.indexOf, but uses pure bytes
+ * @param src - the source bytes to be searched
+ * @param srcOff - offset on the source buffer
+ * @param find - the bytes to be found within src
+ * @return - the index of the first matching byte. -1 if the find array is not found
+ */
+ public static int firstIndexOf(byte[] src, int srcOff, byte[] find){
+ int result = -1;
+ if (find.length > src.length) return result;
+ if (find.length == 0 || src.length == 0) return result;
+ if (srcOff >= src.length ) throw new java.lang.ArrayIndexOutOfBoundsException();
+ boolean found = false;
+ int srclen = src.length;
+ int findlen = find.length;
+ byte first = find[0];
+ int pos = srcOff;
+ while (!found) {
+ //find the first byte
+ while (pos < srclen){
+ if (first == src[pos])
+ break;
+ pos++;
+ }
+ if (pos >= srclen)
+ return -1;
+
+ //we found the first character
+ //match the rest of the bytes - they have to match
+ if ( (srclen - pos) < findlen)
+ return -1;
+ //assume it does exist
+ found = true;
+ for (int i = 1; ( (i < findlen) && found); i++)
+ found = found && (find[i] == src[pos + i]);
+ if (found)
+ result = pos;
+ else if ( (srclen - pos) < findlen)
+ return -1; //no more matches possible
+ else
+ pos++;
+ }
+ return result;
+ }
+
+
+ public static Serializable deserialize(byte[] data)
+ throws IOException, ClassNotFoundException, ClassCastException {
+ return deserialize(data,0,data.length);
+ }
+
+ public static Serializable deserialize(byte[] data, int offset, int length)
+ throws IOException, ClassNotFoundException, ClassCastException {
+ return deserialize(data,offset,length,null);
+ }
+ public static int invokecount = 0;
+ public static Serializable deserialize(byte[] data, int offset, int length, ClassLoader[] cls)
+ throws IOException, ClassNotFoundException, ClassCastException {
+ synchronized (XByteBuffer.class) { invokecount++;}
+ Object message = null;
+ if ( cls == null ) cls = new ClassLoader[0];
+ if (data != null) {
+ InputStream instream = new ByteArrayInputStream(data,offset,length);
+ ObjectInputStream stream = null;
+ stream = (cls.length>0)? new ReplicationStream(instream,cls):new ObjectInputStream(instream);
+ message = stream.readObject();
+ instream.close();
+ stream.close();
+ }
+ if ( message == null ) {
+ return null;
+ } else if (message instanceof Serializable)
+ return (Serializable) message;
+ else {
+ throw new ClassCastException("Message has the wrong class. It should implement Serializable, instead it is:"+message.getClass().getName());
+ }
+ }
+
+ /**
+ * Serializes a message into cluster data
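+ * @param msg the serializable message to write
+ * @return the serialized form of msg as a byte array
+ * @throws IOException if the message cannot be written to the
+ * underlying object output stream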
+ */
+ public static byte[] serialize(Serializable msg) throws IOException {
+ ByteArrayOutputStream outs = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(outs);
+ out.writeObject(msg);
+ out.flush();
+ byte[] data = outs.toByteArray();
+ return data;
+ }
+
+ public void setDiscard(boolean discard) {
+ this.discard = discard;
+ }
+
+ public boolean getDiscard() {
+ return discard;
+ }
+
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mbeans-descriptors.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mbeans-descriptors.xml
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java Mon Oct 23 19:45:46 2006
@@ -1,40 +1,40 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.catalina.tribes.membership;
-
-import org.apache.catalina.tribes.util.Arrays;
-
-
-/**
- * Manifest constants for the <code>org.apache.catalina.tribes.membership</code>
- * package.
- *
- * @author Peter Rossbach
- * @version $Revision: 303950 $ $Date: 2005-06-09 15:38:30 -0500 (Thu, 09 Jun 2005) $
- * @author Filip Hanik
- */
-
-public class Constants {
-
- public static final String Package = "org.apache.catalina.tribes.membership";
- public static void main(String[] args) throws Exception {
- System.out.println(Arrays.toString("TRIBES-B".getBytes()));
- System.out.println(Arrays.toString("TRIBES-E".getBytes()));
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.tribes.membership;
+
+import org.apache.catalina.tribes.util.Arrays;
+
+
+/**
+ * Manifest constants for the <code>org.apache.catalina.tribes.membership</code>
+ * package.
+ *
+ * @author Peter Rossbach
+ * @version $Revision$ $Date$
+ * @author Filip Hanik
+ */
+
+public class Constants {
+
+ public static final String Package = "org.apache.catalina.tribes.membership";
+ public static void main(String[] args) throws Exception {
+ System.out.println(Arrays.toString("TRIBES-B".getBytes()));
+ System.out.println(Arrays.toString("TRIBES-E".getBytes()));
+ }
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/LocalStrings.properties
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/LocalStrings.properties
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java?view=diff&rev=467206&r1=467205&r2=467206
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java Mon Oct 23 19:45:46 2006
@@ -1,449 +1,449 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.membership;
-
-import java.util.Properties;
-
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.MembershipService;
-import org.apache.catalina.tribes.util.StringManager;
-import org.apache.catalina.tribes.util.UUIDGenerator;
-import java.io.IOException;
-
-/**
- * A <b>membership</b> implementation using simple multicast.
- * This is the representation of a multicast membership service.
- * This class is responsible for maintaining a list of active cluster nodes in the cluster.
- * If a node fails to send out a heartbeat, the node will be dismissed.
- *
- * @author Filip Hanik
- * @version $Revision: 378093 $, $Date: 2006-02-15 15:13:45 -0600 (Wed, 15 Feb 2006) $
- */
-
-
-public class McastService implements MembershipService,MembershipListener {
-
- private static org.apache.commons.logging.Log log =
- org.apache.commons.logging.LogFactory.getLog( McastService.class );
-
- /**
- * The string manager for this package.
- */
- protected StringManager sm = StringManager.getManager(Constants.Package);
-
- /**
- * The descriptive information about this implementation.
- */
- private static final String info = "McastService/2.1";
-
- /**
- * The implementation specific properties
- */
- protected Properties properties = new Properties();
- /**
- * A handle to the actual low level implementation
- */
- protected McastServiceImpl impl;
- /**
- * A membership listener delegate (should be the cluster :)
- */
- protected MembershipListener listener;
- /**
- * The local member
- */
- protected MemberImpl localMember ;
- private int mcastSoTimeout;
- private int mcastTTL;
-
- protected byte[] payload;
-
- protected byte[] domain;
-
- /**
- * Create a membership service.
- */
- public McastService() {
- //default values
- properties.setProperty("mcastPort","45564");
- properties.setProperty("mcastAddress","228.0.0.4");
- properties.setProperty("memberDropTime","3000");
- properties.setProperty("mcastFrequency","500");
-
- }
-
- /**
- * Return descriptive information about this implementation and the
- * corresponding version number, in the format
- * <code><description>/<version></code>.
- */
- public String getInfo() {
- return (info);
- }
-
- /**
- *
- * @param properties
- * <BR/>All are required<BR />
- * 1. mcastPort - the port to listen to<BR>
- * 2. mcastAddress - the mcast group address<BR>
- * 4. bindAddress - the bind address if any - only one that can be null<BR>
- * 5. memberDropTime - the time a member is gone before it is considered gone.<BR>
- * 6. mcastFrequency - the frequency of sending messages<BR>
- * 7. tcpListenPort - the port this member listens to<BR>
- * 8. tcpListenHost - the bind address of this member<BR>
- * @exception java.lang.IllegalArgumentException if a property is missing.
- */
- public void setProperties(Properties properties) {
- hasProperty(properties,"mcastPort");
- hasProperty(properties,"mcastAddress");
- hasProperty(properties,"memberDropTime");
- hasProperty(properties,"mcastFrequency");
- hasProperty(properties,"tcpListenPort");
- hasProperty(properties,"tcpListenHost");
- this.properties = properties;
- }
-
- /**
- * Return the properties, see setProperties
- */
- public Properties getProperties() {
- return properties;
- }
-
- /**
- * Return the local member name
- */
- public String getLocalMemberName() {
- return localMember.toString() ;
- }
-
- /**
- * Return the local member
- */
- public Member getLocalMember(boolean alive) {
- if ( alive && localMember != null && impl != null) localMember.setMemberAliveTime(System.currentTimeMillis()-impl.getServiceStartTime());
- return localMember;
- }
-
- /**
- * Sets the local member properties for broadcasting
- */
- public void setLocalMemberProperties(String listenHost, int listenPort) {
- properties.setProperty("tcpListenHost",listenHost);
- properties.setProperty("tcpListenPort",String.valueOf(listenPort));
- try {
- if (localMember != null) {
- localMember.setHostname(listenHost);
- localMember.setPort(listenPort);
- } else {
- localMember = new MemberImpl(listenHost, listenPort, 0);
- localMember.setUniqueId(UUIDGenerator.randomUUID(true));
- localMember.setPayload(getPayload());
- localMember.setDomain(getDomain());
- }
- localMember.getData(true, true);
- }catch ( IOException x ) {
- throw new IllegalArgumentException(x);
- }
- }
-
- public void setMcastAddr(String addr) {
- properties.setProperty("mcastAddress", addr);
- }
-
- public String getMcastAddr() {
- return properties.getProperty("mcastAddress");
- }
-
- public void setMcastBindAddress(String bindaddr) {
- properties.setProperty("mcastBindAddress", bindaddr);
- }
-
- public String getMcastBindAddress() {
- return properties.getProperty("mcastBindAddress");
- }
-
- public void setMcastPort(int port) {
- properties.setProperty("mcastPort", String.valueOf(port));
- }
-
- public int getMcastPort() {
- String p = properties.getProperty("mcastPort");
- return new Integer(p).intValue();
- }
-
- public void setMcastFrequency(long time) {
- properties.setProperty("mcastFrequency", String.valueOf(time));
- }
-
- public long getMcastFrequency() {
- String p = properties.getProperty("mcastFrequency");
- return new Long(p).longValue();
- }
-
- public void setMcastDropTime(long time) {
- properties.setProperty("memberDropTime", String.valueOf(time));
- }
-
- public long getMcastDropTime() {
- String p = properties.getProperty("memberDropTime");
- return new Long(p).longValue();
- }
-
- /**
- * Check if a required property is available.
- * @param properties The set of properties
- * @param name The property to check for
- */
- protected void hasProperty(Properties properties, String name){
- if ( properties.getProperty(name)==null) throw new IllegalArgumentException("McastService:Required property \""+name+"\" is missing.");
- }
-
- /**
- * Start broadcasting and listening to membership pings
- * @throws java.lang.Exception if a IO error occurs
- */
- public void start() throws java.lang.Exception {
- start(MembershipService.MBR_RX);
- start(MembershipService.MBR_TX);
- }
-
- public void start(int level) throws java.lang.Exception {
- hasProperty(properties,"mcastPort");
- hasProperty(properties,"mcastAddress");
- hasProperty(properties,"memberDropTime");
- hasProperty(properties,"mcastFrequency");
- hasProperty(properties,"tcpListenPort");
- hasProperty(properties,"tcpListenHost");
-
- if ( impl != null ) {
- impl.start(level);
- return;
- }
- String host = getProperties().getProperty("tcpListenHost");
- int port = Integer.parseInt(getProperties().getProperty("tcpListenPort"));
-
- if ( localMember == null ) {
- localMember = new MemberImpl(host, port, 100);
- localMember.setUniqueId(UUIDGenerator.randomUUID(true));
- } else {
- localMember.setHostname(host);
- localMember.setPort(port);
- localMember.setMemberAliveTime(100);
- }
- if ( this.payload != null ) localMember.setPayload(payload);
- if ( this.domain != null ) localMember.setDomain(domain);
- localMember.setServiceStartTime(System.currentTimeMillis());
- java.net.InetAddress bind = null;
- if ( properties.getProperty("mcastBindAddress")!= null ) {
- bind = java.net.InetAddress.getByName(properties.getProperty("mcastBindAddress"));
- }
- int ttl = -1;
- int soTimeout = -1;
- if ( properties.getProperty("mcastTTL") != null ) {
- try {
- ttl = Integer.parseInt(properties.getProperty("mcastTTL"));
- } catch ( Exception x ) {
- log.error("Unable to parse mcastTTL="+properties.getProperty("mcastTTL"),x);
- }
- }
- if ( properties.getProperty("mcastSoTimeout") != null ) {
- try {
- soTimeout = Integer.parseInt(properties.getProperty("mcastSoTimeout"));
- } catch ( Exception x ) {
- log.error("Unable to parse mcastSoTimeout="+properties.getProperty("mcastSoTimeout"),x);
- }
- }
-
- impl = new McastServiceImpl((MemberImpl)localMember,Long.parseLong(properties.getProperty("mcastFrequency")),
- Long.parseLong(properties.getProperty("memberDropTime")),
- Integer.parseInt(properties.getProperty("mcastPort")),
- bind,
- java.net.InetAddress.getByName(properties.getProperty("mcastAddress")),
- ttl,
- soTimeout,
- this);
-
- impl.start(level);
-
-
- }
-
-
- /**
- * Stop broadcasting and listening to membership pings
- */
- public void stop(int svc) {
- try {
- if ( impl != null && impl.stop(svc) ) impl = null;
- } catch ( Exception x) {
- log.error("Unable to stop the mcast service, level:"+svc+".",x);
- }
- }
-
-
- /**
- * Return all the members by name
- */
- public String[] getMembersByName() {
- Member[] currentMembers = getMembers();
- String [] membernames ;
- if(currentMembers != null) {
- membernames = new String[currentMembers.length];
- for (int i = 0; i < currentMembers.length; i++) {
- membernames[i] = currentMembers[i].toString() ;
- }
- } else
- membernames = new String[0] ;
- return membernames ;
- }
-
- /**
- * Return the member by name
- */
- public Member findMemberByName(String name) {
- Member[] currentMembers = getMembers();
- for (int i = 0; i < currentMembers.length; i++) {
- if (name.equals(currentMembers[i].toString()))
- return currentMembers[i];
- }
- return null;
- }
-
- /**
- * has members?
- */
- public boolean hasMembers() {
- if ( impl == null || impl.membership == null ) return false;
- return impl.membership.hasMembers();
- }
-
- public Member getMember(Member mbr) {
- if ( impl == null || impl.membership == null ) return null;
- return impl.membership.getMember(mbr);
- }
-
- /**
- * Return all the members
- */
- public Member[] getMembers() {
- if ( impl == null || impl.membership == null ) return null;
- return impl.membership.getMembers();
- }
- /**
- * Add a membership listener, this version only supports one listener per service,
- * so calling this method twice will result in only the second listener being active.
- * @param listener The listener
- */
- public void setMembershipListener(MembershipListener listener) {
- this.listener = listener;
- }
- /**
- * Remove the membership listener
- */
- public void removeMembershipListener(){
- listener = null;
- }
-
- public void memberAdded(Member member) {
- if ( listener!=null ) listener.memberAdded(member);
- }
-
- /**
- * Callback from the impl when a new member has been received
- * @param member The member
- */
- public void memberDisappeared(Member member)
- {
- if ( listener!=null ) listener.memberDisappeared(member);
- }
-
- public int getMcastSoTimeout() {
- return mcastSoTimeout;
- }
- public void setMcastSoTimeout(int mcastSoTimeout) {
- this.mcastSoTimeout = mcastSoTimeout;
- properties.setProperty("mcastSoTimeout", String.valueOf(mcastSoTimeout));
- }
- public int getMcastTTL() {
- return mcastTTL;
- }
-
- public byte[] getPayload() {
- return payload;
- }
-
- public byte[] getDomain() {
- return domain;
- }
-
- public void setMcastTTL(int mcastTTL) {
- this.mcastTTL = mcastTTL;
- properties.setProperty("mcastTTL", String.valueOf(mcastTTL));
- }
-
- public void setPayload(byte[] payload) {
- this.payload = payload;
- if ( localMember != null ) {
- localMember.setPayload(payload);
- localMember.getData(true,true);
- try {
- if (impl != null) impl.send(false);
- }catch ( Exception x ) {
- log.error("Unable to send payload update.",x);
- }
- }
- }
-
- public void setDomain(byte[] domain) {
- this.domain = domain;
- if ( localMember != null ) {
- localMember.setDomain(domain);
- localMember.getData(true,true);
- try {
- if (impl != null) impl.send(false);
- }catch ( Exception x ) {
- log.error("Unable to send domain update.",x);
- }
- }
- }
-
- /**
- * Simple test program
- * @param args Command-line arguments
- * @throws Exception If an error occurs
- */
- public static void main(String args[]) throws Exception {
- if(log.isInfoEnabled())
- log.info("Usage McastService hostname tcpport");
- McastService service = new McastService();
- java.util.Properties p = new java.util.Properties();
- p.setProperty("mcastPort","5555");
- p.setProperty("mcastAddress","224.10.10.10");
- p.setProperty("mcastClusterDomain","catalina");
- p.setProperty("bindAddress","localhost");
- p.setProperty("memberDropTime","3000");
- p.setProperty("mcastFrequency","500");
- p.setProperty("tcpListenPort","4000");
- p.setProperty("tcpListenHost","127.0.0.1");
- service.setProperties(p);
- service.start();
- Thread.sleep(60*1000*60);
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.membership;
+
+import java.util.Properties;
+
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.MembershipService;
+import org.apache.catalina.tribes.util.StringManager;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+import java.io.IOException;
+
+/**
+ * A <b>membership</b> implementation using simple multicast.
+ * This is the representation of a multicast membership service.
+ * This class is responsible for maintaining a list of active cluster nodes in the cluster.
+ * If a node fails to send out a heartbeat, the node will be dismissed.
+ *
+ * @author Filip Hanik
+ * @version $Revision$, $Date$
+ */
+
+
+public class McastService implements MembershipService,MembershipListener {
+
+ private static org.apache.commons.logging.Log log =
+ org.apache.commons.logging.LogFactory.getLog( McastService.class );
+
+ /**
+ * The string manager for this package.
+ */
+ protected StringManager sm = StringManager.getManager(Constants.Package);
+
+ /**
+ * The descriptive information about this implementation.
+ */
+ private static final String info = "McastService/2.1";
+
+ /**
+ * The implementation specific properties
+ */
+ protected Properties properties = new Properties();
+ /**
+ * A handle to the actual low level implementation
+ */
+ protected McastServiceImpl impl;
+ /**
+ * A membership listener delegate (should be the cluster :)
+ */
+ protected MembershipListener listener;
+ /**
+ * The local member
+ */
+ protected MemberImpl localMember ;
+ private int mcastSoTimeout;
+ private int mcastTTL;
+
+ protected byte[] payload;
+
+ protected byte[] domain;
+
+ /**
+ * Create a membership service.
+ */
+ public McastService() {
+ //default values
+ properties.setProperty("mcastPort","45564");
+ properties.setProperty("mcastAddress","228.0.0.4");
+ properties.setProperty("memberDropTime","3000");
+ properties.setProperty("mcastFrequency","500");
+
+ }
+
+ /**
+ * Return descriptive information about this implementation and the
+ * corresponding version number, in the format
+ * <code>&lt;description&gt;/&lt;version&gt;</code>.
+ */
+ public String getInfo() {
+ return (info);
+ }
+
+ /**
+ *
+ * @param properties
+ * <BR/>All are required<BR />
+ * 1. mcastPort - the port to listen to<BR>
+ * 2. mcastAddress - the mcast group address<BR>
+ * 4. bindAddress - the bind address if any - only one that can be null<BR>
+ * 5. memberDropTime - the time a member may be silent before it is considered gone.<BR>
+ * 6. mcastFrequency - the frequency of sending messages<BR>
+ * 7. tcpListenPort - the port this member listens to<BR>
+ * 8. tcpListenHost - the bind address of this member<BR>
+ * @exception java.lang.IllegalArgumentException if a property is missing.
+ */
+ public void setProperties(Properties properties) {
+ hasProperty(properties,"mcastPort");
+ hasProperty(properties,"mcastAddress");
+ hasProperty(properties,"memberDropTime");
+ hasProperty(properties,"mcastFrequency");
+ hasProperty(properties,"tcpListenPort");
+ hasProperty(properties,"tcpListenHost");
+ this.properties = properties;
+ }
+
+ /**
+ * Return the properties, see setProperties
+ */
+ public Properties getProperties() {
+ return properties;
+ }
+
+ /**
+ * Return the local member name
+ */
+ public String getLocalMemberName() {
+ return localMember.toString() ;
+ }
+
+ /**
+ * Return the local member
+ */
+ public Member getLocalMember(boolean alive) {
+ if ( alive && localMember != null && impl != null) localMember.setMemberAliveTime(System.currentTimeMillis()-impl.getServiceStartTime());
+ return localMember;
+ }
+
+ /**
+ * Sets the local member properties for broadcasting
+ */
+ public void setLocalMemberProperties(String listenHost, int listenPort) {
+ properties.setProperty("tcpListenHost",listenHost);
+ properties.setProperty("tcpListenPort",String.valueOf(listenPort));
+ try {
+ if (localMember != null) {
+ localMember.setHostname(listenHost);
+ localMember.setPort(listenPort);
+ } else {
+ localMember = new MemberImpl(listenHost, listenPort, 0);
+ localMember.setUniqueId(UUIDGenerator.randomUUID(true));
+ localMember.setPayload(getPayload());
+ localMember.setDomain(getDomain());
+ }
+ localMember.getData(true, true);
+ }catch ( IOException x ) {
+ throw new IllegalArgumentException(x);
+ }
+ }
+
+ public void setMcastAddr(String addr) {
+ properties.setProperty("mcastAddress", addr);
+ }
+
+ public String getMcastAddr() {
+ return properties.getProperty("mcastAddress");
+ }
+
+ public void setMcastBindAddress(String bindaddr) {
+ properties.setProperty("mcastBindAddress", bindaddr);
+ }
+
+ public String getMcastBindAddress() {
+ return properties.getProperty("mcastBindAddress");
+ }
+
+ public void setMcastPort(int port) {
+ properties.setProperty("mcastPort", String.valueOf(port));
+ }
+
+ public int getMcastPort() {
+ String p = properties.getProperty("mcastPort");
+ return new Integer(p).intValue();
+ }
+
+ public void setMcastFrequency(long time) {
+ properties.setProperty("mcastFrequency", String.valueOf(time));
+ }
+
+ public long getMcastFrequency() {
+ String p = properties.getProperty("mcastFrequency");
+ return new Long(p).longValue();
+ }
+
+ public void setMcastDropTime(long time) {
+ properties.setProperty("memberDropTime", String.valueOf(time));
+ }
+
+ public long getMcastDropTime() {
+ String p = properties.getProperty("memberDropTime");
+ return new Long(p).longValue();
+ }
+
+ /**
+ * Check if a required property is available.
+ * @param properties The set of properties
+ * @param name The property to check for
+ */
+ protected void hasProperty(Properties properties, String name){
+ if ( properties.getProperty(name)==null) throw new IllegalArgumentException("McastService:Required property \""+name+"\" is missing.");
+ }
+
+ /**
+ * Start broadcasting and listening to membership pings
+ * @throws java.lang.Exception if an IO error occurs
+ */
+ public void start() throws java.lang.Exception {
+ start(MembershipService.MBR_RX);
+ start(MembershipService.MBR_TX);
+ }
+
+ /**
+ * Start the membership service at the requested level.
+ * <p>
+ * The required properties are validated on every call. When an
+ * implementation already exists the call is simply delegated to it;
+ * otherwise the local member is created (or refreshed) from the
+ * <code>tcpListenHost</code>/<code>tcpListenPort</code> properties and a
+ * new <code>McastServiceImpl</code> is built from the mcast properties
+ * and then started.
+ * <p>
+ * An unparsable <code>mcastTTL</code> or <code>mcastSoTimeout</code> value is
+ * logged and replaced by -1 instead of aborting the start.
+ * @param level the service level handed to the implementation
+ * @throws java.lang.Exception if a required property is missing or
+ *         malformed, or if the implementation fails to start
+ */
+ public void start(int level) throws java.lang.Exception {
+     // Mandatory settings, checked in this fixed order.
+     String[] required = {
+         "mcastPort", "mcastAddress", "memberDropTime",
+         "mcastFrequency", "tcpListenPort", "tcpListenHost"
+     };
+     for (int i = 0; i < required.length; i++) {
+         hasProperty(properties, required[i]);
+     }
+
+     // An implementation already exists: just start the requested level on it.
+     if ( impl != null ) {
+         impl.start(level);
+         return;
+     }
+
+     String tcpHost = getProperties().getProperty("tcpListenHost");
+     int tcpPort = Integer.parseInt(getProperties().getProperty("tcpListenPort"));
+     if ( localMember != null ) {
+         localMember.setHostname(tcpHost);
+         localMember.setPort(tcpPort);
+         localMember.setMemberAliveTime(100);
+     } else {
+         localMember = new MemberImpl(tcpHost, tcpPort, 100);
+         localMember.setUniqueId(UUIDGenerator.randomUUID(true));
+     }
+     if ( this.payload != null ) localMember.setPayload(payload);
+     if ( this.domain != null ) localMember.setDomain(domain);
+     localMember.setServiceStartTime(System.currentTimeMillis());
+
+     // Optional bind address; stays null when the property is absent.
+     String bindName = properties.getProperty("mcastBindAddress");
+     java.net.InetAddress bind = null;
+     if ( bindName != null ) {
+         bind = java.net.InetAddress.getByName(bindName);
+     }
+
+     // Optional numeric settings; -1 is passed on when absent or unparsable.
+     int ttl = -1;
+     int soTimeout = -1;
+     String ttlText = properties.getProperty("mcastTTL");
+     if ( ttlText != null ) {
+         try {
+             ttl = Integer.parseInt(ttlText);
+         } catch ( Exception x ) {
+             log.error("Unable to parse mcastTTL="+ttlText,x);
+         }
+     }
+     String soTimeoutText = properties.getProperty("mcastSoTimeout");
+     if ( soTimeoutText != null ) {
+         try {
+             soTimeout = Integer.parseInt(soTimeoutText);
+         } catch ( Exception x ) {
+             log.error("Unable to parse mcastSoTimeout="+soTimeoutText,x);
+         }
+     }
+
+     // Constructor arguments, evaluated in the same order as they are passed.
+     MemberImpl self = (MemberImpl)localMember;
+     long frequency = Long.parseLong(properties.getProperty("mcastFrequency"));
+     long dropTime = Long.parseLong(properties.getProperty("memberDropTime"));
+     int groupPort = Integer.parseInt(properties.getProperty("mcastPort"));
+     java.net.InetAddress groupAddress = java.net.InetAddress.getByName(properties.getProperty("mcastAddress"));
+
+     impl = new McastServiceImpl(self, frequency, dropTime, groupPort,
+                                 bind, groupAddress, ttl, soTimeout, this);
+     impl.start(level);
+ }
+
+
+ /**
+ * Stop broadcasting and listening to membership pings at the given level.
+ * The implementation is discarded once its <code>stop</code> call returns
+ * <code>true</code>; any exception is logged rather than propagated.
+ * @param svc the service level to stop
+ */
+ public void stop(int svc) {
+     try {
+         if ( impl == null ) return;
+         boolean stopped = impl.stop(svc);
+         if ( stopped ) impl = null;
+     } catch ( Exception x) {
+         log.error("Unable to stop the mcast service, level:"+svc+".",x);
+     }
+ }
+
+
+ /**
+ * Return the names of all current members, where a member's name is the
+ * value of its <code>toString()</code>.
+ * @return the member names; an empty array when no member list is available
+ */
+ public String[] getMembersByName() {
+     Member[] all = getMembers();
+     if ( all == null ) return new String[0];
+     String[] names = new String[all.length];
+     for (int idx = 0; idx < names.length; idx++) {
+         names[idx] = all[idx].toString();
+     }
+     return names;
+ }
+
+ /**
+ * Return the member whose <code>toString()</code> value equals the given name.
+ * @param name the member name to look for; must not be <code>null</code>
+ * @return the matching member, or <code>null</code> when no member matches
+ *         or no member list is available
+ */
+ public Member findMemberByName(String name) {
+     Member[] currentMembers = getMembers();
+     // getMembers() returns null while there is no implementation or
+     // membership, so guard before touching the array.
+     if ( currentMembers == null ) return null;
+     for (int i = 0; i < currentMembers.length; i++) {
+         if (name.equals(currentMembers[i].toString()))
+             return currentMembers[i];
+     }
+     return null;
+ }
+
+ /**
+ * Tell whether the current membership contains any members.
+ * @return <code>false</code> when there is no implementation or membership
+ *         yet, otherwise the membership's own answer
+ */
+ public boolean hasMembers() {
+     return impl != null
+         && impl.membership != null
+         && impl.membership.hasMembers();
+ }
+
+ /**
+ * Look up a member in the current membership.
+ * @param mbr the member to look up
+ * @return the membership's result for <code>mbr</code>, or <code>null</code>
+ *         when there is no implementation or membership yet
+ */
+ public Member getMember(Member mbr) {
+     boolean ready = impl != null && impl.membership != null;
+     return ready ? impl.membership.getMember(mbr) : null;
+ }
+
+ /**
+ * Return all the members of the current membership.
+ * @return the members, or <code>null</code> when there is no
+ *         implementation or membership yet
+ */
+ public Member[] getMembers() {
+     boolean ready = impl != null && impl.membership != null;
+     return ready ? impl.membership.getMembers() : null;
+ }
+ /**
+ * Set the membership listener. This version only supports one listener per service,
+ * so calling this method twice will result in only the second listener being active.
+ * @param listener The listener
+ */
+ public void setMembershipListener(MembershipListener listener) {
+ this.listener = listener;
+ }
+ /**
+ * Remove the membership listener by clearing the stored reference.
+ */
+ public void removeMembershipListener() {
+     this.listener = null;
+ }
+
+ /**
+ * Forward a member-added notification to the registered listener, if any.
+ * @param member The member
+ */
+ public void memberAdded(Member member) {
+     if ( listener == null ) return;
+     listener.memberAdded(member);
+ }
+
+ /**
+ * Callback invoked when a member has disappeared; the notification is
+ * forwarded to the registered listener, if any.
+ * @param member The member
+ */
+ public void memberDisappeared(Member member)
+ {
+ if ( listener!=null ) listener.memberDisappeared(member);
+ }
+
+ /**
+ * @return the value last stored through <code>setMcastSoTimeout</code>
+ */
+ public int getMcastSoTimeout() {
+     return this.mcastSoTimeout;
+ }
+ /**
+ * Store the multicast socket timeout and mirror it into the
+ * <code>mcastSoTimeout</code> property.
+ * @param mcastSoTimeout the new timeout value
+ */
+ public void setMcastSoTimeout(int mcastSoTimeout) {
+     this.mcastSoTimeout = mcastSoTimeout;
+     String text = String.valueOf(mcastSoTimeout);
+     properties.setProperty("mcastSoTimeout", text);
+ }
+ /**
+ * @return the value last stored through <code>setMcastTTL</code>
+ */
+ public int getMcastTTL() {
+     return this.mcastTTL;
+ }
+
+ /**
+ * @return the payload array held by this service (the stored reference, not a copy)
+ */
+ public byte[] getPayload() {
+     return this.payload;
+ }
+
+ /**
+ * @return the domain array held by this service (the stored reference, not a copy)
+ */
+ public byte[] getDomain() {
+     return this.domain;
+ }
+
+ /**
+ * Store the multicast TTL and mirror it into the <code>mcastTTL</code> property.
+ * @param mcastTTL the new TTL value
+ */
+ public void setMcastTTL(int mcastTTL) {
+     this.mcastTTL = mcastTTL;
+     String text = String.valueOf(mcastTTL);
+     properties.setProperty("mcastTTL", text);
+ }
+
+ /**
+ * Store a new payload and, when a local member exists, apply it to that
+ * member and ask the implementation (if any) to send an update. A failed
+ * send is logged, not propagated.
+ * @param payload the new payload
+ */
+ public void setPayload(byte[] payload) {
+     this.payload = payload;
+     if ( localMember == null ) return;
+
+     localMember.setPayload(payload);
+     // Result intentionally unused; presumably this refreshes the member's
+     // cached data -- TODO confirm against MemberImpl.getData.
+     localMember.getData(true,true);
+     try {
+         if (impl != null) impl.send(false);
+     } catch ( Exception x ) {
+         log.error("Unable to send payload update.",x);
+     }
+ }
+
+ /**
+ * Store a new domain and, when a local member exists, apply it to that
+ * member and ask the implementation (if any) to send an update. A failed
+ * send is logged, not propagated.
+ * @param domain the new domain
+ */
+ public void setDomain(byte[] domain) {
+     this.domain = domain;
+     if ( localMember == null ) return;
+
+     localMember.setDomain(domain);
+     // Result intentionally unused; presumably this refreshes the member's
+     // cached data -- TODO confirm against MemberImpl.getData.
+     localMember.getData(true,true);
+     try {
+         if (impl != null) impl.send(false);
+     } catch ( Exception x ) {
+         log.error("Unable to send domain update.",x);
+     }
+ }
+
+ /**
+ * Simple test program: starts a McastService with hard-coded properties
+ * (multicast group 224.10.10.10, port 5555; TCP listener 127.0.0.1:4000)
+ * and then sleeps for one hour (60*1000*60 ms).
+ * <p>
+ * NOTE(review): the logged usage line mentions "hostname tcpport", but
+ * <code>args</code> is never read here. The "bindAddress" and
+ * "mcastClusterDomain" keys are not among the keys that
+ * <code>start(int)</code> looks up (it reads "mcastBindAddress"); confirm
+ * whether they are consumed elsewhere.
+ * @param args Command-line arguments (currently unused)
+ * @throws Exception If an error occurs
+ */
+ public static void main(String args[]) throws Exception {
+ if(log.isInfoEnabled())
+ log.info("Usage McastService hostname tcpport");
+ McastService service = new McastService();
+ java.util.Properties p = new java.util.Properties();
+ p.setProperty("mcastPort","5555");
+ p.setProperty("mcastAddress","224.10.10.10");
+ p.setProperty("mcastClusterDomain","catalina");
+ p.setProperty("bindAddress","localhost");
+ p.setProperty("memberDropTime","3000");
+ p.setProperty("mcastFrequency","500");
+ p.setProperty("tcpListenPort","4000");
+ p.setProperty("tcpListenHost","127.0.0.1");
+ service.setProperties(p);
+ service.start();
+ Thread.sleep(60*1000*60);
+ }
+}
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org