You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/01 06:07:15 UTC
svn commit: r780559 - in /activemq/sandbox/activemq-flow:
activemq-flow/src/test/resources/META-INF/
activemq-flow/src/test/resources/META-INF/services/
activemq-flow/src/test/resources/META-INF/services/org/
activemq-flow/src/test/resources/META-INF/s...
Author: chirino
Date: Mon Jun 1 04:07:14 2009
New Revision: 780559
URL: http://svn.apache.org/viewvc?rev=780559&view=rev
Log:
adding missing classes and resources..
Added:
activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/
activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/
activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/
activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/
activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/
activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/
activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto
activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/
activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/
activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/
activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/
activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/
activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/
activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe
activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java (with props)
activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java (with props)
Added: activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto (added)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto Mon Jun 1 04:07:14 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.ProtoWireFormatFactory
Added: activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2 (added)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2 Mon Jun 1 04:07:14 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.Proto2WireFormatFactory
Added: activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test (added)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/test Mon Jun 1 04:07:14 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.TestWireFormatFactory
Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/MonitoredTransport.java Mon Jun 1 04:07:14 2009
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+public interface MonitoredTransport {
+
+ long getWriteTimestamp();
+
+ boolean isWriting();
+
+}
Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Mon Jun 1 04:07:14 2009
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This filter implements write timeouts for socket write operations.
+ * When using blocking IO, the Java implementation doesn't have an explicit flag
+ * to set a timeout, and can cause operations to block forever (or until the TCP stack implementation times out the retransmissions,
+ * which is usually around 13-30 minutes).<br/>
+ * To enable this transport, in the transport URI, simpley add<br/>
+ * <code>transport.soWriteTimeout=<value in millis></code>.<br/>
+ * For example (15 second timeout on write operations to the socket):</br>
+ * <pre><code>
+ * <transportConnector
+ * name="tcp1"
+ * uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000"
+ * />
+ * </code></pre><br/>
+ * For example (enable default timeout on the socket):</br>
+ * <pre><code>
+ * <transportConnector
+ * name="tcp1"
+ * uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000"
+ * />
+ * </code></pre>
+ * @author Filip Hanik
+ *
+ */
+public class WriteTimeoutFilter extends TransportFilter {
+
+ private static final Log LOG = LogFactory.getLog(WriteTimeoutFilter.class);
+ protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
+ protected static AtomicInteger messageCounter = new AtomicInteger(0);
+ protected static TimeoutThread timeoutThread = new TimeoutThread();
+
+ protected static long sleep = 5000l;
+
+ protected long writeTimeout = -1;
+
+ public WriteTimeoutFilter(Transport next) {
+ super(next);
+ }
+
+ @Override
+ public void oneway(Object command) throws IOException {
+ try {
+ registerWrite(this);
+ super.oneway(command);
+ } catch (IOException x) {
+ throw x;
+ } finally {
+ deRegisterWrite(this,false,null);
+ }
+ }
+
+ public long getWriteTimeout() {
+ return writeTimeout;
+ }
+
+ public void setWriteTimeout(long writeTimeout) {
+ this.writeTimeout = writeTimeout;
+ }
+
+ public static long getSleep() {
+ return sleep;
+ }
+
+ public static void setSleep(long sleep) {
+ WriteTimeoutFilter.sleep = sleep;
+ }
+
+
+ protected MonitoredTransport getWriter() {
+ return next.narrow(MonitoredTransport.class);
+ }
+
+ protected Socket getSocket() {
+ return next.narrow(Socket.class);
+ }
+
+ protected static void registerWrite(WriteTimeoutFilter filter) {
+ writers.add(filter);
+ }
+
+ protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
+ boolean result = writers.remove(filter);
+ if (result) {
+ if (fail) {
+ String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress();
+ LOG.warn(message);
+ Socket sock = filter.getSocket();
+ if (sock==null) {
+ LOG.error("Destination socket is null, unable to close socket.("+message+")");
+ } else {
+ try {
+ sock.close();
+ }catch (IOException ignore) {
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void start() throws Exception {
+ super.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ super.stop();
+ }
+
+ protected static class TimeoutThread extends Thread {
+ static AtomicInteger instance = new AtomicInteger(0);
+ boolean run = true;
+ public TimeoutThread() {
+ setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet());
+ setDaemon(true);
+ setPriority(Thread.MIN_PRIORITY);
+ start();
+ }
+
+
+ public void run() {
+ while (run) {
+ boolean error = false;
+ try {
+ if (!interrupted()) {
+ Iterator<WriteTimeoutFilter> filters = writers.iterator();
+ while (run && filters.hasNext()) {
+ WriteTimeoutFilter filter = filters.next();
+ if (filter.getWriteTimeout()<=0) continue; //no timeout set
+ long writeStart = filter.getWriter().getWriteTimestamp();
+ long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
+ if (delta>filter.getWriteTimeout()) {
+ WriteTimeoutFilter.deRegisterWrite(filter, true,null);
+ }//if timeout
+ }//while
+ }//if interrupted
+ try {
+ Thread.sleep(getSleep());
+ error = false;
+ } catch (InterruptedException x) {
+ //do nothing
+ }
+ }catch (Throwable t) { //make sure this thread never dies
+ if (!error) { //use error flag to avoid filling up the logs
+ LOG.error("WriteTimeout thread unable validate existing sockets.",t);
+ error = true;
+ }
+ }
+ }
+ }
+ }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/resources/META-INF/services/org/apache/activemq/transport/pipe Mon Jun 1 04:07:14 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.pipe.PipeTransportFactory
Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java Mon Jun 1 04:07:14 2009
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UTFDataFormatException;
+
+/**
+ * Optimized ByteArrayInputStream that can be used more than once
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class DataByteArrayInputStream extends InputStream implements DataInput {
+ private byte[] buf;
+ private int pos;
+ private int offset;
+
+ /**
+ * Creates a <code>StoreByteArrayInputStream</code>.
+ *
+ * @param buf the input buffer.
+ */
+ public DataByteArrayInputStream(byte buf[]) {
+ this.buf = buf;
+ this.pos = 0;
+ this.offset = 0;
+ }
+
+ /**
+ * Creates a <code>StoreByteArrayInputStream</code>.
+ *
+ * @param sequence the input buffer.
+ */
+ public DataByteArrayInputStream(ByteSequence sequence) {
+ this.buf = sequence.getData();
+ this.offset = sequence.getOffset();
+ this.pos = this.offset;
+ }
+
+ /**
+ * Creates <code>WireByteArrayInputStream</code> with a minmalist byte
+ * array
+ */
+ public DataByteArrayInputStream() {
+ this(new byte[0]);
+ }
+
+ /**
+ * @return the size
+ */
+ public int size() {
+ return pos - offset;
+ }
+
+ /**
+ * @return the underlying data array
+ */
+ public byte[] getRawData() {
+ return buf;
+ }
+
+ /**
+ * reset the <code>StoreByteArrayInputStream</code> to use an new byte
+ * array
+ *
+ * @param newBuff
+ */
+ public void restart(byte[] newBuff) {
+ buf = newBuff;
+ pos = 0;
+ }
+
+ /**
+ * reset the <code>StoreByteArrayInputStream</code> to use an new
+ * ByteSequence
+ *
+ * @param sequence
+ */
+ public void restart(ByteSequence sequence) {
+ this.buf = sequence.getData();
+ this.pos = sequence.getOffset();
+ }
+
+ /**
+ * re-start the input stream - reusing the current buffer
+ *
+ * @param size
+ */
+ public void restart(int size) {
+ if (buf == null || buf.length < size) {
+ buf = new byte[size];
+ }
+ restart(buf);
+ }
+
+ /**
+ * Reads the next byte of data from this input stream. The value byte is
+ * returned as an <code>int</code> in the range <code>0</code> to
+ * <code>255</code>. If no byte is available because the end of the
+ * stream has been reached, the value <code>-1</code> is returned.
+ * <p>
+ * This <code>read</code> method cannot block.
+ *
+ * @return the next byte of data, or <code>-1</code> if the end of the
+ * stream has been reached.
+ */
+ public int read() {
+ return (pos < buf.length) ? (buf[pos++] & 0xff) : -1;
+ }
+
+ /**
+ * Reads up to <code>len</code> bytes of data into an array of bytes from
+ * this input stream.
+ *
+ * @param b the buffer into which the data is read.
+ * @param off the start offset of the data.
+ * @param len the maximum number of bytes read.
+ * @return the total number of bytes read into the buffer, or
+ * <code>-1</code> if there is no more data because the end of the
+ * stream has been reached.
+ */
+ public int read(byte b[], int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (pos >= buf.length) {
+ return -1;
+ }
+ if (pos + len > buf.length) {
+ len = buf.length - pos;
+ }
+ if (len <= 0) {
+ return 0;
+ }
+ System.arraycopy(buf, pos, b, off, len);
+ pos += len;
+ return len;
+ }
+
+ /**
+ * @return the number of bytes that can be read from the input stream
+ * without blocking.
+ */
+ public int available() {
+ return buf.length - pos;
+ }
+
+ public void readFully(byte[] b) {
+ read(b, 0, b.length);
+ }
+
+ public void readFully(byte[] b, int off, int len) {
+ read(b, off, len);
+ }
+
+ public int skipBytes(int n) {
+ if (pos + n > buf.length) {
+ n = buf.length - pos;
+ }
+ if (n < 0) {
+ return 0;
+ }
+ pos += n;
+ return n;
+ }
+
+ public boolean readBoolean() {
+ return read() != 0;
+ }
+
+ public byte readByte() {
+ return (byte)read();
+ }
+
+ public int readUnsignedByte() {
+ return read();
+ }
+
+ public short readShort() {
+ int ch1 = read();
+ int ch2 = read();
+ return (short)((ch1 << 8) + (ch2 << 0));
+ }
+
+ public int readUnsignedShort() {
+ int ch1 = read();
+ int ch2 = read();
+ return (ch1 << 8) + (ch2 << 0);
+ }
+
+ public char readChar() {
+ int ch1 = read();
+ int ch2 = read();
+ return (char)((ch1 << 8) + (ch2 << 0));
+ }
+
+ public int readInt() {
+ int ch1 = read();
+ int ch2 = read();
+ int ch3 = read();
+ int ch4 = read();
+ return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0);
+ }
+
+ public long readLong() {
+ long rc = ((long)buf[pos++] << 56) + ((long)(buf[pos++] & 255) << 48) + ((long)(buf[pos++] & 255) << 40) + ((long)(buf[pos++] & 255) << 32);
+ return rc + ((long)(buf[pos++] & 255) << 24) + ((buf[pos++] & 255) << 16) + ((buf[pos++] & 255) << 8) + ((buf[pos++] & 255) << 0);
+ }
+
+ public float readFloat() throws IOException {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ public double readDouble() throws IOException {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ public String readLine() {
+ int start = pos;
+ while (pos < buf.length) {
+ int c = read();
+ if (c == '\n') {
+ break;
+ }
+ if (c == '\r') {
+ c = read();
+ if (c != '\n' && c != -1) {
+ pos--;
+ }
+ break;
+ }
+ }
+ return new String(buf, start, pos);
+ }
+
+ public String readUTF() throws IOException {
+ int length = readUnsignedShort();
+ char[] characters = new char[length];
+ int c;
+ int c2;
+ int c3;
+ int count = 0;
+ int total = pos + length;
+ while (pos < total) {
+ c = (int)buf[pos] & 0xff;
+ if (c > 127) {
+ break;
+ }
+ pos++;
+ characters[count++] = (char)c;
+ }
+ while (pos < total) {
+ c = (int)buf[pos] & 0xff;
+ switch (c >> 4) {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
+ pos++;
+ characters[count++] = (char)c;
+ break;
+ case 12:
+ case 13:
+ pos += 2;
+ if (pos > total) {
+ throw new UTFDataFormatException("bad string");
+ }
+ c2 = (int)buf[pos - 1];
+ if ((c2 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException("bad string");
+ }
+ characters[count++] = (char)(((c & 0x1F) << 6) | (c2 & 0x3F));
+ break;
+ case 14:
+ pos += 3;
+ if (pos > total) {
+ throw new UTFDataFormatException("bad string");
+ }
+ c2 = (int)buf[pos - 2];
+ c3 = (int)buf[pos - 1];
+ if (((c2 & 0xC0) != 0x80) || ((c3 & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException("bad string");
+ }
+ characters[count++] = (char)(((c & 0x0F) << 12) | ((c2 & 0x3F) << 6) | ((c3 & 0x3F) << 0));
+ break;
+ default:
+ throw new UTFDataFormatException("bad string");
+ }
+ }
+ return new String(characters, 0, count);
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java?rev=780559&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java Mon Jun 1 04:07:14 2009
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+
+/**
+ * Optimized ByteArrayOutputStream
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class DataByteArrayOutputStream extends OutputStream implements DataOutput {
+ private static final int DEFAULT_SIZE = 2048;
+ private byte buf[];
+ private int pos;
+
+ /**
+ * Creates a new byte array output stream, with a buffer capacity of the
+ * specified size, in bytes.
+ *
+ * @param size the initial size.
+ * @exception IllegalArgumentException if size is negative.
+ */
+ public DataByteArrayOutputStream(int size) {
+ if (size < 0) {
+ throw new IllegalArgumentException("Invalid size: " + size);
+ }
+ buf = new byte[size];
+ }
+
+ /**
+ * Creates a new byte array output stream.
+ */
+ public DataByteArrayOutputStream() {
+ this(DEFAULT_SIZE);
+ }
+
+ /**
+ * start using a fresh byte array
+ *
+ * @param size
+ */
+ public void restart(int size) {
+ buf = new byte[size];
+ pos = 0;
+ }
+
+ /**
+ * start using a fresh byte array
+ */
+ public void restart() {
+ restart(DEFAULT_SIZE);
+ }
+
+ /**
+ * Get a ByteSequence from the stream
+ *
+ * @return the byte sequence
+ */
+ public ByteSequence toByteSequence() {
+ return new ByteSequence(buf, 0, pos);
+ }
+
+ /**
+ * Writes the specified byte to this byte array output stream.
+ *
+ * @param b the byte to be written.
+ */
+ public void write(int b) {
+ int newcount = pos + 1;
+ ensureEnoughBuffer(newcount);
+ buf[pos] = (byte)b;
+ pos = newcount;
+ }
+
+ /**
+ * Writes <code>len</code> bytes from the specified byte array starting at
+ * offset <code>off</code> to this byte array output stream.
+ *
+ * @param b the data.
+ * @param off the start offset in the data.
+ * @param len the number of bytes to write.
+ */
+ public void write(byte b[], int off, int len) {
+ if (len == 0) {
+ return;
+ }
+ int newcount = pos + len;
+ ensureEnoughBuffer(newcount);
+ System.arraycopy(b, off, buf, pos, len);
+ pos = newcount;
+ }
+
+ /**
+ * @return the underlying byte[] buffer
+ */
+ public byte[] getData() {
+ return buf;
+ }
+
+ /**
+ * reset the output stream
+ */
+ public void reset() {
+ pos = 0;
+ }
+
+ /**
+ * Set the current position for writing
+ *
+ * @param offset
+ */
+ public void position(int offset) {
+ ensureEnoughBuffer(offset);
+ pos = offset;
+ }
+
+ public int size() {
+ return pos;
+ }
+
+ public void writeBoolean(boolean v) {
+ ensureEnoughBuffer(pos + 1);
+ buf[pos++] = (byte)(v ? 1 : 0);
+ }
+
+ public void writeByte(int v) {
+ ensureEnoughBuffer(pos + 1);
+ buf[pos++] = (byte)(v >>> 0);
+ }
+
+ public void writeShort(int v) {
+ ensureEnoughBuffer(pos + 2);
+ buf[pos++] = (byte)(v >>> 8);
+ buf[pos++] = (byte)(v >>> 0);
+ }
+
+ public void writeChar(int v) {
+ ensureEnoughBuffer(pos + 2);
+ buf[pos++] = (byte)(v >>> 8);
+ buf[pos++] = (byte)(v >>> 0);
+ }
+
+ public void writeInt(int v) {
+ ensureEnoughBuffer(pos + 4);
+ buf[pos++] = (byte)(v >>> 24);
+ buf[pos++] = (byte)(v >>> 16);
+ buf[pos++] = (byte)(v >>> 8);
+ buf[pos++] = (byte)(v >>> 0);
+ }
+
+ public void writeLong(long v) {
+ ensureEnoughBuffer(pos + 8);
+ buf[pos++] = (byte)(v >>> 56);
+ buf[pos++] = (byte)(v >>> 48);
+ buf[pos++] = (byte)(v >>> 40);
+ buf[pos++] = (byte)(v >>> 32);
+ buf[pos++] = (byte)(v >>> 24);
+ buf[pos++] = (byte)(v >>> 16);
+ buf[pos++] = (byte)(v >>> 8);
+ buf[pos++] = (byte)(v >>> 0);
+ }
+
+ public void writeFloat(float v) throws IOException {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ public void writeDouble(double v) throws IOException {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ public void writeBytes(String s) {
+ int length = s.length();
+ for (int i = 0; i < length; i++) {
+ write((byte)s.charAt(i));
+ }
+ }
+
+ public void writeChars(String s) {
+ int length = s.length();
+ for (int i = 0; i < length; i++) {
+ int c = s.charAt(i);
+ write((c >>> 8) & 0xFF);
+ write((c >>> 0) & 0xFF);
+ }
+ }
+
+ public void writeUTF(String str) throws IOException {
+ int strlen = str.length();
+ int encodedsize = 0;
+ int c;
+ for (int i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ encodedsize++;
+ } else if (c > 0x07FF) {
+ encodedsize += 3;
+ } else {
+ encodedsize += 2;
+ }
+ }
+ if (encodedsize > 65535) {
+ throw new UTFDataFormatException("encoded string too long: " + encodedsize + " bytes");
+ }
+ ensureEnoughBuffer(pos + encodedsize + 2);
+ writeShort(encodedsize);
+ int i = 0;
+ for (i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if (!((c >= 0x0001) && (c <= 0x007F))) {
+ break;
+ }
+ buf[pos++] = (byte)c;
+ }
+ for (; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ buf[pos++] = (byte)c;
+ } else if (c > 0x07FF) {
+ buf[pos++] = (byte)(0xE0 | ((c >> 12) & 0x0F));
+ buf[pos++] = (byte)(0x80 | ((c >> 6) & 0x3F));
+ buf[pos++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+ } else {
+ buf[pos++] = (byte)(0xC0 | ((c >> 6) & 0x1F));
+ buf[pos++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+ }
+ }
+ }
+
+ private void ensureEnoughBuffer(int newcount) {
+ if (newcount > buf.length) {
+ byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+ System.arraycopy(buf, 0, newbuf, 0, pos);
+ buf = newbuf;
+ }
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/DataByteArrayOutputStream.java
------------------------------------------------------------------------------
svn:executable = *