You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jg...@apache.org on 2006/09/19 00:07:14 UTC
svn commit: r447591 [1/2] - in /geronimo/sandbox/gcache: ./ client/
openwire/ openwire/src/main/java/org/apache/geronimo/openwire/
openwire/src/main/java/org/apache/geronimo/openwire/command/
openwire/src/main/java/org/apache/geronimo/openwire/thread/ ...
Author: jgenender
Date: Mon Sep 18 15:07:10 2006
New Revision: 447591
URL: http://svn.apache.org/viewvc?view=rev&rev=447591
Log:
GERONIMO-2412 - TestNG tests
Added:
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/BooleanStream.java (with props)
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/CommandIdComparator.java (with props)
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/DataStreamMarshaller.java (with props)
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormat.java (with props)
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormatFactory.java (with props)
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransport.java (with props)
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransportFactory.java (with props)
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/package.html (with props)
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransport.java (with props)
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransportFactory.java (with props)
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransportServer.java (with props)
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/package.html (with props)
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/util/ByteSequenceData.java (with props)
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/util/IdGenerator.java (with props)
geronimo/sandbox/gcache/openwire/src/main/resources/
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/DISCLAIMER.txt (with props)
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/LICENSE.txt (with props)
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/transport/
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/transport/discovery
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/multicast
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/simple
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/static
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/transport/failover
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/transport/mock
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/transport/ssl
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/transport/tcp
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/transport/vm
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/wireformat/
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/wireformat/default
geronimo/sandbox/gcache/openwire/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
geronimo/sandbox/gcache/openwire/src/test/java/
geronimo/sandbox/gcache/openwire/src/test/java/org/
geronimo/sandbox/gcache/openwire/src/test/java/org/apache/
geronimo/sandbox/gcache/openwire/src/test/java/org/apache/geronimo/
geronimo/sandbox/gcache/openwire/src/test/java/org/apache/geronimo/openwire/
geronimo/sandbox/gcache/openwire/src/test/java/org/apache/geronimo/openwire/NumberRangesWhileMarshallingTest.java (with props)
geronimo/sandbox/gcache/openwire/src/test/java/org/apache/geronimo/openwire/transport/
geronimo/sandbox/gcache/openwire/src/test/java/org/apache/geronimo/openwire/transport/tcp/
geronimo/sandbox/gcache/openwire/src/test/java/org/apache/geronimo/openwire/transport/tcp/TcpTransportFactoryTest.java (with props)
geronimo/sandbox/gcache/openwire/src/test/java/org/apache/geronimo/openwire/util/
geronimo/sandbox/gcache/openwire/src/test/java/org/apache/geronimo/openwire/util/URISupportTest.java (with props)
Modified:
geronimo/sandbox/gcache/client/pom.xml
geronimo/sandbox/gcache/openwire/pom.xml
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/DataStructure.java
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionId.java
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/Valve.java
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/ResponseCorrelator.java
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportFactory.java
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportServer.java
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportServerThreadSupport.java
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/WireFormatNegotiator.java
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/tcp/TcpBufferedOutputStream.java
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/tcp/TcpTransportFactory.java
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/wireformat/ObjectStreamWireFormat.java
geronimo/sandbox/gcache/pom.xml
geronimo/sandbox/gcache/server/pom.xml
Modified: geronimo/sandbox/gcache/client/pom.xml
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/client/pom.xml?view=diff&rev=447591&r1=447590&r2=447591
==============================================================================
--- geronimo/sandbox/gcache/client/pom.xml (original)
+++ geronimo/sandbox/gcache/client/pom.xml Mon Sep 18 15:07:10 2006
@@ -17,8 +17,10 @@
<dependencies>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <classifier>jdk15</classifier>
+ <scope>test</scope>
</dependency>
<dependency>
Modified: geronimo/sandbox/gcache/openwire/pom.xml
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/pom.xml?view=diff&rev=447591&r1=447590&r2=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/pom.xml (original)
+++ geronimo/sandbox/gcache/openwire/pom.xml Mon Sep 18 15:07:10 2006
@@ -27,8 +27,10 @@
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <classifier>jdk15</classifier>
+ <scope>test</scope>
</dependency>
</dependencies>
Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/BooleanStream.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/BooleanStream.java?view=auto&rev=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/BooleanStream.java (added)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/BooleanStream.java Mon Sep 18 15:07:10 2006
@@ -0,0 +1,125 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.openwire;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+final public class BooleanStream {
+
+ byte data[] = new byte[48];
+ short arrayLimit;
+ short arrayPos;
+ byte bytePos;
+
+ public boolean readBoolean() throws IOException {
+ assert arrayPos <= arrayLimit;
+ byte b = data[arrayPos];
+ boolean rc = ((b>>bytePos)&0x01)!=0;
+ bytePos++;
+ if( bytePos >= 8 ) {
+ bytePos=0;
+ arrayPos++;
+ }
+ return rc;
+ }
+
+ public void writeBoolean(boolean value) throws IOException {
+ if( bytePos == 0 ) {
+ arrayLimit++;
+ if( arrayLimit >= data.length ) {
+ // re-grow the array.
+ byte d[] = new byte[data.length*2];
+ System.arraycopy(data, 0, d, 0, data.length);
+ data = d;
+ }
+ }
+ if( value ) {
+ data[arrayPos] |= (0x01 << bytePos);
+ }
+ bytePos++;
+ if( bytePos >= 8 ) {
+ bytePos=0;
+ arrayPos++;
+ }
+ }
+
+ public void marshal(DataOutputStream dataOut) throws IOException {
+ if( arrayLimit < 64 ) {
+ dataOut.writeByte(arrayLimit);
+ } else if( arrayLimit < 256 ) { // max value of unsigned byte
+ dataOut.writeByte(0xC0);
+ dataOut.writeByte(arrayLimit);
+ } else {
+ dataOut.writeByte(0x80);
+ dataOut.writeShort(arrayLimit);
+ }
+
+ dataOut.write(data, 0, arrayLimit);
+ clear();
+ }
+
+ public void marshal(ByteBuffer dataOut) {
+ if( arrayLimit < 64 ) {
+ dataOut.put((byte) arrayLimit);
+ } else if( arrayLimit < 256 ) { // max value of unsigned byte
+ dataOut.put((byte) 0xC0);
+ dataOut.put((byte) arrayLimit);
+ } else {
+ dataOut.put((byte) 0x80);
+ dataOut.putShort(arrayLimit);
+ }
+
+ dataOut.put(data, 0, arrayLimit);
+ }
+
+
+ public void unmarshal(DataInputStream dataIn) throws IOException {
+
+ arrayLimit = (short) (dataIn.readByte() & 0xFF);
+ if ( arrayLimit == 0xC0 ) {
+ arrayLimit = (short)(dataIn.readByte() & 0xFF);
+ } else if( arrayLimit == 0x80 ) {
+ arrayLimit = dataIn.readShort();
+ }
+ if( data.length < arrayLimit ) {
+ data = new byte[arrayLimit];
+ }
+ dataIn.readFully(data, 0, arrayLimit);
+ clear();
+ }
+
+ public void clear() {
+ arrayPos=0;
+ bytePos=0;
+ }
+
+ public int marshalledSize() {
+ if( arrayLimit < 64 ) {
+ return 1+arrayLimit;
+ } else if (arrayLimit < 256) {
+ return 2+arrayLimit;
+ } else {
+ return 3+arrayLimit;
+ }
+ }
+
+
+}
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/BooleanStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/BooleanStream.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/BooleanStream.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/CommandIdComparator.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/CommandIdComparator.java?view=auto&rev=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/CommandIdComparator.java (added)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/CommandIdComparator.java Mon Sep 18 15:07:10 2006
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.openwire;
+
+import java.util.Comparator;
+
+import org.apache.geronimo.openwire.command.Command;
+
+/**
+ * A @{link Comparator} of commands using their {@link Command#getCommandId()}
+ *
+ * @version $Revision$
+ */
+public class CommandIdComparator implements Comparator {
+
+ public int compare(Object o1, Object o2) {
+ assert o1 instanceof Command;
+ assert o2 instanceof Command;
+
+ Command c1 = (Command) o1;
+ Command c2 = (Command) o2;
+ return c1.getCommandId() - c2.getCommandId();
+ }
+
+}
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/CommandIdComparator.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/CommandIdComparator.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/CommandIdComparator.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/DataStreamMarshaller.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/DataStreamMarshaller.java?view=auto&rev=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/DataStreamMarshaller.java (added)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/DataStreamMarshaller.java Mon Sep 18 15:07:10 2006
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.openwire;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.geronimo.openwire.command.DataStructure;
+
+/**
+ * Marshals and unmarshals one kind of {@link DataStructure} for an
+ * {@link OpenWireFormat}. OpenWireFormat keeps these in an array indexed by
+ * the unsigned data structure type byte.
+ *
+ * Two encodings are offered: a "tight" one that runs in two passes and
+ * shares a {@link BooleanStream} between them, and a "loose" one that
+ * writes straight to the stream.
+ */
+public interface DataStreamMarshaller {
+
+ /** Returns the type byte of the data structure this marshaller handles. */
+ byte getDataStructureType();
+ /** Creates an empty instance for one of the unmarshal methods to fill in. */
+ DataStructure createObject();
+
+ /**
+ * First tight pass. OpenWireFormat adds the returned value to the size it
+ * computes for the frame, so this is presumably the number of bytes the
+ * second pass will write for c (excluding the boolean stream itself).
+ */
+ int tightMarshal1(OpenWireFormat format, Object c, BooleanStream bs) throws IOException;
+ /** Second tight pass: writes c to ds, with the boolean stream filled by the first pass available in bs. */
+ void tightMarshal2(OpenWireFormat format, Object c, DataOutputStream ds, BooleanStream bs) throws IOException;
+ /** Fills data from dis using the tight encoding; bs is the boolean stream read from the frame. */
+ void tightUnmarshal(OpenWireFormat format, Object data, DataInputStream dis, BooleanStream bs) throws IOException;
+
+ /** Writes c to ds using the loose encoding. */
+ void looseMarshal(OpenWireFormat format, Object c, DataOutputStream ds) throws IOException;
+ /** Fills data from dis using the loose encoding. */
+ void looseUnmarshal(OpenWireFormat format, Object data, DataInputStream dis) throws IOException;
+
+}
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/DataStreamMarshaller.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/DataStreamMarshaller.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/DataStreamMarshaller.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormat.java?view=auto&rev=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormat.java (added)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormat.java Mon Sep 18 15:07:10 2006
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.openwire;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+
+import org.apache.geronimo.openwire.command.CommandTypes;
+import org.apache.geronimo.openwire.command.DataStructure;
+import org.apache.geronimo.openwire.command.MarshallAware;
+import org.apache.geronimo.openwire.command.WireFormatInfo;
+import org.apache.geronimo.openwire.util.ByteArrayInputStream;
+import org.apache.geronimo.openwire.util.ByteArrayOutputStream;
+import org.apache.geronimo.openwire.util.ByteSequence;
+import org.apache.geronimo.openwire.util.ByteSequenceData;
+import org.apache.geronimo.openwire.util.ClassLoading;
+import org.apache.geronimo.openwire.util.IdGenerator;
+import org.apache.geronimo.openwire.wireformat.WireFormat;
+
+/**
+ *
+ * @version $Revision$
+ */
+final public class OpenWireFormat implements WireFormat {
+
+ // Type byte written in place of a null object.
+ static final byte NULL_TYPE = CommandTypes.NULL;
+ // Capacity of the marshall/unmarshall cache arrays; cache indexes are shorts.
+ private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE/2;
+ // The eviction sweep removes entries while the cache map holds more than this many.
+ private static final int MARSHAL_CACHE_PREFERED_SIZE = MARSHAL_CACHE_SIZE-100;
+
+ // Marshallers indexed by unsigned data structure type; replaced by setVersion.
+ private DataStreamMarshaller dataMarshallers[];
+ private int version;
+ private boolean stackTraceEnabled=false;
+ private boolean tcpNoDelayEnabled=false;
+ private boolean cacheEnabled=false;
+ private boolean tightEncodingEnabled=false;
+ // When true no 4 byte size is written in front of a marshalled object.
+ private boolean sizePrefixDisabled=false;
+
+ // Maps a cached DataStructure to its Short index in marshallCache.
+ private HashMap marshallCacheMap = new HashMap();
+ // Next slot addToMarshallCache hands out; wraps at MARSHAL_CACHE_SIZE.
+ private short nextMarshallCacheIndex=0;
+ // Next slot the eviction sweep clears; wraps at MARSHAL_CACHE_SIZE.
+ private short nextMarshallCacheEvictionIndex=0;
+
+ private DataStructure marshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
+ private DataStructure unmarshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
+ private WireFormatInfo preferedWireFormatInfo;
+
+ /** Creates a wire format that uses protocol version 1. */
+ public OpenWireFormat() {
+ this(1);
+ }
+
+ /**
+ * Creates a wire format for the given protocol version.
+ *
+ * @param i protocol version, passed to setVersion
+ * @throws IllegalArgumentException if setVersion rejects the version
+ */
+ public OpenWireFormat(int i) {
+ setVersion(i);
+ }
+
+ public int hashCode() {
+ return version
+ ^ (cacheEnabled ? 0x10000000:0x20000000)
+ ^ (stackTraceEnabled ? 0x01000000:0x02000000)
+ ^ (tightEncodingEnabled ? 0x00100000:0x00200000)
+ ^ (sizePrefixDisabled ? 0x00010000:0x00020000)
+ ;
+ }
+
+    /**
+     * Creates a new wire format with the same version, option flags and
+     * preferred wire format info. The marshall caches are not copied.
+     *
+     * @return an independent format configured like this one
+     */
+    public OpenWireFormat copy() {
+        // Construct with our version so the copy loads the marshallers that
+        // belong to it; assigning the version field alone would leave the
+        // copy with the default version's marshaller table.
+        OpenWireFormat answer = new OpenWireFormat(version);
+        answer.stackTraceEnabled = stackTraceEnabled;
+        answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
+        answer.cacheEnabled = cacheEnabled;
+        answer.tightEncodingEnabled = tightEncodingEnabled;
+        answer.sizePrefixDisabled = sizePrefixDisabled;
+        answer.preferedWireFormatInfo = preferedWireFormatInfo;
+        return answer;
+    }
+
+    /**
+     * Two formats are equal when their version and their stackTrace, cache,
+     * tightEncoding and sizePrefix settings match.
+     *
+     * @param object the object to compare with; may be null or of any type
+     * @return true only for an OpenWireFormat with the same settings
+     */
+    public boolean equals(Object object) {
+        if( object == this )
+            return true;
+        // Also false for null; avoids a ClassCastException for other types.
+        if( !(object instanceof OpenWireFormat) )
+            return false;
+        OpenWireFormat o = (OpenWireFormat) object;
+        return o.stackTraceEnabled == stackTraceEnabled &&
+            o.cacheEnabled == cacheEnabled &&
+            o.version == version &&
+            o.tightEncodingEnabled == tightEncodingEnabled &&
+            o.sizePrefixDisabled == sizePrefixDisabled;
+    }
+
+ static IdGenerator g = new IdGenerator();
+ String id = g.generateId();
+ public String toString() {
+ return "OpenWireFormat{version="+version+", cacheEnabled="+cacheEnabled+", stackTraceEnabled="+stackTraceEnabled+", tightEncodingEnabled="+tightEncodingEnabled+", sizePrefixDisabled="+sizePrefixDisabled+"}";
+ //return "OpenWireFormat{id="+id+", tightEncodingEnabled="+tightEncodingEnabled+"}";
+ }
+
+ /** Returns the protocol version last set through setVersion. */
+ public int getVersion() {
+ return version;
+ }
+
+    /**
+     * Marshals a command into a byte sequence: an optional 4 byte size,
+     * the type byte, then the tight or loose encoding of the command.
+     * A null command is written as the NULL type byte.
+     *
+     * When caching is disabled and the command is marshall aware, a
+     * previously cached form is returned if present and a freshly built
+     * one is stored on the command.
+     *
+     * @param command a DataStructure, or null
+     * @return the marshalled bytes
+     * @throws IOException if no marshaller is registered for the type or
+     *         writing fails
+     */
+    public ByteSequence marshal(Object command) throws IOException {
+
+        if( cacheEnabled ) {
+            runMarshallCacheEvictionSweep();
+        }
+
+        if( command == null ) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream(5);
+            DataOutputStream daos = new DataOutputStream(baos);
+            // Only emit the size when the reader will consume one.
+            if( !sizePrefixDisabled ) {
+                daos.writeInt(1);
+            }
+            daos.writeByte(NULL_TYPE);
+            daos.close();
+            return baos.toByteSequence();
+        }
+
+        DataStructure c = (DataStructure) command;
+
+        MarshallAware ma = null;
+        // If not using value caching, then the marshaled form is always the same
+        if( !cacheEnabled && c.isMarshallAware() ) {
+            ma = (MarshallAware) command;
+            ByteSequence cached = ma.getCachedMarshalledForm(this);
+            if( cached != null ) {
+                return cached;
+            }
+        }
+
+        byte type = c.getDataStructureType();
+        DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+        if( dsm == null )
+            throw new IOException("Unknown data type: "+type);
+
+        ByteSequence sequence;
+        if( tightEncodingEnabled ) {
+
+            // First pass computes the size, so it can be written up front.
+            int size = 1;
+            BooleanStream bs = new BooleanStream();
+            size += dsm.tightMarshal1(this, c, bs);
+            size += bs.marshalledSize();
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
+            DataOutputStream ds = new DataOutputStream(baos);
+            if( !sizePrefixDisabled ) {
+                ds.writeInt(size);
+            }
+            ds.writeByte(type);
+            bs.marshal(ds);
+            dsm.tightMarshal2(this, c, ds, bs);
+            ds.close();
+            sequence = baos.toByteSequence();
+
+        } else {
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream ds = new DataOutputStream(baos);
+            if( !sizePrefixDisabled ) {
+                ds.writeInt(0); // we don't know the final size yet but write this here for now.
+            }
+            ds.writeByte(type);
+            dsm.looseMarshal(this, c, ds);
+            ds.close();
+            sequence = baos.toByteSequence();
+
+            if( !sizePrefixDisabled ) {
+                // Patch the placeholder, then restore the offset the write moved.
+                int size = sequence.getLength()-4;
+                int pos = sequence.offset;
+                ByteSequenceData.writeIntBig(sequence, size);
+                sequence.offset = pos;
+            }
+        }
+
+        if( ma != null ) {
+            ma.setCachedMarshalledForm(this, sequence);
+        }
+        return sequence;
+    }
+
+    /**
+     * Unmarshals one object from a byte sequence produced by marshal.
+     * When caching is disabled and the result is marshall aware, the
+     * sequence is stored on it as its cached marshalled form.
+     *
+     * @param sequence the marshalled bytes
+     * @return the unmarshalled object, or null if the NULL type was read
+     * @throws IOException if the type is unknown or reading fails
+     */
+    public Object unmarshal(ByteSequence sequence) throws IOException {
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(sequence));
+
+        if( !sizePrefixDisabled ) {
+            // Consume the size prefix. A prefix that disagrees with the
+            // sequence length is tolerated, not rejected.
+            dis.readInt();
+        }
+
+        Object command = doUnmarshal(dis);
+        // doUnmarshal yields null for the NULL type; nothing to cache then.
+        if( !cacheEnabled && command != null && ((DataStructure)command).isMarshallAware() ) {
+            ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
+        }
+        return command;
+    }
+
+    /**
+     * Marshals an object straight to a stream: an optional 4 byte size,
+     * the type byte, then the tight or loose encoding. A null object is
+     * written as the NULL type byte.
+     *
+     * @param o a DataStructure, or null
+     * @param dataOut destination stream
+     * @throws IOException if no marshaller is registered for the type or
+     *         writing fails
+     */
+    public void marshal(Object o, DataOutputStream dataOut) throws IOException {
+
+        if( cacheEnabled ) {
+            runMarshallCacheEvictionSweep();
+        }
+
+        if( o == null ) {
+            // Only emit the size when unmarshal(DataInputStream) will read one.
+            if( !sizePrefixDisabled ) {
+                dataOut.writeInt(1);
+            }
+            dataOut.writeByte(NULL_TYPE);
+            return;
+        }
+
+        DataStructure c = (DataStructure) o;
+        byte type = c.getDataStructureType();
+        DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+        if( dsm == null )
+            throw new IOException("Unknown data type: "+type);
+
+        if( tightEncodingEnabled ) {
+            // First pass computes the size, so it can be written up front.
+            int size = 1;
+            BooleanStream bs = new BooleanStream();
+            size += dsm.tightMarshal1(this, c, bs);
+            size += bs.marshalledSize();
+
+            if( !sizePrefixDisabled ) {
+                dataOut.writeInt(size);
+            }
+
+            dataOut.writeByte(type);
+            bs.marshal(dataOut);
+            dsm.tightMarshal2(this, c, dataOut, bs);
+
+        } else {
+            DataOutputStream looseOut = dataOut;
+            ByteArrayOutputStream baos = null;
+
+            // The loose size is only known afterwards, so buffer the body
+            // when a size prefix has to precede it.
+            if( !sizePrefixDisabled ) {
+                baos = new ByteArrayOutputStream();
+                looseOut = new DataOutputStream(baos);
+            }
+
+            looseOut.writeByte(type);
+            dsm.looseMarshal(this, c, looseOut);
+
+            if( !sizePrefixDisabled ) {
+                looseOut.close();
+                ByteSequence sequence = baos.toByteSequence();
+                dataOut.writeInt(sequence.getLength());
+                dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+            }
+        }
+    }
+
+ /**
+ * Unmarshals one object from a stream. The 4 byte size prefix is read and
+ * discarded unless size prefixes are disabled.
+ *
+ * @return the unmarshalled object, or null if the NULL type was read
+ */
+ public Object unmarshal(DataInputStream dis) throws IOException {
+ if( !sizePrefixDisabled ) {
+ dis.readInt();
+ }
+ return doUnmarshal(dis);
+ }
+
+    /**
+     * Used by NIO or AIO transports.
+     *
+     * Runs the first tight pass for o and reports the computed size:
+     * one type byte, the marshaller's own size and the size of the
+     * boolean stream. Returns 1 for a null object.
+     */
+    public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
+        if( o == null ) {
+            return 1;
+        }
+
+        DataStructure structure = (DataStructure) o;
+        byte type = structure.getDataStructureType();
+        DataStreamMarshaller marshaller = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+        if( marshaller == null ) {
+            throw new IOException("Unknown data type: "+type);
+        }
+
+        // The marshaller fills bs, so its size must be taken afterwards.
+        int bodySize = marshaller.tightMarshal1(this, structure, bs);
+        int booleansSize = bs.marshalledSize();
+        return 1 + bodySize + booleansSize;
+    }
+
+    /**
+     * Used by NIO or AIO transports; note that the size is not written as part of this method.
+     *
+     * Writes the type byte, the boolean stream and the tightly encoded
+     * body of o. Nothing is written for a null object.
+     */
+    public void tightMarshal2(Object o, DataOutputStream ds, BooleanStream bs) throws IOException {
+        if( cacheEnabled ) {
+            runMarshallCacheEvictionSweep();
+        }
+
+        if( o == null ) {
+            return;
+        }
+
+        DataStructure structure = (DataStructure) o;
+        byte type = structure.getDataStructureType();
+        DataStreamMarshaller marshaller = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+        if( marshaller == null ) {
+            throw new IOException("Unknown data type: "+type);
+        }
+
+        ds.writeByte(type);
+        bs.marshal(ds);
+        marshaller.tightMarshal2(this, structure, ds, bs);
+    }
+
+
+ /**
+ * Allows you to dynamically switch the version of the openwire protocol being used.
+ * @param version
+ */
+ public void setVersion(int version) {
+ String mfName = "org.apache.activemq.openwire.v"+version+".MarshallerFactory";
+ Class mfClass;
+ try {
+ mfClass = ClassLoading.loadClass(mfName, getClass().getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: "+version+", could not load "+mfName).initCause(e);
+ }
+ try {
+ Method method = mfClass.getMethod("createMarshallerMap", new Class[]{OpenWireFormat.class});
+ dataMarshallers = (DataStreamMarshaller[]) method.invoke(null, new Object[]{this});
+ } catch (Throwable e) {
+ throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: "+version+", "+mfName+" does not properly implement the createMarshallerMap method.").initCause(e);
+ }
+ this.version = version;
+ }
+
+    /**
+     * Reads a type byte and, unless it is the NULL type, the object that
+     * follows it, using the tight or loose encoding as configured.
+     *
+     * @return the unmarshalled object, or null for the NULL type
+     * @throws IOException if no marshaller is registered for the type
+     */
+    public Object doUnmarshal(DataInputStream dis) throws IOException {
+        byte dataType = dis.readByte();
+        if( dataType == NULL_TYPE ) {
+            return null;
+        }
+
+        DataStreamMarshaller marshaller = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
+        if( marshaller == null ) {
+            throw new IOException("Unknown data type: "+dataType);
+        }
+
+        Object result = marshaller.createObject();
+        if( !this.tightEncodingEnabled ) {
+            marshaller.looseUnmarshal(this, result, dis);
+        } else {
+            BooleanStream booleans = new BooleanStream();
+            booleans.unmarshal(dis);
+            marshaller.tightUnmarshal(this, result, dis, booleans);
+        }
+        return result;
+    }
+
+// public void debug(String msg) {
+// String t = (Thread.currentThread().getName()+" ").substring(0, 40);
+// System.out.println(t+": "+msg);
+// }
+    /**
+     * First tight pass for a nested object. Records in bs whether the
+     * object is present and, for marshall aware objects, whether a cached
+     * marshalled form exists.
+     *
+     * @return 0 for null, otherwise one type byte plus either the length
+     *         of the cached form or the marshaller's own size
+     */
+    public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
+        boolean present = (o != null);
+        bs.writeBoolean(present);
+        if( !present ) {
+            return 0;
+        }
+
+        if( o.isMarshallAware() ) {
+            ByteSequence cached = ((MarshallAware) o).getCachedMarshalledForm(this);
+            boolean hasCached = (cached != null);
+            bs.writeBoolean(hasCached);
+            if( hasCached ) {
+                return 1 + cached.getLength();
+            }
+        }
+
+        byte type = o.getDataStructureType();
+        DataStreamMarshaller marshaller = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+        if( marshaller == null ) {
+            throw new IOException("Unknown data type: "+type);
+        }
+        return 1 + marshaller.tightMarshal1(this, o, bs);
+    }
+
+    /**
+     * Second tight pass for a nested object. Reads back from bs the flags
+     * written by the first pass, then writes the type byte followed by
+     * either the cached marshalled form or the marshaller's output.
+     * Nothing is written when the first flag says the object was null.
+     */
+    public void tightMarshalNestedObject2(DataStructure o, DataOutputStream ds, BooleanStream bs) throws IOException {
+        boolean present = bs.readBoolean();
+        if( !present ) {
+            return;
+        }
+
+        byte type = o.getDataStructureType();
+        ds.writeByte(type);
+
+        // The second flag only exists in bs for marshall aware objects.
+        boolean useCachedForm = o.isMarshallAware() && bs.readBoolean();
+        if( useCachedForm ) {
+            ByteSequence cached = ((MarshallAware) o).getCachedMarshalledForm(this);
+            ds.write(cached.getData(), cached.getOffset(), cached.getLength());
+            return;
+        }
+
+        DataStreamMarshaller marshaller = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+        if( marshaller == null ) {
+            throw new IOException("Unknown data type: "+type);
+        }
+        marshaller.tightMarshal2(this, o, ds, bs);
+    }
+
+    /**
+     * Reads a nested object written with the tight encoding.
+     *
+     * @return the object, or null when bs says none was written
+     * @throws IOException if no marshaller is registered for the type
+     */
+    public DataStructure tightUnmarshalNestedObject(DataInputStream dis, BooleanStream bs) throws IOException {
+        boolean present = bs.readBoolean();
+        if( !present ) {
+            return null;
+        }
+
+        byte dataType = dis.readByte();
+        DataStreamMarshaller marshaller = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
+        if( marshaller == null ) {
+            throw new IOException("Unknown data type: "+dataType);
+        }
+        DataStructure result = marshaller.createObject();
+
+        // The second flag only exists in bs for marshall aware objects.
+        boolean embeddedForm = result.isMarshallAware() && bs.readBoolean();
+        if( !embeddedForm ) {
+            marshaller.tightUnmarshal(this, result, dis, bs);
+            return result;
+        }
+
+        // Skip an int and a byte, presumably the size prefix and type byte
+        // of the embedded marshalled form, which carries its own boolean stream.
+        dis.readInt();
+        dis.readByte();
+
+        BooleanStream embeddedBooleans = new BooleanStream();
+        embeddedBooleans.unmarshal(dis);
+        marshaller.tightUnmarshal(this, result, dis, embeddedBooleans);
+
+        // TODO: extract the sequence from the dis and associate it.
+//        MarshallAware ma = (MarshallAware)data
+//        ma.setCachedMarshalledForm(this, sequence);
+
+        return result;
+    }
+
+    /**
+     * Reads a nested object written with the loose encoding: a presence
+     * flag, then the type byte and the body.
+     *
+     * @return the object, or null when the presence flag is false
+     * @throws IOException if no marshaller is registered for the type
+     */
+    public DataStructure looseUnmarshalNestedObject(DataInputStream dis) throws IOException {
+        boolean present = dis.readBoolean();
+        if( !present ) {
+            return null;
+        }
+
+        byte dataType = dis.readByte();
+        DataStreamMarshaller marshaller = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
+        if( marshaller == null ) {
+            throw new IOException("Unknown data type: "+dataType);
+        }
+
+        DataStructure result = marshaller.createObject();
+        marshaller.looseUnmarshal(this, result, dis);
+        return result;
+    }
+
+ /**
+  * Writes a nested object with the loose encoding: a boolean presence marker,
+  * and for a non-null object its type byte followed by the fields written by
+  * that type's marshaller.
+  *
+  * @param o       the object to write; may be <code>null</code>
+  * @param dataOut the stream to write to
+  * @throws IOException if the object's type has no registered marshaller
+  *                     (the marker and type byte have already been written
+  *                     by then), or if writing fails
+  */
+ public void looseMarshalNestedObject(DataStructure o, DataOutputStream dataOut) throws IOException {
+     final boolean present = (o != null);
+     dataOut.writeBoolean(present);
+     if (!present) {
+         return;
+     }
+
+     final byte typeCode = o.getDataStructureType();
+     dataOut.writeByte(typeCode);
+
+     DataStreamMarshaller marshaller = (DataStreamMarshaller) dataMarshallers[typeCode & 0xFF];
+     if (marshaller == null) {
+         throw new IOException("Unknown data type: " + typeCode);
+     }
+     marshaller.looseMarshal(this, o, dataOut);
+ }
+
+ /**
+  * Shrinks the marshall cache until it holds no more than
+  * MARSHAL_CACHE_PREFERED_SIZE entries, clearing slots one at a time starting
+  * at the current eviction index and wrapping around at MARSHAL_CACHE_SIZE.
+  */
+ public void runMarshallCacheEvictionSweep() {
+     while (marshallCacheMap.size() > MARSHAL_CACHE_PREFERED_SIZE) {
+         final int slot = nextMarshallCacheEvictionIndex;
+
+         // Drop the reverse mapping first, then free the slot itself.
+         marshallCacheMap.remove(marshallCache[slot]);
+         marshallCache[slot] = null;
+
+         // Advance the eviction cursor, wrapping at the end of the slot array.
+         nextMarshallCacheEvictionIndex++;
+         if (nextMarshallCacheEvictionIndex >= MARSHAL_CACHE_SIZE) {
+             nextMarshallCacheEvictionIndex = 0;
+         }
+     }
+ }
+
+ public Short getMarshallCacheIndex(DataStructure o) {
+ return (Short) marshallCacheMap.get(o);
+ }
+
+ /**
+  * Tries to place an object in the marshall cache.
+  * <p>
+  * The next slot index is always consumed (wrapping at MARSHAL_CACHE_SIZE),
+  * even when the object ends up not being cached.
+  *
+  * @param o the object to cache
+  * @return the slot index assigned to <code>o</code>, or -1 when the cache
+  *         map already holds MARSHAL_CACHE_SIZE entries and nothing was stored
+  */
+ public Short addToMarshallCache(DataStructure o) {
+     final short slot = nextMarshallCacheIndex++;
+     if (nextMarshallCacheIndex >= MARSHAL_CACHE_SIZE) {
+         nextMarshallCacheIndex = 0;
+     }
+
+     // Cache is full: -1 signals that the value was not cached.
+     if (marshallCacheMap.size() >= MARSHAL_CACHE_SIZE) {
+         return new Short((short) -1);
+     }
+
+     marshallCache[slot] = o;
+     Short boxedSlot = new Short(slot);
+     marshallCacheMap.put(o, boxedSlot);
+     return boxedSlot;
+ }
+
+ /**
+  * Stores an object in the unmarshall cache at the given slot.
+  *
+  * @param index the slot to fill; -1 means there was no space left in the
+  *              cache, in which case the call does nothing
+  * @param o     the object to store
+  */
+ public void setInUnmarshallCache(short index, DataStructure o) {
+     if (index != -1) {
+         unmarshallCache[index] = o;
+     }
+ }
+
+ public DataStructure getFromUnmarshallCache(short index) {
+ return unmarshallCache[index];
+ }
+
+
+ /**
+  * @return the current value of the stackTraceEnabled flag
+  */
+ public boolean isStackTraceEnabled() {
+     return this.stackTraceEnabled;
+ }
+
+ /**
+  * @param b the new value of the stackTraceEnabled flag
+  */
+ public void setStackTraceEnabled(boolean b) {
+     this.stackTraceEnabled = b;
+ }
+
+ /**
+  * @return the current value of the tcpNoDelayEnabled flag
+  */
+ public boolean isTcpNoDelayEnabled() {
+     return this.tcpNoDelayEnabled;
+ }
+
+ /**
+  * @param tcpNoDelayEnabled the new value of the tcpNoDelayEnabled flag
+  */
+ public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
+     this.tcpNoDelayEnabled = tcpNoDelayEnabled;
+ }
+
+ /**
+  * @return the current value of the cacheEnabled flag
+  */
+ public boolean isCacheEnabled() {
+     return this.cacheEnabled;
+ }
+
+ /**
+  * @param cacheEnabled the new value of the cacheEnabled flag
+  */
+ public void setCacheEnabled(boolean cacheEnabled) {
+     this.cacheEnabled = cacheEnabled;
+ }
+
+ /**
+  * @return the current value of the tightEncodingEnabled flag
+  */
+ public boolean isTightEncodingEnabled() {
+     return this.tightEncodingEnabled;
+ }
+
+ /**
+  * @param tightEncodingEnabled the new value of the tightEncodingEnabled flag
+  */
+ public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
+     this.tightEncodingEnabled = tightEncodingEnabled;
+ }
+
+ /**
+  * @return the current value of the sizePrefixDisabled flag
+  */
+ public boolean isSizePrefixDisabled() {
+     return this.sizePrefixDisabled;
+ }
+
+ /**
+  * Sets the sizePrefixDisabled flag.
+  *
+  * @param sizePrefixDisabled the new value of the flag; note the sense:
+  *                           <code>true</code> means the prefix is disabled
+  */
+ public void setSizePrefixDisabled(boolean sizePrefixDisabled) {
+     // The parameter used to be called prefixPacketSize, which read as the
+     // opposite of the flag it is stored into.
+     this.sizePrefixDisabled = sizePrefixDisabled;
+ }
+
+ /**
+  * @return the preferred wire format settings, or <code>null</code> if none have been set
+  */
+ public WireFormatInfo getPreferedWireFormatInfo() {
+     return this.preferedWireFormatInfo;
+ }
+
+ /**
+  * @param info the preferred wire format settings; renegotiateWireFormat
+  *             refuses to run while this is <code>null</code>
+  */
+ public void setPreferedWireFormatInfo(WireFormatInfo info) {
+     this.preferedWireFormatInfo = info;
+ }
+
+ /**
+  * Reconciles this wire format with the settings announced in <code>info</code>.
+  * <p>
+  * The version becomes the lower of the two versions, and each boolean option
+  * stays enabled only when both the given settings and the preferred
+  * settings of this wire format enable it.
+  *
+  * @param info the settings to reconcile with
+  * @throws IllegalStateException if no preferred wire format info has been set
+  * @throws IOException declared by the signature; no statement in this body
+  *                     throws it explicitly
+  */
+ public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
+
+     // Without preferred settings there is nothing to negotiate against.
+     // (The message previously read "cannot not be renegotiated".)
+     if (preferedWireFormatInfo == null) {
+         throw new IllegalStateException("Wireformat cannot be renegotiated.");
+     }
+
+     this.setVersion(Math.min(preferedWireFormatInfo.getVersion(), info.getVersion()));
+     this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
+     this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
+     this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
+     this.tightEncodingEnabled = info.isTightEncodingEnabled() && preferedWireFormatInfo.isTightEncodingEnabled();
+     this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled();
+
+ }
+}
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormat.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormat.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormatFactory.java?view=auto&rev=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormatFactory.java (added)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormatFactory.java Mon Sep 18 15:07:10 2006
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.openwire;
+
+import org.apache.geronimo.openwire.command.WireFormatInfo;
+import org.apache.geronimo.openwire.wireformat.WireFormat;
+import org.apache.geronimo.openwire.wireformat.WireFormatFactory;
+
+/**
+ * @version $Revision$
+ */
+ public class OpenWireFormatFactory implements WireFormatFactory {
+
+     // Defaults: the settings a wire format ends up with after a default negotiation.
+     private int version = 2;
+     private boolean stackTraceEnabled = true;
+     private boolean tcpNoDelayEnabled = true;
+     private boolean cacheEnabled = true;
+     private boolean tightEncodingEnabled = true;
+     private boolean sizePrefixDisabled = false;
+     private long maxInactivityDuration = 30 * 1000;
+
+     /**
+      * Builds an OpenWireFormat for the configured version and hands it a
+      * WireFormatInfo carrying this factory's current settings as its
+      * preferred wire format info.
+      *
+      * @return the newly created wire format
+      * @throws IllegalStateException if applying any setting to the
+      *                               WireFormatInfo fails; the original failure
+      *                               is attached as the cause
+      */
+     public WireFormat createWireFormat() {
+         WireFormatInfo preferred = new WireFormatInfo();
+         preferred.setVersion(version);
+
+         try {
+             preferred.setStackTraceEnabled(stackTraceEnabled);
+             preferred.setCacheEnabled(cacheEnabled);
+             preferred.setTcpNoDelayEnabled(tcpNoDelayEnabled);
+             preferred.setTightEncodingEnabled(tightEncodingEnabled);
+             preferred.setSizePrefixDisabled(sizePrefixDisabled);
+             preferred.seMaxInactivityDuration(maxInactivityDuration);
+         } catch (Exception e) {
+             IllegalStateException failure = new IllegalStateException("Could not configure WireFormatInfo");
+             failure.initCause(e);
+             throw failure;
+         }
+
+         OpenWireFormat wireFormat = new OpenWireFormat(version);
+         wireFormat.setPreferedWireFormatInfo(preferred);
+         return wireFormat;
+     }
+
+     /** @return the stackTraceEnabled setting given to created wire formats */
+     public boolean isStackTraceEnabled() {
+         return this.stackTraceEnabled;
+     }
+
+     /** @param stackTraceEnabled the stackTraceEnabled setting for wire formats created from now on */
+     public void setStackTraceEnabled(boolean stackTraceEnabled) {
+         this.stackTraceEnabled = stackTraceEnabled;
+     }
+
+     /** @return the tcpNoDelayEnabled setting given to created wire formats */
+     public boolean isTcpNoDelayEnabled() {
+         return this.tcpNoDelayEnabled;
+     }
+
+     /** @param tcpNoDelayEnabled the tcpNoDelayEnabled setting for wire formats created from now on */
+     public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
+         this.tcpNoDelayEnabled = tcpNoDelayEnabled;
+     }
+
+     /** @return the wire format version used for created wire formats */
+     public int getVersion() {
+         return this.version;
+     }
+
+     /** @param version the wire format version for wire formats created from now on */
+     public void setVersion(int version) {
+         this.version = version;
+     }
+
+     /** @return the cacheEnabled setting given to created wire formats */
+     public boolean isCacheEnabled() {
+         return this.cacheEnabled;
+     }
+
+     /** @param cacheEnabled the cacheEnabled setting for wire formats created from now on */
+     public void setCacheEnabled(boolean cacheEnabled) {
+         this.cacheEnabled = cacheEnabled;
+     }
+
+     /** @return the tightEncodingEnabled setting given to created wire formats */
+     public boolean isTightEncodingEnabled() {
+         return this.tightEncodingEnabled;
+     }
+
+     /** @param tightEncodingEnabled the tightEncodingEnabled setting for wire formats created from now on */
+     public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
+         this.tightEncodingEnabled = tightEncodingEnabled;
+     }
+
+     /** @return the sizePrefixDisabled setting given to created wire formats */
+     public boolean isSizePrefixDisabled() {
+         return this.sizePrefixDisabled;
+     }
+
+     /** @param sizePrefixDisabled the sizePrefixDisabled setting for wire formats created from now on */
+     public void setSizePrefixDisabled(boolean sizePrefixDisabled) {
+         this.sizePrefixDisabled = sizePrefixDisabled;
+     }
+
+     /** @return the maxInactivityDuration setting given to created wire formats */
+     public long getMaxInactivityDuration() {
+         return this.maxInactivityDuration;
+     }
+
+     /** @param maxInactivityDuration the maxInactivityDuration setting for wire formats created from now on */
+     public void setMaxInactivityDuration(long maxInactivityDuration) {
+         this.maxInactivityDuration = maxInactivityDuration;
+     }
+ }
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormatFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormatFactory.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/OpenWireFormatFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/DataStructure.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/DataStructure.java?view=diff&rev=447591&r1=447590&r2=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/DataStructure.java (original)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/DataStructure.java Mon Sep 18 15:07:10 2006
@@ -1,5 +1,7 @@
package org.apache.geronimo.openwire.command;
+import java.io.Serializable;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,7 +20,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-public interface DataStructure {
+public interface DataStructure extends Serializable {
/**
* @return The type of the data structure
Modified: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionId.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionId.java?view=diff&rev=447591&r1=447590&r2=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionId.java (original)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionId.java Mon Sep 18 15:07:10 2006
@@ -23,6 +23,7 @@
* @version $Revision$
*/
public class SessionId implements DataStructure {
+ private static final long serialVersionUID = 1L;
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.SESSION_ID;
Modified: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/Valve.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/Valve.java?view=diff&rev=447591&r1=447590&r2=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/Valve.java (original)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/Valve.java Mon Sep 18 15:07:10 2006
@@ -20,7 +20,6 @@
/**
* A Valve is a synchronization object used enable or disable the "flow" of concurrent
* processing.
- *
*
* @version $Revision$
*/
Modified: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/ResponseCorrelator.java?view=diff&rev=447591&r1=447590&r2=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/ResponseCorrelator.java (original)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/ResponseCorrelator.java Mon Sep 18 15:07:10 2006
@@ -33,7 +33,7 @@
/**
* Adds the incrementing sequence number to commands along with performing the corelation of
- * responses to requests to create a blocking request-response semantics.
+ * responses to requests to create a blocking request-response semantic.
*
* @version $Revision$
*/
Modified: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportFactory.java?view=diff&rev=447591&r1=447590&r2=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportFactory.java (original)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportFactory.java Mon Sep 18 15:07:10 2006
@@ -37,7 +37,7 @@
public abstract class TransportFactory {
- public abstract TransportServer doBind(String brokerId, URI location) throws IOException;
+ public abstract TransportServer doBind(String nodeId, URI location) throws IOException;
public Transport doConnect(URI location, Executor ex) throws Exception {
return doConnect(location);
Modified: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportServer.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportServer.java?view=diff&rev=447591&r1=447590&r2=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportServer.java (original)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportServer.java Mon Sep 18 15:07:10 2006
@@ -40,10 +40,10 @@
public void setAcceptListener(TransportAcceptListener acceptListener);
/**
- * Associates a broker info with the transport server so that the transport can do
- * discovery advertisements of the broker.
+ * Associates a node info with the transport server so that the transport can do
+ * discovery advertisements of the node.
*
- * @param brokerInfo
+ * @param nodeInfo
*/
public void setNodeInfo(NodeInfo nodeInfo);
Modified: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportServerThreadSupport.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportServerThreadSupport.java?view=diff&rev=447591&r1=447590&r2=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportServerThreadSupport.java (original)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/TransportServerThreadSupport.java Mon Sep 18 15:07:10 2006
@@ -70,7 +70,7 @@
protected void doStart() throws Exception {
log.info("Listening for connections at: " + getConnectURI());
- runner = new Thread(this, "ActiveMQ Transport Server: "+toString());
+ runner = new Thread(this, "OpenWire Transport Server: "+toString());
runner.setDaemon(daemon);
runner.setPriority(ThreadPriorities.MANAGEMENT);
runner.start();
Modified: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/WireFormatNegotiator.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/WireFormatNegotiator.java?view=diff&rev=447591&r1=447590&r2=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/WireFormatNegotiator.java (original)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/WireFormatNegotiator.java Mon Sep 18 15:07:10 2006
@@ -79,7 +79,7 @@
public void oneway(Command command) throws IOException {
try {
if( !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS) )
- throw new IOException("Wire format negociation timeout: peer did not send his wire format.");
+ throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException();
Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransport.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransport.java?view=auto&rev=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransport.java (added)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransport.java Mon Sep 18 15:07:10 2006
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.openwire.transport.mock;
+
+import java.io.IOException;
+
+import org.apache.geronimo.openwire.command.Command;
+import org.apache.geronimo.openwire.command.Response;
+import org.apache.geronimo.openwire.transport.DefaultTransportListener;
+import org.apache.geronimo.openwire.transport.FutureResponse;
+import org.apache.geronimo.openwire.transport.ResponseCallback;
+import org.apache.geronimo.openwire.transport.Transport;
+import org.apache.geronimo.openwire.transport.TransportFilter;
+import org.apache.geronimo.openwire.transport.TransportListener;
+
+
+/**
+ * @version $Revision$
+ */
+ public class MockTransport extends DefaultTransportListener implements Transport {
+
+     /** The transport every outgoing call is forwarded to. */
+     protected Transport next;
+     /** The listener that receives commands and exceptions coming up from <code>next</code>. */
+     protected TransportListener transportListener;
+
+     /**
+      * @param next the transport to delegate to
+      */
+     public MockTransport(Transport next) {
+         this.next = next;
+     }
+
+     /**
+      * Registers the listener for this transport. While a listener is set,
+      * this object installs itself as the listener of <code>next</code>;
+      * passing <code>null</code> clears the listener of <code>next</code> too.
+      *
+      * @param channelListener the listener to notify, or <code>null</code> to detach
+      */
+     public synchronized void setTransportListener(TransportListener channelListener) {
+         this.transportListener = channelListener;
+         if (channelListener != null) {
+             next.setTransportListener(this);
+         } else {
+             next.setTransportListener(null);
+         }
+     }
+
+     /**
+      * Starts the underlying transport.
+      *
+      * @throws IOException if the next transport or the listener has not been set
+      * @throws Exception   whatever starting the next transport throws
+      */
+     public void start() throws Exception {
+         if (next == null) {
+             throw new IOException("The next channel has not been set.");
+         }
+         if (transportListener == null) {
+             throw new IOException("The command listener has not been set.");
+         }
+         next.start();
+     }
+
+     /**
+      * Stops the underlying transport.
+      *
+      * @throws Exception whatever stopping the next transport throws
+      */
+     public void stop() throws Exception {
+         next.stop();
+     }
+
+     /**
+      * Relays a command received from <code>next</code> to the registered listener.
+      */
+     public synchronized void onCommand(Command command) {
+         transportListener.onCommand(command);
+     }
+
+     /**
+      * @return the transport this object delegates to
+      */
+     public synchronized Transport getNext() {
+         return next;
+     }
+
+     /**
+      * @return the registered transport listener
+      */
+     public synchronized TransportListener getTransportListener() {
+         return transportListener;
+     }
+
+     /**
+      * @return the string form of the next transport
+      */
+     public synchronized String toString() {
+         return next.toString();
+     }
+
+     /** Forwards the command to the next transport. */
+     public synchronized void oneway(Command command) throws IOException {
+         next.oneway(command);
+     }
+
+     /**
+      * Forwards the request to the next transport.
+      * <p>
+      * NOTE(review): <code>responseCallback</code> is not passed on; the next
+      * transport is always called with a <code>null</code> callback. Confirm
+      * whether that is intended.
+      */
+     public synchronized FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
+         return next.asyncRequest(command, null);
+     }
+
+     /** Forwards the request to the next transport and returns its response. */
+     public synchronized Response request(Command command) throws IOException {
+         return next.request(command);
+     }
+
+     /** Forwards the request and timeout to the next transport; not synchronized. */
+     public Response request(Command command, int timeout) throws IOException {
+         return next.request(command, timeout);
+     }
+
+     /**
+      * Relays an exception reported by <code>next</code> to the registered listener.
+      */
+     public synchronized void onException(IOException error) {
+         transportListener.onException(error);
+     }
+
+     /**
+      * Returns this object when it is an instance of <code>target</code>,
+      * otherwise asks the next transport to narrow itself.
+      *
+      * @param target the type being looked for
+      * @return this object, or the result of narrowing the next transport
+      */
+     public synchronized Object narrow(Class target) {
+         if (!target.isAssignableFrom(getClass())) {
+             return next.narrow(target);
+         }
+         return this;
+     }
+
+     /**
+      * @param next the transport to delegate to from now on
+      */
+     public synchronized void setNext(Transport next) {
+         this.next = next;
+     }
+
+     /**
+      * Splices a filter in between this object and the current next
+      * transport: the filter reports to this object, the old next transport
+      * reports to the filter, and the filter becomes the new next transport.
+      *
+      * @param filter the filter to insert
+      */
+     public synchronized void install(TransportFilter filter) {
+         filter.setTransportListener(this);
+         getNext().setTransportListener(filter);
+         setNext(filter);
+     }
+
+     /**
+      * @return the remote address reported by the next transport
+      */
+     public String getRemoteAddress() {
+         return next.getRemoteAddress();
+     }
+
+ }
\ No newline at end of file
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransport.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransport.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransportFactory.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransportFactory.java?view=auto&rev=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransportFactory.java (added)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransportFactory.java Mon Sep 18 15:07:10 2006
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.openwire.transport.mock;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.geronimo.openwire.transport.MutexTransport;
+import org.apache.geronimo.openwire.transport.ResponseCorrelator;
+import org.apache.geronimo.openwire.transport.Transport;
+import org.apache.geronimo.openwire.transport.TransportFactory;
+import org.apache.geronimo.openwire.transport.TransportServer;
+import org.apache.geronimo.openwire.util.IntrospectionSupport;
+import org.apache.geronimo.openwire.util.URISupport;
+import org.apache.geronimo.openwire.util.URISupport.CompositeData;
+
+ /**
+  * Transport factory that wraps the first component of a composite URI in a
+  * {@link MockTransport}. Binding (server side) is not supported.
+  */
+ public class MockTransportFactory extends TransportFactory {
+
+     /**
+      * Creates the mock transport for <code>location</code> and layers a
+      * MutexTransport and a ResponseCorrelator on top of it.
+      *
+      * @param location the composite URI describing the transport to wrap
+      * @return the fully wrapped transport
+      * @throws URISyntaxException if <code>location</code> cannot be parsed as a composite URI
+      * @throws Exception          if the wrapped transport cannot be created or configured
+      */
+     public Transport doConnect(URI location) throws URISyntaxException, Exception {
+         Transport transport = createTransport(URISupport.parseComposite(location));
+         transport = new MutexTransport(transport);
+         transport = new ResponseCorrelator(transport);
+         return transport;
+     }
+
+     /**
+      * Creates the bare mock transport for <code>location</code>, without the
+      * extra layers added by {@link #doConnect(URI)}.
+      *
+      * @param location the composite URI describing the transport to wrap
+      * @return the mock transport
+      * @throws URISyntaxException if <code>location</code> cannot be parsed as a composite URI
+      * @throws Exception          if the wrapped transport cannot be created or configured
+      */
+     public Transport doCompositeConnect(URI location) throws URISyntaxException, Exception {
+         return createTransport(URISupport.parseComposite(location));
+     }
+
+     /**
+      * Connects to the first component of the composite URI, wraps the result
+      * in a MockTransport and applies the URI parameters to it as properties.
+      *
+      * @param compositData the parsed composite URI
+      * @return the configured mock transport
+      * @throws Exception if connecting to the component or setting the properties fails
+      */
+     public Transport createTransport(CompositeData compositData) throws Exception {
+         MockTransport transport = new MockTransport( TransportFactory.compositeConnect(compositData.getComponents()[0]) );
+         IntrospectionSupport.setProperties(transport, compositData.getParameters());
+         return transport;
+     }
+
+     /**
+      * Always fails: the mock transport cannot be bound.
+      *
+      * @param nodeId   unused
+      * @param location unused
+      * @throws IOException always
+      */
+     public TransportServer doBind(String nodeId, URI location) throws IOException {
+         throw new IOException("This protocol does not support being bound.");
+     }
+
+ }
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransportFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransportFactory.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/MockTransportFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/package.html
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/package.html?view=auto&rev=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/package.html (added)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/package.html Mon Sep 18 15:07:10 2006
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+ A mock implementation of the Transport layer, useful for testing.
+
+</body>
+</html>
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/package.html
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/mock/package.html
------------------------------------------------------------------------------
svn:mime-type = text/html
Modified: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/tcp/TcpBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/tcp/TcpBufferedOutputStream.java?view=diff&rev=447591&r1=447590&r2=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/tcp/TcpBufferedOutputStream.java (original)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/tcp/TcpBufferedOutputStream.java Mon Sep 18 15:07:10 2006
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/tcp/TcpTransportFactory.java?view=diff&rev=447591&r1=447590&r2=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/tcp/TcpTransportFactory.java (original)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/tcp/TcpTransportFactory.java Mon Sep 18 15:07:10 2006
@@ -37,7 +37,7 @@
public class TcpTransportFactory extends TransportFactory {
private static final Log log = LogFactory.getLog(TcpTransportFactory.class);
- public TransportServer doBind(String brokerId, final URI location) throws IOException {
+ public TransportServer doBind(String nodeId, final URI location) throws IOException {
try {
Map options = new HashMap(URISupport.parseParamters(location));
Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransport.java?view=auto&rev=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransport.java (added)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransport.java Mon Sep 18 15:07:10 2006
@@ -0,0 +1,256 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.geronimo.openwire.transport.vm;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.openwire.command.Command;
+import org.apache.geronimo.openwire.command.Response;
+import org.apache.geronimo.openwire.thread.Task;
+import org.apache.geronimo.openwire.thread.TaskRunner;
+import org.apache.geronimo.openwire.thread.TaskRunnerFactory;
+import org.apache.geronimo.openwire.transport.FutureResponse;
+import org.apache.geronimo.openwire.transport.ResponseCallback;
+import org.apache.geronimo.openwire.transport.Transport;
+import org.apache.geronimo.openwire.transport.TransportDisposedIOException;
+import org.apache.geronimo.openwire.transport.TransportListener;
+
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
+/**
+ * A Transport implementation that uses direct method invocations.
+ *
+ * @version $Revision$
+ */
+public class VMTransport implements Transport,Task{
+ private static final Log log=LogFactory.getLog(VMTransport.class);
+ private static final AtomicLong nextId=new AtomicLong(0);
+ private static final TaskRunnerFactory taskRunnerFactory=new TaskRunnerFactory("VMTransport",Thread.NORM_PRIORITY,
+ true,1000);
+ protected VMTransport peer;
+ protected TransportListener transportListener;
+ protected boolean disposed;
+ protected boolean marshal;
+ protected boolean network;
+ protected boolean async=false;
+ protected boolean started=false;
+ protected int asyncQueueDepth=2000;
+ protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
+ protected LinkedBlockingQueue messageQueue;
+ protected final URI location;
+ protected final long id;
+ private TaskRunner taskRunner;
+
+ public VMTransport(URI location){
+ this.location=location;
+ this.id=nextId.getAndIncrement();
+ }
+
+ synchronized public VMTransport getPeer(){
+ return peer;
+ }
+
+ synchronized public void setPeer(VMTransport peer){
+ this.peer=peer;
+ }
+
+ public void oneway(Command command) throws IOException{
+ if(disposed){
+ throw new TransportDisposedIOException("Transport disposed.");
+ }
+ if(peer==null)
+ throw new IOException("Peer not connected.");
+ if(!peer.disposed){
+
+ if(async){
+ asyncOneWay(command);
+ }else{
+ syncOneWay(command);
+ }
+ }else{
+ throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
+ }
+ }
+
+ protected void syncOneWay(Command command){
+ final TransportListener tl=peer.transportListener;
+ prePeerSetQueue=peer.prePeerSetQueue;
+ if(tl==null){
+ prePeerSetQueue.add(command);
+ }else{
+ tl.onCommand(command);
+ }
+ }
+
+ protected void asyncOneWay(Command command) throws IOException{
+ messageQueue=getMessageQueue();
+ try{
+ messageQueue.put(command);
+ wakeup();
+ }catch(final InterruptedException e){
+ log.error("messageQueue interupted",e);
+ throw new IOException(e.getMessage());
+ }
+ }
+
+ public FutureResponse asyncRequest(Command command,ResponseCallback responseCallback) throws IOException{
+ throw new AssertionError("Unsupported Method");
+ }
+
+ public Response request(Command command) throws IOException{
+ throw new AssertionError("Unsupported Method");
+ }
+
+ public Response request(Command command,int timeout) throws IOException{
+ throw new AssertionError("Unsupported Method");
+ }
+
+ public synchronized TransportListener getTransportListener(){
+ return transportListener;
+ }
+
+ synchronized public void setTransportListener(TransportListener commandListener){
+ this.transportListener=commandListener;
+ wakeup();
+ peer.wakeup();
+ }
+
+ public synchronized void start() throws Exception{
+ started=true;
+ if(transportListener==null)
+ throw new IOException("TransportListener not set.");
+ if(!async){
+ for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
+ Command command=(Command) iter.next();
+ transportListener.onCommand(command);
+ iter.remove();
+ }
+ }else{
+ wakeup();
+ peer.wakeup();
+ }
+ }
+
+ public void stop() throws Exception{
+ started=false;
+ if(!disposed){
+ disposed=true;
+ }
+ if(taskRunner!=null){
+ taskRunner.shutdown();
+ taskRunner=null;
+ }
+ }
+
+ public Object narrow(Class target){
+ if(target.isAssignableFrom(getClass())){
+ return this;
+ }
+ return null;
+ }
+
+ public boolean isMarshal(){
+ return marshal;
+ }
+
+ public void setMarshal(boolean marshal){
+ this.marshal=marshal;
+ }
+
+ public boolean isNetwork(){
+ return network;
+ }
+
+ public void setNetwork(boolean network){
+ this.network=network;
+ }
+
+ public String toString(){
+ return location+"#"+id;
+ }
+
+ public String getRemoteAddress(){
+ if(peer!=null){
+ return peer.toString();
+ }
+ return null;
+ }
+
+ /**
+ * @see org.apache.activemq.thread.Task#iterate()
+ */
+ public boolean iterate(){
+ final TransportListener tl=peer.transportListener;
+ if(!messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
+ final Command command=(Command) messageQueue.poll();
+ tl.onCommand(command);
+ }
+ return !messageQueue.isEmpty()&&!peer.disposed&&!(peer.transportListener==null);
+ }
+
+ /**
+ * @return the async
+ */
+ public boolean isAsync(){
+ return async;
+ }
+
+ /**
+ * @param async the async to set
+ */
+ public void setAsync(boolean async){
+ this.async=async;
+ }
+
+ /**
+ * @return the asyncQueueDepth
+ */
+ public int getAsyncQueueDepth(){
+ return asyncQueueDepth;
+ }
+
+ /**
+ * @param asyncQueueDepth the asyncQueueDepth to set
+ */
+ public void setAsyncQueueDepth(int asyncQueueDepth){
+ this.asyncQueueDepth=asyncQueueDepth;
+ }
+
+ protected void wakeup(){
+ if(async&&messageQueue!=null&&!messageQueue.isEmpty()){
+ if(taskRunner==null){
+ taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
+ }
+ try{
+ taskRunner.wakeup();
+ }catch(InterruptedException e){
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ protected synchronized LinkedBlockingQueue getMessageQueue(){
+ if(messageQueue==null){
+ messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
+ }
+ return messageQueue;
+ }
+}
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransport.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransport.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransportFactory.java?view=auto&rev=447591
==============================================================================
--- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransportFactory.java (added)
+++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransportFactory.java Mon Sep 18 15:07:10 2006
@@ -0,0 +1,222 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.openwire.transport.vm;
+
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.openwire.transport.MarshallingTransportFilter;
+import org.apache.geronimo.openwire.transport.Transport;
+import org.apache.geronimo.openwire.transport.TransportFactory;
+import org.apache.geronimo.openwire.transport.TransportServer;
+import org.apache.geronimo.openwire.util.IOExceptionSupport;
+import org.apache.geronimo.openwire.util.IntrospectionSupport;
+import org.apache.geronimo.openwire.util.ServiceSupport;
+import org.apache.geronimo.openwire.util.URISupport;
+import org.apache.geronimo.openwire.util.URISupport.CompositeData;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Factory for in-VM transports.
+ * <p>
+ * Bound {@link VMTransportServer} instances are kept in the static {@link #servers} registry keyed by host name
+ * (a missing host is treated as <code>localhost</code>); connecting looks the server up there. The logic that
+ * used to create and tear down an embedded broker on demand is currently commented out, so a server must
+ * already be bound before a client can connect.
+ */
+public class VMTransportFactory extends TransportFactory{
+    private static final Log log = LogFactory.getLog(VMTransportFactory.class);
+    final public static ConcurrentHashMap brokers=new ConcurrentHashMap();
+    final public static ConcurrentHashMap connectors=new ConcurrentHashMap();
+    /** Bound servers keyed by host name. */
+    final public static ConcurrentHashMap servers=new ConcurrentHashMap();
+    //BrokerFactoryHandler brokerFactoryHandler;
+
+    /**
+     * Connects to the server bound for the host named in <code>location</code> and passes the result through
+     * {@link VMTransportServer#configure}.
+     *
+     * @param location a <code>vm:</code> URI
+     * @return the configured client-side transport
+     * @throws Exception if the URI cannot be parsed or no server is bound for the host
+     */
+    public Transport doConnect(URI location) throws Exception{
+        return VMTransportServer.configure(doCompositeConnect(location));
+    }
+
+    /**
+     * Resolves the host and connect options from <code>location</code>, connects to the server bound for that
+     * host and applies the options to the new transport.
+     *
+     * @param location either a composite URI wrapping a single <code>broker:</code> URI, or the simple
+     *            <code>vm://host?option=value</code> form
+     * @return the client-side transport, wrapped in a {@link MarshallingTransportFilter} when the
+     *         <code>marshal</code> option is enabled
+     * @throws IOException if the URI is malformed or no server is bound for the host
+     * @throws IllegalArgumentException if options remain that neither the transport nor the wire format consumed
+     */
+    public Transport doCompositeConnect(URI location) throws Exception{
+        URI brokerURI;
+        String host;
+        Map options;
+        CompositeData data=URISupport.parseComposite(location);
+        if(data.getComponents().length==1&&"broker".equals(data.getComponents()[0].getScheme())){
+            brokerURI=data.getComponents()[0];
+            CompositeData brokerData=URISupport.parseComposite(brokerURI);
+            host=(String) brokerData.getParameters().get("brokerName");
+            if(host==null){
+                host="localhost";
+            }
+            // The broker name travels in the path of the nested broker URI, so read it from the same
+            // object whose path was just tested rather than from the outer composite.
+            if(brokerData.getPath()!=null){
+                host=brokerData.getPath();
+            }
+            options=data.getParameters();
+            location=new URI("vm://"+host);
+        }else{
+            // If using the less complex vm://localhost?broker.persistent=true form
+            try{
+                host=location.getHost();
+                options=URISupport.parseParamters(location);
+                String config=(String) options.remove("brokerConfig");
+                if(config!=null){
+                    brokerURI=new URI(config);
+                }else{
+                    Map brokerOptions=IntrospectionSupport.extractProperties(options,"broker.");
+                    brokerURI=new URI("broker://()/"+host+"?"+URISupport.createQueryString(brokerOptions));
+                }
+            }catch(URISyntaxException e1){
+                throw IOExceptionSupport.create(e1);
+            }
+            location=new URI("vm://"+host);
+        }
+        if (host == null) {
+            host = "localhost";
+        }
+        VMTransportServer server=(VMTransportServer) servers.get(host);
+        // validate the broker is still active
+        /* TODO: quickly hacked out just to have the class, needs to be fixed long term
+        if(!validateBroker(host)||server==null){
+            BrokerService broker=null;
+            // Synchronize on the registry so that multiple concurrent threads
+            // doing this do not think that the broker has not been created and cause multiple
+            // brokers to be started.
+            synchronized( BrokerRegistry.getInstance().getRegistryMutext() ) {
+                broker=BrokerRegistry.getInstance().lookup(host);
+                if(broker==null){
+                    try{
+                        if(brokerFactoryHandler!=null){
+                            broker=brokerFactoryHandler.createBroker(brokerURI);
+                        }else{
+                            broker=BrokerFactory.createBroker(brokerURI);
+                        }
+                        broker.start();
+                    }catch(URISyntaxException e){
+                        throw IOExceptionSupport.create(e);
+                    }
+                    brokers.put(host,broker);
+                }
+
+                server=(VMTransportServer) servers.get(host);
+                if(server==null){
+                    server=(VMTransportServer) bind(location,true);
+                    TransportConnector connector=new TransportConnector(broker.getBroker(),server);
+                    connector.setUri(location);
+                    connector.setTaskRunnerFactory( broker.getTaskRunnerFactory() );
+                    connector.start();
+                    connectors.put(host,connector);
+                }
+
+            }
+        }
+        */
+
+        // With on-demand server creation disabled above, an unbound host would otherwise surface as a bare
+        // NullPointerException on server.connect().
+        if(server==null){
+            throw new IOException("No VMTransportServer bound for host: "+host);
+        }
+        VMTransport vmtransport=server.connect();
+        IntrospectionSupport.setProperties(vmtransport,options);
+        Transport transport=vmtransport;
+        if(vmtransport.isMarshal()){
+            HashMap optionsCopy=new HashMap(options);
+            transport=new MarshallingTransportFilter(transport,createWireFormat(options),createWireFormat(optionsCopy));
+        }
+        if(!options.isEmpty()){
+            throw new IllegalArgumentException("Invalid connect parameters: "+options);
+        }
+        return transport;
+    }
+
+    /**
+     * Binds a new server at <code>location</code>.
+     *
+     * @param brokerId not used by this implementation
+     * @param location the URI whose host names the server
+     * @return the newly bound server
+     * @throws IOException if a server is already bound for that host
+     */
+    public TransportServer doBind(String brokerId,URI location) throws IOException{
+        return bind(location,false);
+    }
+
+    /**
+     * Creates a server for <code>location</code> and registers it under the location's host name.
+     *
+     * @param location the URI whose host names the server; a missing host is registered as
+     *            <code>localhost</code>, matching the lookup in {@link #doCompositeConnect(URI)}
+     * @param dispose passed through to the {@link VMTransportServer} constructor
+     * @return the TransportServer
+     * @throws IOException if a server is already bound for that host
+     */
+    private TransportServer bind(URI location,boolean dispose) throws IOException{
+        String host=location.getHost();
+        if(host==null){
+            // ConcurrentHashMap rejects null keys, and connect falls back to the same default.
+            host="localhost";
+        }
+        log.debug("binding to broker: " + host);
+        VMTransportServer server=new VMTransportServer(location,dispose);
+        // Register atomically so two concurrent binds for one host cannot both succeed.
+        if(servers.putIfAbsent(host,server)!=null){
+            throw new IOException("VMTransportServer already bound at: "+location);
+        }
+        return server;
+    }
+
+    /**
+     * Notification that a server has stopped. Currently a no-op: the clean-up logic is commented out, so the
+     * server stays registered in {@link #servers}.
+     *
+     * @param server the server that stopped
+     */
+    public static void stopped(VMTransportServer server){
+        /*
+        String host=server.getBindURI().getHost();
+        servers.remove(host);
+        TransportConnector connector=(TransportConnector) connectors.remove(host);
+        if(connector!=null){
+            log.debug("Shutting down VM connectors for broker: "  +host);
+            ServiceSupport.dispose(connector);
+            BrokerService broker=(BrokerService) brokers.remove(host);
+            if(broker!=null){
+                ServiceSupport.dispose(broker);
+            }
+        }
+        */
+    }
+
+    /**
+     * Notification that the server for <code>host</code> has stopped. Currently a no-op: the clean-up logic is
+     * commented out, so the server stays registered in {@link #servers}.
+     *
+     * @param host the host name the server was bound under
+     */
+    public static void stopped(String host){
+        /*
+        servers.remove(host);
+        TransportConnector connector=(TransportConnector) connectors.remove(host);
+        if(connector!=null){
+            log.debug("Shutting down VM connectors for broker: "  +host);
+            ServiceSupport.dispose(connector);
+            BrokerService broker=(BrokerService) brokers.remove(host);
+            if(broker!=null){
+                ServiceSupport.dispose(broker);
+            }
+        }
+        */
+    }
+
+/*
+    public BrokerFactoryHandler getBrokerFactoryHandler(){
+        return brokerFactoryHandler;
+    }
+
+    public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler){
+        this.brokerFactoryHandler=brokerFactoryHandler;
+    }
+*/
+    /**
+     * Intended to check that the broker for <code>host</code> is still active. The check itself is commented
+     * out, so this currently always reports <code>true</code>.
+     *
+     * @param host the host name to validate
+     * @return always <code>true</code>
+     */
+    private boolean validateBroker(String host){
+        boolean result=true;
+        /*
+        if(brokers.containsKey(host)||servers.containsKey(host)||connectors.containsKey(host)){
+            // check the broker is still in the BrokerRegistry
+            TransportConnector connector=(TransportConnector) connectors.get(host);
+            if(BrokerRegistry.getInstance().lookup(host)==null||(connector!=null&&connector.getBroker().isStopped())){
+                result=false;
+                // clean-up
+                brokers.remove(host);
+                servers.remove(host);
+                if(connector!=null){
+                    connectors.remove(host);
+                    if(connector!=null){
+                        ServiceSupport.dispose(connector);
+                    }
+                }
+            }
+        }
+        */
+        return result;
+    }
+}
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransportFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransportFactory.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/transport/vm/VMTransportFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain