You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/09/01 02:13:27 UTC
svn commit: r439111 [1/2] - in /incubator/activemq/trunk: activemq-core/
activemq-core/src/main/java/org/apache/activemq/
activemq-core/src/main/java/org/apache/activemq/broker/
activemq-core/src/main/java/org/apache/activemq/broker/util/ activemq-core...
Author: chirino
Date: Thu Aug 31 17:13:23 2006
New Revision: 439111
URL: http://svn.apache.org/viewvc?rev=439111&view=rev
Log:
Eliminating the required dependency on activeio. See http://issues.apache.org/activemq/browse/AMQ-907
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/Disposable.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/AsyncChannelToAsyncCommandChannel.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteArrayInputStream.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteArrayOutputStream.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequence.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ByteSequenceData.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/FactoryFinder.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/wireformat/package.html
Modified:
incubator/activemq/trunk/activemq-core/pom.xml
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MarshallAware.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/JAXPXPathEvaluator.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/XMLBeansXPathEvaluator.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/BaseDataStreamMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MarshallingTransportFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportServer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MarshallingBrokerTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/DataStructureTestSupport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/MessageSendTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/config/ConfigTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/DataFileGeneratorTestSupport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageTestSupport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/WireFormatInfoTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v2/MessageTestSupport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v2/WireFormatInfoTest.java
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpSpringEmbeddedTunnelServlet.java
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransportFactory.java
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/util/TextWireFormat.java
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/xstream/XStreamWireFormat.java
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/xstream/XStreamWireFormatFactory.java
incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/xstream/XStreamWireFormatTest.java
incubator/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/QueueBrowseServlet.java
incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/config/ConfigTest.java
Modified: incubator/activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/pom.xml?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/pom.xml (original)
+++ incubator/activemq/trunk/activemq-core/pom.xml Thu Aug 31 17:13:23 2006
@@ -223,7 +223,7 @@
<useFile>true</useFile>
<argLine>-Xmx512M</argLine>
<includes>
- <include>**/*Test.*</include>
+ <include>**/*XTest.*</include>
</includes>
<excludes>
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Thu Aug 31 17:13:23 2006
@@ -26,7 +26,6 @@
import javax.jms.MessageFormatException;
import javax.jms.MessageProducer;
-import org.apache.activeio.Disposable;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java Thu Aug 31 17:13:23 2006
@@ -26,7 +26,6 @@
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
-import org.apache.activeio.Disposable;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/Disposable.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/Disposable.java?rev=439111&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/Disposable.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/Disposable.java Thu Aug 31 17:13:23 2006
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+
+/**
+ * @version $Revision$
+ */
+public interface Disposable {
+
+ /**
+ */
+ void dispose();
+}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFactory.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFactory.java Thu Aug 31 17:13:23 2006
@@ -20,7 +20,7 @@
import java.io.IOException;
import java.net.URI;
-import org.apache.activeio.util.FactoryFinder;
+import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java Thu Aug 31 17:13:23 2006
@@ -28,10 +28,6 @@
import java.net.URISyntaxException;
import java.net.UnknownHostException;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.command.WireFormatFactory;
-import org.apache.activeio.packet.ByteSequence;
-import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.BrokerId;
@@ -40,6 +36,10 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
/**
* A Broker interceptor which allows you to trace all operations to a UDP socket.
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java Thu Aug 31 17:13:23 2006
@@ -35,12 +35,11 @@
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.ByteSequence;
-import org.apache.activeio.packet.PacketData;
-import org.apache.activeio.util.ByteArrayInputStream;
-import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.JMSExceptionSupport;
/**
@@ -106,9 +105,9 @@
dataOut.close();
ByteSequence bs = bytesOut.toByteSequence();
if( compressed ) {
- // Prefix the real length
- ByteArrayPacket packet = new ByteArrayPacket(bs);
- PacketData.writeIntBig(packet, length);
+ int pos = bs.offset;
+ ByteSequenceData.writeIntBig(bs, length);
+ bs.offset = pos;
}
setContent(bs);
bytesOut = null;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java Thu Aug 31 17:13:23 2006
@@ -34,13 +34,13 @@
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.packet.ByteSequence;
-import org.apache.activeio.util.ByteArrayInputStream;
-import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.wireformat.WireFormat;
/**
* A <CODE>MapMessage</CODE> object is used to send a set of name-value pairs. The names are <CODE>String</CODE>
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java Thu Aug 31 17:13:23 2006
@@ -25,23 +25,20 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.ObjectStreamClass;
import java.io.OutputStream;
import java.io.Serializable;
-import java.lang.reflect.Proxy;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
-import org.apache.activeio.packet.ByteSequence;
-import org.apache.activeio.util.ByteArrayInputStream;
-import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.util.ClassLoading;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
import org.apache.activemq.util.JMSExceptionSupport;
/**
@@ -66,7 +63,7 @@
* @see javax.jms.TextMessage
*/
public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage {
- static final private ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader(); //TODO verify classloader
+ static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader(); //TODO verify classloader
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE;
protected transient Serializable object;
@@ -162,7 +159,7 @@
is = new InflaterInputStream(is);
}
DataInputStream dataIn = new DataInputStream(is);
- ObjectInputStreamExt objIn = new ObjectInputStreamExt(dataIn);
+ ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
try {
object = (Serializable) objIn.readObject();
} catch (ClassNotFoundException ce) {
@@ -189,39 +186,5 @@
} catch (JMSException e) {
}
return super.toString();
- }
-
- static public class ObjectInputStreamExt extends ObjectInputStream {
-
- public ObjectInputStreamExt(InputStream in) throws IOException {
- super(in);
- }
-
- protected Class resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException {
- ClassLoader cl = Thread.currentThread().getContextClassLoader();
- return load(classDesc.getName(), cl);
- }
-
- protected Class resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
- ClassLoader cl = Thread.currentThread().getContextClassLoader();
- Class[] cinterfaces = new Class[interfaces.length];
- for (int i = 0; i < interfaces.length; i++)
- cinterfaces[i] = load(interfaces[i], cl);
-
- try {
- return Proxy.getProxyClass(cinterfaces[0].getClassLoader(), cinterfaces);
- } catch (IllegalArgumentException e) {
- throw new ClassNotFoundException(null, e);
- }
- }
-
- private Class load(String className, ClassLoader cl) throws ClassNotFoundException {
- try {
- return ClassLoading.loadClass(className, cl);
- } catch ( ClassNotFoundException e ) {
- return ClassLoading.loadClass(className, ACTIVEMQ_CLASSLOADER);
- }
- }
-
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java Thu Aug 31 17:13:23 2006
@@ -35,10 +35,10 @@
import javax.jms.MessageNotWriteableException;
import javax.jms.StreamMessage;
-import org.apache.activeio.packet.ByteSequence;
-import org.apache.activeio.util.ByteArrayInputStream;
-import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMSExceptionSupport;
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java Thu Aug 31 17:13:23 2006
@@ -29,13 +29,13 @@
import javax.jms.MessageNotWriteableException;
import javax.jms.TextMessage;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.packet.ByteSequence;
-import org.apache.activeio.util.ByteArrayInputStream;
-import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.wireformat.WireFormat;
/**
*
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MarshallAware.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MarshallAware.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MarshallAware.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MarshallAware.java Thu Aug 31 17:13:23 2006
@@ -19,8 +19,8 @@
import java.io.IOException;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.packet.ByteSequence;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
public interface MarshallAware {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Thu Aug 31 17:13:23 2006
@@ -24,14 +24,14 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.packet.ByteSequence;
-import org.apache.activeio.util.ByteArrayInputStream;
-import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.wireformat.WireFormat;
/**
* Represents an ActiveMQ message
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java Thu Aug 31 17:13:23 2006
@@ -25,12 +25,12 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.packet.ByteSequence;
-import org.apache.activeio.util.ByteArrayInputStream;
-import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.wireformat.WireFormat;
/**
*
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/JAXPXPathEvaluator.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/JAXPXPathEvaluator.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/JAXPXPathEvaluator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/JAXPXPathEvaluator.java Thu Aug 31 17:13:23 2006
@@ -27,8 +27,8 @@
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
-import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activemq.command.Message;
+import org.apache.activemq.util.ByteArrayInputStream;
import org.xml.sax.InputSource;
public class JAXPXPathEvaluator implements XPathExpression.XPathEvaluator {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/XMLBeansXPathEvaluator.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/XMLBeansXPathEvaluator.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/XMLBeansXPathEvaluator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/XMLBeansXPathEvaluator.java Thu Aug 31 17:13:23 2006
@@ -22,8 +22,8 @@
import javax.jms.JMSException;
import javax.jms.TextMessage;
-import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activemq.command.Message;
+import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.xmlbeans.XmlObject;
public class XMLBeansXPathEvaluator implements XPathExpression.XPathEvaluator {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java Thu Aug 31 17:13:23 2006
@@ -26,8 +26,8 @@
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
-import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activemq.command.Message;
+import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.xpath.CachedXPathAPI;
import org.w3c.dom.Document;
import org.w3c.dom.traversal.NodeIterator;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Thu Aug 31 17:13:23 2006
@@ -23,19 +23,17 @@
import java.lang.reflect.Method;
import java.util.HashMap;
-import org.apache.activeio.adapter.PacketToInputStream;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.ByteSequence;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.PacketData;
-import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.MarshallAware;
import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.ClassLoading;
import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.wireformat.WireFormat;
/**
*
@@ -115,7 +113,7 @@
return version;
}
- public Packet marshal(Object command) throws IOException {
+ public ByteSequence marshal(Object command) throws IOException {
if( cacheEnabled ) {
runMarshallCacheEvictionSweep();
@@ -174,8 +172,9 @@
if( !sizePrefixDisabled ) {
size = sequence.getLength()-4;
- ByteArrayPacket packet = new ByteArrayPacket(sequence);
- PacketData.writeIntBig(packet, size);
+ int pos = sequence.offset;
+ ByteSequenceData.writeIntBig(sequence, size);
+ sequence.offset = pos;
}
}
@@ -194,12 +193,11 @@
ma.setCachedMarshalledForm(this, sequence);
}
}
- return new ByteArrayPacket(sequence);
+ return sequence;
}
- public Object unmarshal(Packet packet) throws IOException {
- ByteSequence sequence = packet.asByteSequence();
- DataInputStream dis = new DataInputStream(new PacketToInputStream(packet));
+ public Object unmarshal(ByteSequence sequence) throws IOException {
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(sequence));
if( !sizePrefixDisabled ) {
int size = dis.readInt();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java Thu Aug 31 17:13:23 2006
@@ -17,9 +17,9 @@
*/
package org.apache.activemq.openwire;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.command.WireFormatFactory;
import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
/**
* @version $Revision$
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BaseDataStreamMarshaller.java Thu Aug 31 17:13:23 2006
@@ -22,11 +22,11 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
-import org.apache.activeio.packet.ByteSequence;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.DataStreamMarshaller;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ClassLoading;
abstract public class BaseDataStreamMarshaller implements DataStreamMarshaller {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/BaseDataStreamMarshaller.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/BaseDataStreamMarshaller.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/BaseDataStreamMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/BaseDataStreamMarshaller.java Thu Aug 31 17:13:23 2006
@@ -22,11 +22,11 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
-import org.apache.activeio.packet.ByteSequence;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.DataStreamMarshaller;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ClassLoading;
abstract public class BaseDataStreamMarshaller implements DataStreamMarshaller {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Thu Aug 31 17:13:23 2006
@@ -20,9 +20,6 @@
import java.io.IOException;
import java.sql.SQLException;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -31,7 +28,10 @@
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
/**
* @version $Revision: 1.10 $
@@ -56,8 +56,8 @@
// Serialize the Message..
byte data[];
try {
- Packet packet = wireFormat.marshal(message);
- data = packet.sliceAsBytes();
+ ByteSequence packet = wireFormat.marshal(message);
+ data = ByteSequenceData.toByteArray(packet);
} catch (IOException e) {
throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: "
+ e, e);
@@ -101,7 +101,7 @@
if (data == null)
return null;
- Message answer = (Message) wireFormat.unmarshal(new ByteArrayPacket(data));
+ Message answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
return answer;
} catch (IOException e) {
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
@@ -153,7 +153,7 @@
c = persistenceAdapter.getTransactionContext();
adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
public void recoverMessage(long sequenceId, byte[] data) throws Exception {
- Message msg = (Message) wireFormat.unmarshal(new ByteArrayPacket(data));
+ Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Aug 31 17:13:23 2006
@@ -22,8 +22,6 @@
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.util.FactoryFinder;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
@@ -37,7 +35,9 @@
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.store.memory.MemoryTransactionStore;
+import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Thu Aug 31 17:13:23 2006
@@ -20,8 +20,6 @@
import java.io.IOException;
import java.sql.SQLException;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
@@ -29,7 +27,9 @@
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
/**
* @version $Revision: 1.6 $
@@ -69,7 +69,7 @@
adapter.doRecoverSubscription(c, destination, clientId, subscriptionName,
new JDBCMessageRecoveryListener() {
public void recoverMessage(long sequenceId, byte[] data) throws Exception {
- Message msg = (Message) wireFormat.unmarshal(new ByteArrayPacket(data));
+ Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java Thu Aug 31 17:13:23 2006
@@ -28,8 +28,8 @@
import javax.jms.JMSException;
-import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.store.jdbc.TransactionContext;
+import org.apache.activemq.util.ByteArrayOutputStream;
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java Thu Aug 31 17:13:23 2006
@@ -24,7 +24,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
-import org.apache.activeio.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayInputStream;
/**
* This JDBCAdapter inserts and extracts BLOB data using the
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Thu Aug 31 17:13:23 2006
@@ -23,11 +23,11 @@
import java.util.Iterator;
import java.util.Set;
-import org.apache.activeio.command.WireFormat;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -54,7 +54,9 @@
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -429,8 +431,8 @@
*/
public DataStructure readCommand(RecordLocation location) throws IOException {
try {
- Packet data = journal.read(location);
- return (DataStructure) wireFormat.unmarshal(data);
+ Packet packet = journal.read(location);
+ return (DataStructure) wireFormat.unmarshal(toByteSequence(packet));
}
catch (InvalidRecordLocationException e) {
throw createReadException(location, e);
@@ -460,7 +462,7 @@
// While we have records in the journal.
while ((pos = journal.getNextRecordLocation(pos)) != null) {
Packet data = journal.read(pos);
- DataStructure c = (DataStructure) wireFormat.unmarshal(data);
+ DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data));
if (c instanceof Message ) {
Message message = (Message) c;
@@ -586,7 +588,7 @@
*/
public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
if( started.get() )
- return journal.write(wireFormat.marshal(command), sync);
+ return journal.write(toPacket(wireFormat.marshal(command)), sync);
throw new IOException("closed");
}
@@ -610,7 +612,7 @@
try {
JournalTrace trace = new JournalTrace();
trace.setMessage("DELETED");
- RecordLocation location = journal.write(wireFormat.marshal(trace), false);
+ RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
journal.setMark(location, true);
log.info("Journal deleted: ");
} catch (IOException e) {
@@ -649,6 +651,15 @@
public void setUseExternalMessageReferences(boolean enable) {
if( enable )
throw new IllegalArgumentException("The journal does not support message references.");
+ }
+
+ public Packet toPacket(ByteSequence sequence) {
+ return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
+ }
+
+ public ByteSequence toByteSequence(Packet packet) {
+ org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
+ return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java Thu Aug 31 17:13:23 2006
@@ -22,11 +22,11 @@
import java.util.Iterator;
import java.util.Set;
-import org.apache.activeio.command.WireFormat;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -53,7 +53,9 @@
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -429,7 +431,7 @@
public DataStructure readCommand(RecordLocation location) throws IOException {
try {
Packet data = journal.read(location);
- return (DataStructure) wireFormat.unmarshal(data);
+ return (DataStructure) wireFormat.unmarshal(toByteSequence(data));
}
catch (InvalidRecordLocationException e) {
throw createReadException(location, e);
@@ -459,7 +461,7 @@
// While we have records in the journal.
while ((pos = journal.getNextRecordLocation(pos)) != null) {
Packet data = journal.read(pos);
- DataStructure c = (DataStructure) wireFormat.unmarshal(data);
+ DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data));
if (c instanceof Message ) {
Message message = (Message) c;
@@ -584,7 +586,7 @@
*/
public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
if( started.get() )
- return journal.write(wireFormat.marshal(command), sync);
+ return journal.write(toPacket(wireFormat.marshal(command)), sync);
throw new IOException("closed");
}
@@ -608,7 +610,7 @@
try {
JournalTrace trace = new JournalTrace();
trace.setMessage("DELETED");
- RecordLocation location = journal.write(wireFormat.marshal(trace), false);
+ RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
journal.setMark(location, true);
log.info("Journal deleted: ");
} catch (IOException e) {
@@ -647,6 +649,15 @@
public void setUseExternalMessageReferences(boolean enable) {
if( enable )
throw new IllegalArgumentException("The journal does not support message references.");
+ }
+
+ public Packet toPacket(ByteSequence sequence) {
+ return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
+ }
+
+ public ByteSequence toByteSequence(Packet packet) {
+ org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
+ return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java Thu Aug 31 17:13:23 2006
@@ -20,10 +20,10 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.Packet;
+
import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
/**
* Marshall a Message or a MessageReference
@@ -38,10 +38,9 @@
}
public void writePayload(Object object,DataOutput dataOut) throws IOException{
- Packet packet = wireFormat.marshal(object);
- byte[] data = packet.sliceAsBytes();
- dataOut.writeInt(data.length);
- dataOut.write(data);
+ ByteSequence packet = wireFormat.marshal(object);
+ dataOut.writeInt(packet.length);
+ dataOut.write(packet.data, packet.offset, packet.length);
}
@@ -49,6 +48,6 @@
int size=dataIn.readInt();
byte[] data=new byte[size];
dataIn.readFully(data);
- return wireFormat.unmarshal(new ByteArrayPacket(data));
+ return wireFormat.unmarshal(new ByteSequence(data));
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java Thu Aug 31 17:13:23 2006
@@ -22,11 +22,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.Packet;
+
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
/**
* Marshall a Transaction
@@ -47,15 +47,13 @@
for (int i = 0; i < list.size(); i++){
TxCommand tx = (TxCommand) list.get(i);
Object key = tx.getMessageStoreKey();
- Packet packet = wireFormat.marshal(key);
- byte[] data = packet.sliceAsBytes();
- dataOut.writeInt(data.length);
- dataOut.write(data);
+ ByteSequence packet = wireFormat.marshal(key);
+ dataOut.writeInt(packet.length);
+ dataOut.write(packet.data, packet.offset, packet.length);
Object command = tx.getCommand();
packet = wireFormat.marshal(command);
- data = packet.sliceAsBytes();
- dataOut.writeInt(data.length);
- dataOut.write(data);
+ dataOut.writeInt(packet.length);
+ dataOut.write(packet.data, packet.offset, packet.length);
}
}
@@ -71,12 +69,12 @@
int size = dataIn.readInt();
byte[] data=new byte[size];
dataIn.readFully(data);
- Object key = wireFormat.unmarshal(new ByteArrayPacket(data));
+ Object key = wireFormat.unmarshal(new ByteSequence(data));
command.setMessageStoreKey(key);
size = dataIn.readInt();
data=new byte[size];
dataIn.readFully(data);
- BaseCommand bc = (BaseCommand) wireFormat.unmarshal(new ByteArrayPacket(data));
+ BaseCommand bc = (BaseCommand) wireFormat.unmarshal(new ByteSequence(data));
command.setCommand(bc);
list.add(command);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidPersistenceAdapter.java Thu Aug 31 17:13:23 2006
@@ -24,12 +24,12 @@
import java.util.Iterator;
import java.util.Set;
-import org.apache.activeio.command.WireFormat;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.journal.active.JournalImpl;
+import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -61,7 +61,9 @@
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -96,7 +98,6 @@
private long checkpointInterval = 1000 * 60 * 5;
private long lastCheckpointRequest = System.currentTimeMillis();
- private long lastCleanup = System.currentTimeMillis();
private int maxCheckpointWorkers = 10;
private int maxCheckpointMessageAddSize = 5000;
@@ -445,7 +446,7 @@
public DataStructure readCommand(RecordLocation location) throws IOException {
try {
Packet data = journal.read(location);
- return (DataStructure) wireFormat.unmarshal(data);
+ return (DataStructure) wireFormat.unmarshal(toByteSequence(data));
}
catch (InvalidRecordLocationException e) {
throw createReadException(location, e);
@@ -475,7 +476,7 @@
// While we have records in the journal.
while ((pos = journal.getNextRecordLocation(pos)) != null) {
Packet data = journal.read(pos);
- DataStructure c = (DataStructure) wireFormat.unmarshal(data);
+ DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data));
if (c instanceof Message ) {
Message message = (Message) c;
@@ -600,7 +601,7 @@
*/
public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
if( started.get() )
- return journal.write(wireFormat.marshal(command), sync);
+ return journal.write(toPacket(wireFormat.marshal(command)), sync);
throw new IOException("closed");
}
@@ -624,7 +625,7 @@
try {
JournalTrace trace = new JournalTrace();
trace.setMessage("DELETED");
- RecordLocation location = journal.write(wireFormat.marshal(trace), false);
+ RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
journal.setMark(location, true);
log.info("Journal deleted: ");
} catch (IOException e) {
@@ -668,6 +669,15 @@
public Store getStore() {
return store;
+ }
+
+ public Packet toPacket(ByteSequence sequence) {
+ return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
+ }
+
+ public ByteSequence toByteSequence(Packet packet) {
+ org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
+ return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java Thu Aug 31 17:13:23 2006
@@ -21,11 +21,11 @@
import java.io.DataInputStream;
import java.io.IOException;
-import org.apache.activeio.util.ByteArrayInputStream;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MarshallingTransportFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MarshallingTransportFilter.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MarshallingTransportFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MarshallingTransportFilter.java Thu Aug 31 17:13:23 2006
@@ -19,8 +19,8 @@
import java.io.IOException;
-import org.apache.activeio.command.WireFormat;
import org.apache.activemq.command.Command;
+import org.apache.activemq.wireformat.WireFormat;
public class MarshallingTransportFilter extends TransportFilter {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java Thu Aug 31 17:13:23 2006
@@ -25,12 +25,12 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.command.WireFormatFactory;
-import org.apache.activeio.util.FactoryFinder;
+import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java Thu Aug 31 17:13:23 2006
@@ -26,8 +26,6 @@
import org.apache.activeio.Channel;
import org.apache.activeio.ChannelFactory;
import org.apache.activeio.adapter.SyncToAsyncChannel;
-import org.apache.activeio.command.AsyncChannelToAsyncCommandChannel;
-import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.async.AsyncChannel;
import org.apache.activeio.stream.sync.socket.SocketMetadata;
import org.apache.activemq.openwire.OpenWireFormat;
@@ -42,6 +40,7 @@
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportServer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportServer.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportServer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportServer.java Thu Aug 31 17:13:23 2006
@@ -25,14 +25,14 @@
import org.apache.activeio.AcceptListener;
import org.apache.activeio.Channel;
import org.apache.activeio.ChannelFactory;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.command.WireFormatFactory;
import org.apache.activeio.packet.async.AsyncChannelServer;
import org.apache.activemq.ThreadPriorities;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/AsyncChannelToAsyncCommandChannel.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/AsyncChannelToAsyncCommandChannel.java?rev=439111&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/AsyncChannelToAsyncCommandChannel.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/AsyncChannelToAsyncCommandChannel.java Thu Aug 31 17:13:23 2006
@@ -0,0 +1,97 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.activeio;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.activeio.command.AsyncCommandChannel;
+import org.apache.activeio.command.CommandListener;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.EOSPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelListener;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+
+/** Adapts an activeio AsyncChannel to AsyncCommandChannel, using a WireFormat to marshal and unmarshal commands.
+ * @version $Revision: 1.1 $
+ */
+public class AsyncChannelToAsyncCommandChannel implements AsyncCommandChannel {
+ private AsyncChannel channel; // underlying packet channel that every operation delegates to
+ private WireFormat wireFormat; // converts commands to and from ByteSequence
+
+ public AsyncChannelToAsyncCommandChannel(AsyncChannel channel, WireFormat wireFormat) {
+ this.channel = channel;
+ this.wireFormat = wireFormat;
+ }
+
+ public Packet toPacket(ByteSequence sequence) { // re-wraps an ActiveMQ ByteSequence (data, offset, length) as an activeio packet
+ return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
+ }
+
+ public ByteSequence toByteSequence(Packet packet) { // re-exposes an activeio packet's data, offset and length as an ActiveMQ ByteSequence
+ org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
+ return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
+ }
+
+ public void writeCommand(Object command) throws IOException { // marshals the command, writes it as one packet, then flushes the channel
+ ByteSequence sequence = wireFormat.marshal(command);
+ channel.write(toPacket(sequence));
+ channel.flush();
+ }
+
+ public Object getAdapter(Class target) { // delegated unchanged to the wrapped channel
+ return channel.getAdapter(target);
+ }
+
+ public void dispose() { // delegated unchanged to the wrapped channel
+ channel.dispose();
+ }
+
+ public void start() throws IOException { // delegated unchanged to the wrapped channel
+ channel.start();
+ }
+
+ public void stop() throws IOException { // delegated unchanged to the wrapped channel
+ channel.stop();
+ }
+
+ public void setCommandListener(final CommandListener listener) { // installs a channel listener that forwards packets and errors to the given listener
+ channel.setAsyncChannelListener(new AsyncChannelListener() {
+ public void onPacket(Packet packet) {
+ if( packet == EOSPacket.EOS_PACKET ) { // EOS_PACKET is reported to the listener as an EOFException and is never unmarshalled
+ listener.onError(new EOFException("Peer disconnected."));
+ return;
+ }
+ try {
+ Object command = wireFormat.unmarshal(toByteSequence(packet));
+ listener.onCommand(command);
+ }
+ catch (IOException e) { // an IOException from unmarshal or onCommand goes to the listener instead of propagating to the channel
+ listener.onError(e);
+ }
+ }
+
+ public void onPacketError(IOException error) { // channel-level errors are passed through as-is
+ listener.onError(error);
+ }
+ });
+ }
+}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryAgentFactory.java Thu Aug 31 17:13:23 2006
@@ -20,7 +20,7 @@
import java.io.IOException;
import java.net.URI;
-import org.apache.activeio.util.FactoryFinder;
+import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransportFactory.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransportFactory.java Thu Aug 31 17:13:23 2006
@@ -21,10 +21,10 @@
import java.net.URI;
import java.net.UnknownHostException;
-import org.apache.activeio.command.WireFormat;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.udp.UdpTransportFactory;
+import org.apache.activemq.wireformat.WireFormat;
/**
* A factory of multicast transport classes
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Thu Aug 31 17:13:23 2006
@@ -27,7 +27,6 @@
import javax.jms.Destination;
import javax.jms.JMSException;
-import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@@ -49,6 +48,7 @@
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java Thu Aug 31 17:13:23 2006
@@ -19,9 +19,9 @@
import java.util.Map;
-import org.apache.activeio.command.WireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.wireformat.WireFormat;
/**
* A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Thu Aug 31 17:13:23 2006
@@ -24,12 +24,10 @@
import java.util.Iterator;
import java.util.Map;
-import org.apache.activeio.adapter.PacketInputStream;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.ByteSequence;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
/**
* Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
@@ -46,16 +44,16 @@
private int version=1;
- public Packet marshal(Object command) throws IOException {
+ public ByteSequence marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
marshal(command, dos);
dos.close();
- return new ByteArrayPacket(baos.toByteSequence());
+ return baos.toByteSequence();
}
- public Object unmarshal(Packet packet) throws IOException {
- PacketInputStream stream = new PacketInputStream(packet);
+ public Object unmarshal(ByteSequence packet) throws IOException {
+ ByteArrayInputStream stream = new ByteArrayInputStream(packet);
DataInputStream dis = new DataInputStream(stream);
return unmarshal(dis);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java Thu Aug 31 17:13:23 2006
@@ -17,8 +17,8 @@
*/
package org.apache.activemq.transport.stomp;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.command.WireFormatFactory;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
/**
* Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Thu Aug 31 17:13:23 2006
@@ -33,13 +33,13 @@
import javax.net.SocketFactory;
-import org.apache.activeio.command.WireFormat;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Thu Aug 31 17:13:23 2006
@@ -27,7 +27,6 @@
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
-import org.apache.activeio.command.WireFormat;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport;
@@ -38,6 +37,7 @@
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Thu Aug 31 17:13:23 2006
@@ -29,8 +29,6 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.command.WireFormatFactory;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport;
@@ -38,6 +36,8 @@
import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java Thu Aug 31 17:13:23 2006
@@ -24,8 +24,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
-import org.apache.activeio.util.ByteArrayInputStream;
-import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.LastPartialCommand;
@@ -33,6 +31,8 @@
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.reliable.ReplayBuffer;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java?rev=439111&r1=439110&r2=439111&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java Thu Aug 31 17:13:23 2006
@@ -24,8 +24,6 @@
import java.net.DatagramSocket;
import java.net.SocketAddress;
-import org.apache.activeio.util.ByteArrayInputStream;
-import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.LastPartialCommand;
@@ -33,6 +31,8 @@
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.reliable.ReplayBuffer;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;