You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/01 06:07:15 UTC

svn commit: r780559 - in /activemq/sandbox/activemq-flow: activemq-flow/src/test/resources/META-INF/ activemq-flow/src/test/resources/META-INF/services/ activemq-flow/src/test/resources/META-INF/services/org/ activemq-flow/src/test/resources/META-INF/s...

Author: chirino
Date: Mon Jun  1 04:07:14 2009
New Revision: 780559

URL: http://svn.apache.org/viewvc?rev=780559&view=rev
Log:
adding missing classes and resources..

Added:
    activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/
    activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/
    activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/
    activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/
    activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/
    activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/
    activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto
    activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
    activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/
    activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/
    activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/
    activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/
    activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/
    activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/
    activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java   (with props)
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java   (with props)

Added: activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto (added)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto Mon Jun  1 04:07:14 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.ProtoWireFormatFactory

Added: activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2 (added)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2 Mon Jun  1 04:07:14 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.Proto2WireFormatFactory

Added: activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test (added)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test Mon Jun  1 04:07:14 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.TestWireFormatFactory

Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java Mon Jun  1 04:07:14 2009
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+public interface MonitoredTransport {
+    /** @return a write timestamp; WriteTimeoutFilter subtracts it from System.currentTimeMillis() and ignores values &lt;= 0 (presumably the start of the current write, in ms - confirm with implementors). */
+    long getWriteTimestamp();
+    /** @return true while a write is in progress (presumably); WriteTimeoutFilter only computes an elapsed time when this is true. */
+    boolean isWriting();
+
+}

Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Mon Jun  1 04:07:14 2009
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This filter implements write timeouts for socket write operations.
+ * When using blocking IO, the Java implementation doesn't have an explicit flag
+ * to set a timeout, and can cause operations to block forever (or until the TCP stack implementation times out the retransmissions,
+ * which is usually around 13-30 minutes).<br/>
+ * To enable this filter, simply add the following to the transport URI:<br/>
+ * <code>transport.soWriteTimeout=&lt;value in millis&gt;</code>.<br/>
+ * For example (15 second timeout on write operations to the socket):<br/>
+ * <pre><code>
+ * &lt;transportConnector 
+ *     name=&quot;tcp1&quot; 
+ *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
+ * /&gt;
+ * </code></pre><br/>
+ * For example (enable default timeout on the socket):<br/>
+ * <pre><code>
+ * &lt;transportConnector 
+ *     name=&quot;tcp1&quot; 
+ *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
+ * /&gt;
+ * </code></pre>
+ * @author Filip Hanik
+ *
+ */
+public class WriteTimeoutFilter extends TransportFilter {
+
+    private static final Log LOG = LogFactory.getLog(WriteTimeoutFilter.class);
+    protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
+    protected static AtomicInteger messageCounter = new AtomicInteger(0);
+    protected static TimeoutThread timeoutThread = new TimeoutThread(); 
+    
+    protected static long sleep = 5000l;
+
+    protected long writeTimeout = -1;
+    
+    public WriteTimeoutFilter(Transport next) {
+        super(next);
+    }
+
+    /** Sends {@code command} while this filter is listed in {@code writers}, where the timeout thread can see it. */
+    @Override
+    public void oneway(Object command) throws IOException {
+        try {
+            registerWrite(this);
+            super.oneway(command);
+        } finally {
+            // runs on success and on failure alike; an IOException still propagates to the caller
+            deRegisterWrite(this,false,null);
+        }
+    }
+    
+    public long getWriteTimeout() {
+        return writeTimeout;
+    }
+
+    public void setWriteTimeout(long writeTimeout) {
+        this.writeTimeout = writeTimeout;
+    }
+    
+    public static long getSleep() {
+        return sleep;
+    }
+
+    public static void setSleep(long sleep) {
+        WriteTimeoutFilter.sleep = sleep;
+    }
+
+    
+    protected MonitoredTransport getWriter() {
+        return next.narrow(MonitoredTransport.class);
+    }
+    
+    protected Socket getSocket() {
+        return next.narrow(Socket.class);
+    }
+    
+    protected static void registerWrite(WriteTimeoutFilter filter) {
+        writers.add(filter);
+    }
+    
+    /** Removes filter from writers; if it was present and fail is set, closes the filter's socket. Returns whether it was present; iox is unused. */
+    protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
+        boolean result = writers.remove(filter);
+        if (!result || !fail) {
+            return result;
+        }
+        String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress();
+        LOG.warn(message);
+        Socket sock = filter.getSocket();
+        if (sock==null) {
+            LOG.error("Destination socket is null, unable to close socket.("+message+")");
+            return result;
+        }
+        try {
+            sock.close();
+        } catch (IOException ignore) { // close failure is deliberately ignored
+        }
+        return result;
+    }
+    
+    @Override
+    public void start() throws Exception {
+        super.start();
+    }
+    
+    @Override
+    public void stop() throws Exception {
+        super.stop();
+    }
+    
+    /** Daemon thread that scans {@code writers} and, via deRegisterWrite, closes sockets whose write has outlasted the filter's timeout. */
+    protected static class TimeoutThread extends Thread {
+        static AtomicInteger instance = new AtomicInteger(0);
+        volatile boolean run = true; // volatile so that clearing it from another thread would be seen by the loop
+        public TimeoutThread() {
+            setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet());
+            setDaemon(true);
+            setPriority(Thread.MIN_PRIORITY);
+            start();
+        }
+        /** Loops while {@code run} is set: one scan of the registered writers, then a pause of {@code getSleep()} ms. */
+        public void run() {
+            boolean error = false; // lives across passes so a repeating failure is logged once, not every pass
+            while (run) {
+                try {
+                    if (!interrupted()) {
+                        Iterator<WriteTimeoutFilter> filters = writers.iterator();
+                        while (run && filters.hasNext()) {
+                            WriteTimeoutFilter filter = filters.next();
+                            if (filter.getWriteTimeout()<=0) continue; //no timeout set
+                            long writeStart = filter.getWriter().getWriteTimestamp();
+                            long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
+                            if (delta>filter.getWriteTimeout()) {
+                                WriteTimeoutFilter.deRegisterWrite(filter, true,null);
+                            }//if timeout
+                        }//while
+                    }//if interrupted
+                    error = false; // a clean scan re-arms error logging
+                } catch (Throwable t) { //make sure this thread never dies
+                    if (!error) { //use error flag to avoid filling up the logs
+                        LOG.error("WriteTimeout thread unable validate existing sockets.",t);
+                        error = true;
+                    }
+                }
+                try {
+                    Thread.sleep(Math.max(0L, getSleep())); // outside the scan's try so a failed scan cannot become a busy loop; clamped because sleep rejects negatives
+                } catch (InterruptedException x) {
+                    //do nothing
+                }
+            }
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe Mon Jun  1 04:07:14 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.pipe.PipeTransportFactory

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java Mon Jun  1 04:07:14 2009
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UTFDataFormatException;
+
+/**
+ * Optimized ByteArrayInputStream that can be used more than once
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class DataByteArrayInputStream extends InputStream implements DataInput {
+    private byte[] buf;
+    private int pos;
+    private int offset;
+
+    /**
+     * Creates a <code>DataByteArrayInputStream</code> that reads <code>buf</code> from index 0.
+     * 
+     * @param buf the input buffer.
+     */
+    public DataByteArrayInputStream(byte buf[]) {
+        this.buf = buf;
+        this.pos = 0;
+        this.offset = 0;
+    }
+
+    /**
+     * Creates a <code>DataByteArrayInputStream</code> positioned at the sequence's offset.
+     * 
+     * @param sequence the input buffer.
+     */
+    public DataByteArrayInputStream(ByteSequence sequence) {
+        this.buf = sequence.getData();
+        this.offset = sequence.getOffset();
+        this.pos =  this.offset;
+    }
+
+    /**
+     * Creates a <code>DataByteArrayInputStream</code> backed by an empty byte
+     * array
+     */
+    public DataByteArrayInputStream() {
+        this(new byte[0]);
+    }
+
+    /**
+     * @return the size
+     */
+    public int size() {
+        return pos - offset;
+    }
+
+    /**
+     * @return the underlying data array
+     */
+    public byte[] getRawData() {
+        return buf;
+    }
+
+    /**
+     * Resets this stream to read <code>newBuff</code> from index 0; the offset is
+     * cleared as well so that {@link #size()} counts from the new start.
+     * @param newBuff the array to read from next
+     */
+    public void restart(byte[] newBuff) {
+        buf = newBuff;
+        pos = 0;
+        offset = 0; // a stale offset from a previous ByteSequence would skew size() (pos - offset)
+    }
+
+    /**
+     * Resets this stream to read the data array of <code>sequence</code>, starting
+     * at the sequence's offset, which also becomes the base for {@link #size()}.
+     * @param sequence supplies the data array and the start offset
+     */
+    public void restart(ByteSequence sequence) {
+        this.buf = sequence.getData();
+        this.offset = sequence.getOffset(); // previously left stale, which skewed size() (pos - offset)
+        this.pos = this.offset;
+    }
+
+    /**
+     * re-start the input stream - reusing the current buffer
+     * 
+     * @param size
+     */
+    public void restart(int size) {
+        if (buf == null || buf.length < size) {
+            buf = new byte[size];
+        }
+        restart(buf);
+    }
+
+    /**
+     * Reads the next byte of data from this input stream. The value byte is
+     * returned as an <code>int</code> in the range <code>0</code> to
+     * <code>255</code>. If no byte is available because the end of the
+     * stream has been reached, the value <code>-1</code> is returned.
+     * <p>
+     * This <code>read</code> method cannot block.
+     * 
+     * @return the next byte of data, or <code>-1</code> if the end of the
+     *         stream has been reached.
+     */
+    public int read() {
+        return (pos < buf.length) ? (buf[pos++] & 0xff) : -1;
+    }
+
+    /**
+     * Reads up to <code>len</code> bytes of data into an array of bytes from
+     * this input stream.
+     * 
+     * @param b the buffer into which the data is read.
+     * @param off the start offset of the data.
+     * @param len the maximum number of bytes read.
+     * @return the total number of bytes read into the buffer, or
+     *         <code>-1</code> if there is no more data because the end of the
+     *         stream has been reached.
+     */
+    public int read(byte b[], int off, int len) {
+        if (b == null) {
+            throw new NullPointerException();
+        }
+        if (pos >= buf.length) {
+            return -1;
+        }
+        if (pos + len > buf.length) {
+            len = buf.length - pos;
+        }
+        if (len <= 0) {
+            return 0;
+        }
+        System.arraycopy(buf, pos, b, off, len);
+        pos += len;
+        return len;
+    }
+
+    /**
+     * @return the number of bytes that can be read from the input stream
+     *         without blocking.
+     */
+    public int available() {
+        return buf.length - pos;
+    }
+
+    public void readFully(byte[] b) {
+        read(b, 0, b.length);
+    }
+
+    public void readFully(byte[] b, int off, int len) {
+        read(b, off, len);
+    }
+
+    public int skipBytes(int n) {
+        if (pos + n > buf.length) {
+            n = buf.length - pos;
+        }
+        if (n < 0) {
+            return 0;
+        }
+        pos += n;
+        return n;
+    }
+
+    public boolean readBoolean() {
+        return read() != 0;
+    }
+
+    public byte readByte() {
+        return (byte)read();
+    }
+
+    public int readUnsignedByte() {
+        return read();
+    }
+
+    public short readShort() {
+        int ch1 = read();
+        int ch2 = read();
+        return (short)((ch1 << 8) + (ch2 << 0));
+    }
+
+    public int readUnsignedShort() {
+        int ch1 = read();
+        int ch2 = read();
+        return (ch1 << 8) + (ch2 << 0);
+    }
+
+    public char readChar() {
+        int ch1 = read();
+        int ch2 = read();
+        return (char)((ch1 << 8) + (ch2 << 0));
+    }
+
+    public int readInt() {
+        int ch1 = read();
+        int ch2 = read();
+        int ch3 = read();
+        int ch4 = read();
+        return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0);
+    }
+
+    public long readLong() {
+        long rc = ((long)buf[pos++] << 56) + ((long)(buf[pos++] & 255) << 48) + ((long)(buf[pos++] & 255) << 40) + ((long)(buf[pos++] & 255) << 32);
+        return rc + ((long)(buf[pos++] & 255) << 24) + ((buf[pos++] & 255) << 16) + ((buf[pos++] & 255) << 8) + ((buf[pos++] & 255) << 0);
+    }
+
+    public float readFloat() throws IOException {
+        return Float.intBitsToFloat(readInt());
+    }
+
+    public double readDouble() throws IOException {
+        return Double.longBitsToDouble(readLong());
+    }
+
+    /**
+     * Reads through the next line terminator (LF, CR or CR LF) or to the end of
+     * the buffer; the terminator stays in the returned string (platform charset).
+     */
+    public String readLine() {
+        int start = pos;
+        while (pos < buf.length) {
+            int c = read();
+            if (c == '\r' && pos < buf.length && buf[pos] == '\n') {
+                pos++; // consume the LF of a CR LF pair
+            }
+            if (c == '\n' || c == '\r') {
+                break;
+            }
+        }
+        return new String(buf, start, pos - start); // third argument is a length, not an end index
+    }
+
+    public String readUTF() throws IOException {
+        int length = readUnsignedShort();
+        char[] characters = new char[length];
+        int c;
+        int c2;
+        int c3;
+        int count = 0;
+        int total = pos + length;
+        while (pos < total) {
+            c = (int)buf[pos] & 0xff;
+            if (c > 127) {
+                break;
+            }
+            pos++;
+            characters[count++] = (char)c;
+        }
+        while (pos < total) {
+            c = (int)buf[pos] & 0xff;
+            switch (c >> 4) {
+            case 0:
+            case 1:
+            case 2:
+            case 3:
+            case 4:
+            case 5:
+            case 6:
+            case 7:
+                pos++;
+                characters[count++] = (char)c;
+                break;
+            case 12:
+            case 13:
+                pos += 2;
+                if (pos > total) {
+                    throw new UTFDataFormatException("bad string");
+                }
+                c2 = (int)buf[pos - 1];
+                if ((c2 & 0xC0) != 0x80) {
+                    throw new UTFDataFormatException("bad string");
+                }
+                characters[count++] = (char)(((c & 0x1F) << 6) | (c2 & 0x3F));
+                break;
+            case 14:
+                pos += 3;
+                if (pos > total) {
+                    throw new UTFDataFormatException("bad string");
+                }
+                c2 = (int)buf[pos - 2];
+                c3 = (int)buf[pos - 1];
+                if (((c2 & 0xC0) != 0x80) || ((c3 & 0xC0) != 0x80)) {
+                    throw new UTFDataFormatException("bad string");
+                }
+                characters[count++] = (char)(((c & 0x0F) << 12) | ((c2 & 0x3F) << 6) | ((c3 & 0x3F) << 0));
+                break;
+            default:
+                throw new UTFDataFormatException("bad string");
+            }
+        }
+        return new String(characters, 0, count);
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java Mon Jun  1 04:07:14 2009
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+
+/**
+ * Optimized ByteArrayOutputStream
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class DataByteArrayOutputStream extends OutputStream implements DataOutput {
+    private static final int DEFAULT_SIZE = 2048;
+    private byte buf[];
+    private int pos;
+
+    /**
+     * Creates a new byte array output stream, with a buffer capacity of the
+     * specified size, in bytes.
+     * 
+     * @param size the initial size.
+     * @exception IllegalArgumentException if size is negative.
+     */
+    public DataByteArrayOutputStream(int size) {
+        if (size < 0) {
+            throw new IllegalArgumentException("Invalid size: " + size);
+        }
+        buf = new byte[size];
+    }
+
+    /**
+     * Creates a new byte array output stream.
+     */
+    public DataByteArrayOutputStream() {
+        this(DEFAULT_SIZE);
+    }
+
+    /**
+     * start using a fresh byte array
+     * 
+     * @param size
+     */
+    public void restart(int size) {
+        buf = new byte[size];
+        pos = 0;
+    }
+
+    /**
+     * start using a fresh byte array
+     */
+    public void restart() {
+        restart(DEFAULT_SIZE);
+    }
+
+    /**
+     * Get a ByteSequence from the stream
+     * 
+     * @return the byte sequence
+     */
+    public ByteSequence toByteSequence() {
+        return new ByteSequence(buf, 0, pos);
+    }
+
+    /**
+     * Writes the specified byte to this byte array output stream.
+     * 
+     * @param b the byte to be written.
+     */
+    public void write(int b) {
+        int newcount = pos + 1;
+        ensureEnoughBuffer(newcount);
+        buf[pos] = (byte)b;
+        pos = newcount;
+    }
+
+    /**
+     * Writes <code>len</code> bytes from the specified byte array starting at
+     * offset <code>off</code> to this byte array output stream.
+     * 
+     * @param b the data.
+     * @param off the start offset in the data.
+     * @param len the number of bytes to write.
+     */
+    public void write(byte b[], int off, int len) {
+        if (len == 0) {
+            return;
+        }
+        int newcount = pos + len;
+        ensureEnoughBuffer(newcount);
+        System.arraycopy(b, off, buf, pos, len);
+        pos = newcount;
+    }
+
+    /**
+     * @return the underlying byte[] buffer
+     */
+    public byte[] getData() {
+        return buf;
+    }
+
+    /**
+     * reset the output stream
+     */
+    public void reset() {
+        pos = 0;
+    }
+
+    /**
+     * Set the current position for writing
+     * 
+     * @param offset
+     */
+    public void position(int offset) {
+        ensureEnoughBuffer(offset);
+        pos = offset;
+    }
+
+    public int size() {
+        return pos;
+    }
+
+    public void writeBoolean(boolean v) {
+        ensureEnoughBuffer(pos + 1);
+        buf[pos++] = (byte)(v ? 1 : 0);
+    }
+
+    public void writeByte(int v) {
+        ensureEnoughBuffer(pos + 1);
+        buf[pos++] = (byte)(v >>> 0);
+    }
+
+    public void writeShort(int v) {
+        ensureEnoughBuffer(pos + 2);
+        buf[pos++] = (byte)(v >>> 8);
+        buf[pos++] = (byte)(v >>> 0);
+    }
+
+    public void writeChar(int v) {
+        ensureEnoughBuffer(pos + 2);
+        buf[pos++] = (byte)(v >>> 8);
+        buf[pos++] = (byte)(v >>> 0);
+    }
+
+    public void writeInt(int v) {
+        ensureEnoughBuffer(pos + 4);
+        buf[pos++] = (byte)(v >>> 24);
+        buf[pos++] = (byte)(v >>> 16);
+        buf[pos++] = (byte)(v >>> 8);
+        buf[pos++] = (byte)(v >>> 0);
+    }
+
+    public void writeLong(long v) {
+        ensureEnoughBuffer(pos + 8);
+        buf[pos++] = (byte)(v >>> 56);
+        buf[pos++] = (byte)(v >>> 48);
+        buf[pos++] = (byte)(v >>> 40);
+        buf[pos++] = (byte)(v >>> 32);
+        buf[pos++] = (byte)(v >>> 24);
+        buf[pos++] = (byte)(v >>> 16);
+        buf[pos++] = (byte)(v >>> 8);
+        buf[pos++] = (byte)(v >>> 0);
+    }
+
+    public void writeFloat(float v) throws IOException {
+        writeInt(Float.floatToIntBits(v));
+    }
+
+    public void writeDouble(double v) throws IOException {
+        writeLong(Double.doubleToLongBits(v));
+    }
+
+    public void writeBytes(String s) {
+        int length = s.length();
+        for (int i = 0; i < length; i++) {
+            write((byte)s.charAt(i));
+        }
+    }
+
+    public void writeChars(String s) {
+        int length = s.length();
+        for (int i = 0; i < length; i++) {
+            int c = s.charAt(i);
+            write((c >>> 8) & 0xFF);
+            write((c >>> 0) & 0xFF);
+        }
+    }
+
+    public void writeUTF(String str) throws IOException {
+        int strlen = str.length();
+        int encodedsize = 0;
+        int c;
+        for (int i = 0; i < strlen; i++) {
+            c = str.charAt(i);
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                encodedsize++;
+            } else if (c > 0x07FF) {
+                encodedsize += 3;
+            } else {
+                encodedsize += 2;
+            }
+        }
+        if (encodedsize > 65535) {
+            throw new UTFDataFormatException("encoded string too long: " + encodedsize + " bytes");
+        }
+        ensureEnoughBuffer(pos + encodedsize + 2);
+        writeShort(encodedsize);
+        int i = 0;
+        for (i = 0; i < strlen; i++) {
+            c = str.charAt(i);
+            if (!((c >= 0x0001) && (c <= 0x007F))) {
+                break;
+            }
+            buf[pos++] = (byte)c;
+        }
+        for (; i < strlen; i++) {
+            c = str.charAt(i);
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                buf[pos++] = (byte)c;
+            } else if (c > 0x07FF) {
+                buf[pos++] = (byte)(0xE0 | ((c >> 12) & 0x0F));
+                buf[pos++] = (byte)(0x80 | ((c >> 6) & 0x3F));
+                buf[pos++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+            } else {
+                buf[pos++] = (byte)(0xC0 | ((c >> 6) & 0x1F));
+                buf[pos++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+            }
+        }
+    }
+
+    private void ensureEnoughBuffer(int newcount) {
+        if (newcount > buf.length) {
+            byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+            System.arraycopy(buf, 0, newbuf, 0, pos);
+            buf = newbuf;
+        }
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java
------------------------------------------------------------------------------
    svn:executable = *