You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:24:44 UTC
svn commit: r961062 [6/14] - in /activemq/sandbox/activemq-apollo-actor: ./
activemq-all/ activemq-all/src/test/ide-resources/
activemq-all/src/test/java/org/apache/activemq/jaxb/
activemq-all/src/test/java/org/apache/activemq/legacy/ activemq-all/src/...
Added: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala Wed Jul 7 03:24:02 2010
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.ng
+
+import _root_.java.util.{LinkedList, ArrayList}
+import _root_.org.apache.activemq.ng.Stomp
+import java.nio.channels.{SocketChannel}
+import java.nio.ByteBuffer
+import java.io.{EOFException, IOException}
+import _root_.org.apache.activemq.util.buffer._
+import collection.mutable.{ListBuffer, HashMap}
+
+import AsciiBuffer._
+import Stomp._
+import Stomp.Headers._
+
+object StompWireFormat {
+ val READ_BUFFFER_SIZE = 1024*64;
+ val MAX_COMMAND_LENGTH = 1024;
+ val MAX_HEADER_LENGTH = 1024 * 10;
+ val MAX_HEADERS = 1000;
+ val MAX_DATA_LENGTH = 1024 * 1024 * 100;
+ val TRIM=false
+ val SIZE_CHECK=false
+ }
+
+class StompWireFormat {
+ import StompWireFormat._
+
+ implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
+ implicit def wrap(x: Byte) = {
+ ByteBuffer.wrap(Array(x));
+ }
+
+// var outbound_pos=0
+// var outbound_limit=0
+// var outbound_buffers: ListBuffer[ByteBuffer] = new ListBuffer[ByteBuffer]()
+//
+// /**
+// * @retruns true if the source has been drained of StompFrame objects and they are fully written to the socket
+// */
+// def drain_source(socket:SocketChannel)(source: =>StompFrame ):Boolean = {
+// while(true) {
+// // if we have a pending frame that is being sent over the socket...
+// if( !outbound_buffers.isEmpty ) {
+//
+// val data = outbound_buffers.toArray
+//
+// socket.write(data)
+//
+// // remove all the written buffers...
+// while( !outbound_buffers.isEmpty && outbound_buffers.head.remaining==0 ) {
+// outbound_buffers.remove(0)
+// }
+//
+// if( !outbound_buffers.isEmpty ) {
+// // non blocking socket returned before the buffers were fully written to disk..
+// // we are not yet fully drained.. but need to quit now.
+// return false
+// }
+//
+// } else {
+//
+// var frame = source
+// while( frame!=null ) {
+// marshall(outbound_buffers, frame)
+// frame = source
+// }
+//
+// if( outbound_buffers.size == 0 ) {
+// // the source is now drained...
+// return true
+// }
+// }
+// }
+// true
+// }
+//
+// implicit def toByteBuffer(data:AsciiBuffer) = ByteBuffer.wrap(data.data, data.offset, data.length)
+// implicit def toByteBuffer(data:Buffer) = ByteBuffer.wrap(data.data, data.offset, data.length)
+//
+// def marshall(buffer:ListBuffer[ByteBuffer], frame:StompFrame) = {
+// buffer.append(frame.action)
+// buffer.append(NEWLINE)
+//
+// // we can optimize a little if the headers and content are in the same buffer..
+// if( !frame.headers.isEmpty && !frame.content.isEmpty &&
+// ( frame.headers.getFirst._1.data eq frame.content.data ) ) {
+// buffer.append( ByteBuffer.wrap(frame.content.data, frame.headers.getFirst._1.offset, (frame.content.offset-frame.headers.getFirst._1.offset)+ frame.content.length) )
+//
+// } else {
+// val i = frame.headers.iterator
+// while( i.hasNext ) {
+// val (key, value) = i.next
+// buffer.append(key)
+// buffer.append(SEPERATOR)
+// buffer.append(value)
+// buffer.append(NEWLINE)
+// }
+//
+// buffer.append(NEWLINE)
+// buffer.append(toByteBuffer(frame.content))
+// }
+// buffer.append(toByteBuffer(END_OF_FRAME_BUFFER))
+// }
+
+ var outbound_frame: ByteBuffer = null
+ /**
+ * @retruns true if the source has been drained of StompFrame objects and they are fully written to the socket
+ */
+ def drain_source(socket:SocketChannel)(source: =>StompFrame ):Boolean = {
+ while(true) {
+ // if we have a pending frame that is being sent over the socket...
+ if( outbound_frame!=null ) {
+ socket.write(outbound_frame)
+ if( outbound_frame.remaining != 0 ) {
+ // non blocking socket returned before the buffers were fully written to disk..
+ // we are not yet fully drained.. but need to quit now.
+ return false
+ } else {
+ outbound_frame = null
+ }
+ } else {
+
+ // marshall all the available frames..
+ val buffer = new ByteArrayOutputStream()
+ var frame = source
+ while( frame!=null ) {
+ marshall(buffer, frame)
+ frame = source
+ }
+
+
+ if( buffer.size() ==0 ) {
+ // the source is now drained...
+ return true
+ } else {
+ val b = buffer.toBuffer;
+ outbound_frame = ByteBuffer.wrap(b.data, b.offset, b.length)
+ }
+ }
+ }
+ true
+ }
+
+ def marshall(buffer:ByteArrayOutputStream, frame:StompFrame) = {
+ buffer.write(frame.action)
+ buffer.write(NEWLINE)
+
+ // we can optimize a little if the headers and content are in the same buffer..
+ if( !frame.headers.isEmpty && !frame.content.isEmpty &&
+ ( frame.headers.getFirst._1.data eq frame.content.data ) ) {
+
+ val offset = frame.headers.getFirst._1.offset;
+ val buffer1 = frame.headers.getFirst._1;
+ val buffer2 = frame.content;
+ val length = (buffer2.offset-buffer1.offset)+buffer2.length
+ buffer.write( buffer1.data, offset, length)
+
+ } else {
+ val i = frame.headers.iterator
+ while( i.hasNext ) {
+ val (key, value) = i.next
+ buffer.write(key)
+ buffer.write(SEPERATOR)
+ buffer.write(value)
+ buffer.write(NEWLINE)
+ }
+
+ buffer.write(NEWLINE)
+ buffer.write(frame.content)
+ }
+ buffer.write(END_OF_FRAME_BUFFER)
+ }
+
+
+ var read_pos = 0
+ var read_offset = 0
+ var read_data:Array[Byte] = new Array[Byte](READ_BUFFFER_SIZE)
+ var read_bytebuffer:ByteBuffer = ByteBuffer.wrap(read_data)
+
+ def drain_socket(socket:SocketChannel)(handler:(StompFrame)=>Boolean) = {
+ var done = false
+
+ // keep going until the socket buffer is drained.
+ while( !done ) {
+ val frame = unmarshall()
+ if( frame!=null ) {
+ // the handler might want us to stop looping..
+ done = handler(frame)
+ } else {
+
+ // do we need to read in more data???
+ if( read_pos==read_bytebuffer.position ) {
+
+ // do we need a new data buffer to read data into??
+ if(read_bytebuffer.remaining==0) {
+
+ // The capacity needed grows by powers of 2...
+ val new_capacity = if( read_offset != 0 ) { READ_BUFFFER_SIZE } else { read_data.length << 2 }
+ val tmp_buffer = new Array[Byte](new_capacity)
+
+ // If there was un-consummed data.. copy it over...
+ val size = read_pos - read_offset
+ if( size > 0 ) {
+ System.arraycopy(read_data, read_offset, tmp_buffer, 0, size)
+ }
+ read_data = tmp_buffer
+ read_bytebuffer = ByteBuffer.wrap(read_data)
+ read_bytebuffer.position(size)
+ read_offset = 0;
+ read_pos = size
+
+ }
+
+ // Try to fill the buffer with data from the nio socket..
+ var p = read_bytebuffer.position
+ if( socket.read(read_bytebuffer) == -1 ) {
+ throw new EOFException();
+ }
+ // we are done if there was no data on the socket.
+ done = read_bytebuffer.position==p
+ }
+ }
+ }
+ }
+
+
+ type FrameReader = ()=>StompFrame
+ var unmarshall:FrameReader = read_action
+
+ def read_line( maxLength:Int, errorMessage:String):Buffer = {
+ val read_limit = read_bytebuffer.position
+ while( read_pos < read_limit ) {
+ if( read_data(read_pos) =='\n') {
+ var rc = new Buffer(read_data, read_offset, read_pos-read_offset)
+ read_pos += 1;
+ read_offset = read_pos;
+ return rc
+ }
+ if (SIZE_CHECK && read_pos-read_offset > maxLength) {
+ throw new IOException(errorMessage);
+ }
+ read_pos += 1;
+ }
+ return null;
+ }
+
+
+ def read_action:FrameReader = ()=> {
+ val line = read_line(MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
+ if( line !=null ) {
+ var action = line
+ if( TRIM ) {
+ action = action.trim();
+ }
+ if (action.length() > 0) {
+ unmarshall = read_headers(action)
+ }
+ }
+ null
+ }
+
+ type HeaderMap = LinkedList[(AsciiBuffer, AsciiBuffer)]
+
+ def read_headers(action:Buffer, headers:HeaderMap=new LinkedList()):FrameReader = ()=> {
+ val line = read_line(MAX_HEADER_LENGTH, "The maximum header length was exceeded")
+ if( line !=null ) {
+ if( line.trim().length() > 0 ) {
+
+ if (SIZE_CHECK && headers.size > MAX_HEADERS) {
+ throw new IOException("The maximum number of headers was exceeded");
+ }
+
+ try {
+ val seperatorIndex = line.indexOf(SEPERATOR);
+ if( seperatorIndex<0 ) {
+ throw new IOException("Header line missing seperator [" + ascii(line) + "]");
+ }
+ var name = line.slice(0, seperatorIndex);
+ if( TRIM ) {
+ name = name.trim();
+ }
+ var value = line.slice(seperatorIndex + 1, line.length());
+ if( TRIM ) {
+ value = value.trim();
+ }
+ headers.add((ascii(name), ascii(value)));
+ } catch {
+ case e:Exception=>
+ throw new IOException("Unable to parser header line [" + line + "]");
+ }
+
+ } else {
+ val contentLength = get(headers, CONTENT_LENGTH)
+ if (contentLength.isDefined) {
+ // Bless the client, he's telling us how much data to read in.
+ var length=0;
+ try {
+ length = Integer.parseInt(contentLength.get.trim().toString());
+ } catch {
+ case e:NumberFormatException=>
+ throw new IOException("Specified content-length is not a valid integer");
+ }
+
+ if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
+ throw new IOException("The maximum data length was exceeded");
+ }
+ unmarshall = read_binary_body(action, headers, length)
+
+ } else {
+ unmarshall = read_text_body(action, headers)
+ }
+ }
+ }
+ null
+ }
+
+ def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
+ val i = headers.iterator
+ while( i.hasNext ) {
+ val entry = i.next
+ if( entry._1 == name ) {
+ return Some(entry._2)
+ }
+ }
+ None
+ }
+
+
+ def read_binary_body(action:Buffer, headers:HeaderMap, contentLength:Int):FrameReader = ()=> {
+ val content:Buffer=read_content(contentLength)
+ if( content != null ) {
+ unmarshall = read_action
+ new StompFrame(ascii(action), headers, content)
+ } else {
+ null
+ }
+ }
+
+
+ def read_content(contentLength:Int):Buffer = {
+ val read_limit = read_bytebuffer.position
+ if( (read_limit-read_offset) < contentLength+1 ) {
+ read_pos = read_limit;
+ null
+ } else {
+ if( read_data(read_offset+contentLength)!= 0 ) {
+ throw new IOException("Exected null termintor after "+contentLength+" content bytes");
+ }
+ var rc = new Buffer(read_data, read_offset, contentLength)
+ read_pos = read_offset+contentLength+1;
+ read_offset = read_pos;
+ rc;
+ }
+ }
+
+ def read_to_null():Buffer = {
+ val read_limit = read_bytebuffer.position
+ while( read_pos < read_limit ) {
+ if( read_data(read_pos) ==0) {
+ var rc = new Buffer(read_data, read_offset, read_pos-read_offset)
+ read_pos += 1;
+ read_offset = read_pos;
+ return rc;
+ }
+ read_pos += 1;
+ }
+ return null;
+ }
+
+
+ def read_text_body(action:Buffer, headers:HeaderMap):FrameReader = ()=> {
+ val content:Buffer=read_to_null
+ if( content != null ) {
+ unmarshall = read_action
+ new StompFrame(ascii(action), headers, content)
+ } else {
+ null
+ }
+ }
+
+
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml Wed Jul 7 03:24:02 2010
@@ -35,9 +35,10 @@
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>activemq-queue</artifactId>
+ <artifactId>activemq-util</artifactId>
+ <version>6.0-SNAPSHOT</version>
</dependency>
-
+
<!-- Testing Dependencies -->
<dependency>
<groupId>junit</groupId>
Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java (from r946684, activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/QueueDescriptor.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/QueueDescriptor.java&r1=946684&r2=961062&rev=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/QueueDescriptor.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java Wed Jul 7 03:24:02 2010
@@ -14,10 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.queue;
+package org.apache.activemq.broker.store;
import org.apache.activemq.util.buffer.AsciiBuffer;
+
public class QueueDescriptor {
public static final short SHARED = 0;
@@ -73,8 +74,6 @@ public class QueueDescriptor {
}
/**
- * @param type
- * The type of the queue.
*/
public short getApplicationType() {
return applicationType;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java Wed Jul 7 03:24:02 2010
@@ -21,7 +21,7 @@ import java.util.Collection;
import java.util.Iterator;
import org.apache.activemq.Service;
-import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.activemq.util.buffer.Buffer;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Wed Jul 7 03:24:02 2010
@@ -27,11 +27,8 @@ import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.Store.DuplicateKeyException;
-import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.Store.SubscriptionRecord;
-import org.apache.activemq.queue.QueueDescriptor;
import org.apache.activemq.util.Comparators;
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.activemq.util.buffer.Buffer;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java Wed Jul 7 03:24:02 2010
@@ -33,7 +33,7 @@ import org.apache.activemq.broker.store.
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.MetricCounter;
import org.apache.activemq.metric.Period;
-import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.activemq.util.buffer.Buffer;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Wed Jul 7 03:24:02 2010
@@ -31,7 +31,7 @@ import org.apache.activemq.broker.store.
import org.apache.activemq.broker.store.Store.Session;
import org.apache.activemq.broker.store.Store.SubscriptionRecord;
import org.apache.activemq.broker.store.Store.VoidCallback;
-import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.activemq.util.buffer.Buffer;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/pom.xml Wed Jul 7 03:24:02 2010
@@ -34,15 +34,23 @@
<dependencies>
<dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-dispatcher</artifactId>
- </dependency>
- <dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-util</artifactId>
+ <version>6.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.fusesource.hawtdispatch</groupId>
+ <artifactId>hawtdispatch</artifactId>
+ <version>${hawtdispatch-version}</version>
+ </dependency>
+
<!-- Testing Dependencies -->
<dependency>
<groupId>junit</groupId>
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java (from r946684, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java&r1=946684&r2=961062&rev=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java Wed Jul 7 03:24:02 2010
@@ -14,15 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.activemq.dispatch;
+package org.apache.activemq.transport;
/**
- *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public enum DispatchPriority {
- HIGH,
- DEFAULT,
- LOW;
+public interface CompletionCallback {
+ void onCompletion();
+ public void onFailure(Throwable caught);
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompositeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompositeTransport.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompositeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompositeTransport.java Wed Jul 7 03:24:02 2010
@@ -19,6 +19,6 @@ package org.apache.activemq.transport;
import java.net.URI;
public interface CompositeTransport extends Transport {
- void add(URI[] uris);
- void remove(URI[] uris);
+ void add(URI...uris);
+ void remove(URI...uris);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java Wed Jul 7 03:24:02 2010
@@ -41,15 +41,16 @@ public class DefaultTransportListener im
}
/**
- * The transport has suffered an interuption from which it hopes to recover
+ * The transport has been connected.
*/
- public void transportInterupted() {
+ public void onConnected() {
}
/**
- * The transport has resumed after an interuption
+ * The transport has suffered a disconnection from
+ * which it hopes to recover
*/
- public void transportResumed() {
+ public void onDisconnected() {
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/FutureResponse.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/FutureResponse.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/FutureResponse.java Wed Jul 7 03:24:02 2010
@@ -27,10 +27,10 @@ import org.apache.commons.logging.LogFac
public class FutureResponse<T> {
private static final Log LOG = LogFactory.getLog(FutureResponse.class);
- private final ResponseCallback<T> responseCallback;
+ private final RequestCallback<T> responseCallback;
private final ArrayBlockingQueue<T> responseSlot = new ArrayBlockingQueue<T>(1);
- public FutureResponse(ResponseCallback<T> responseCallback) {
+ public FutureResponse(RequestCallback<T> responseCallback) {
this.responseCallback = responseCallback;
}
@@ -57,7 +57,7 @@ public class FutureResponse<T> {
public void set(T result) {
if (responseSlot.offer(result)) {
if (responseCallback != null) {
- responseCallback.onCompletion(this);
+ responseCallback.onCompletion(result);
}
}
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/RequestCallback.java (from r946684, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/RequestCallback.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/RequestCallback.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java&r1=946684&r2=961062&rev=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/RequestCallback.java Wed Jul 7 03:24:02 2010
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.activemq.actor;
+package org.apache.activemq.transport;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public interface IPizzaService {
- public void order(long count);
-}
\ No newline at end of file
+public interface RequestCallback<T> {
+ void onCompletion(T resp);
+ public void onFailure(Throwable caught);
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Wed Jul 7 03:24:02 2010
@@ -21,104 +21,65 @@ import java.net.URI;
import org.apache.activemq.Service;
import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtdispatch.DispatchQueue;
/**
- * Represents the client side of a transport allowing messages to be sent
- * synchronously, asynchronously and consumed.
+ * Represents an abstract connection. It can be a client side or server side connection.
*
- * @version $Revision: 1.5 $
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public interface Transport extends Service {
+ @Deprecated
+ void oneway(Object command);
+
/**
- * A one way asynchronous send
+ * A one way asynchronous send. Once the command is transmitted the callback
+ * is invoked.
*
* @param command
+ * @param callback
* @throws IOException
*/
- void oneway(Object command) throws IOException;
+ void oneway(Object command, CompletionCallback callback);
/**
- * An asynchronous request response where the Receipt will be returned in
- * the future. If responseCallback is not null, then it will be called when
- * the response has been completed.
- *
- * @param command
- * @param responseCallback TODO
- * @return the FutureResponse
- * @throws IOException
+ * Returns the current transport listener
+ *
+ * @return
*/
- <T> FutureResponse<T> asyncRequest(Object command, ResponseCallback<T> responseCallback) throws IOException;
+ TransportListener getTransportListener();
/**
- * A synchronous request response
- *
- * @param command
- * @return the response
- * @throws IOException
+ * Registers an inbound command listener
+ *
+ * @param commandListener
*/
- Object request(Object command) throws IOException;
+ void setTransportListener(TransportListener commandListener);
/**
- * A synchronous request response
- *
- * @param command
- * @param timeout
- * @return the repsonse or null if timeout
- * @throws IOException
+ * Returns the dispatch queue used by the transport
+ *
+ * @return
*/
- Object request(Object command, int timeout) throws IOException;
+ DispatchQueue getDispatchQueue();
- // /**
- // * A one way asynchronous send
- // * @param command
- // * @throws IOException
- // */
- // void oneway(Command command) throws IOException;
- //
- // /**
- // * An asynchronous request response where the Receipt will be returned
- // * in the future. If responseCallback is not null, then it will be called
- // * when the response has been completed.
- // *
- // * @param command
- // * @param responseCallback TODO
- // * @return the FutureResponse
- // * @throws IOException
- // */
- // FutureResponse asyncRequest(Command command, ResponseCallback
- // responseCallback) throws IOException;
- //
- // /**
- // * A synchronous request response
- // * @param command
- // * @return the response
- // * @throws IOException
- // */
- // Response request(Command command) throws IOException;
- //
- // /**
- // * A synchronous request response
- // * @param command
- // * @param timeout
- // * @return the repsonse or null if timeout
- // * @throws IOException
- // */
- // Response request(Command command, int timeout) throws IOException;
+ /**
+ * Sets the dispatch queue used by the transport
+ *
+ * @param queue
+ */
+ void setDispatchQueue(DispatchQueue queue);
/**
- * Returns the current transport listener
- *
- * @return
+ * suspend delivery of commands.
*/
- TransportListener getTransportListener();
+ void suspend();
/**
- * Registers an inbound command listener
- *
- * @param commandListener
+ * resume delivery of commands.
*/
- void setTransportListener(TransportListener commandListener);
+ void resume();
/**
* @param target
@@ -137,16 +98,7 @@ public interface Transport extends Servi
* @return true if fault tolerant
*/
boolean isFaultTolerant();
-
- /**
- * Indicates that the transport needs inactivity monitoring. This
- * is true for transports like tcp that may not otherwise detect
- * a transport failure in a timely fashion.
- *
- * @return true if the transport requires inactivity monitoring.
- */
- boolean isUseInactivityMonitor();
-
+
/**
* @return true if the transport is disposed
*/
@@ -161,12 +113,14 @@ public interface Transport extends Servi
* @return The wireformat for the connection.
*/
WireFormat getWireformat();
-
+
+ void setWireformat(WireFormat wireformat);
+
/**
* reconnect to another location
* @param uri
* @throws IOException on failure of if not supported
*/
- void reconnect(URI uri) throws IOException;
+ void reconnect(URI uri, CompletionCallback callback);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java Wed Jul 7 03:24:02 2010
@@ -16,191 +16,40 @@
*/
package org.apache.activemq.transport;
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.fusesource.hawtdispatch.DispatchQueue;
+
import java.io.IOException;
-import java.net.MalformedURLException;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
-import org.apache.activemq.util.FactoryFinder;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.URISupport;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.wireformat.WireFormatFactory;
-public abstract class TransportFactory {
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class TransportFactory {
private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
- private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
- private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
-
- private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
- private static final String THREAD_NAME_FILTER = "threadName";
-
- public abstract TransportServer doBind(URI location) throws IOException;
-
- public Transport doConnect(URI location, Executor ex) throws Exception {
- return doConnect(location);
- }
-
- public Transport doCompositeConnect(URI location, Executor ex) throws Exception {
- return doCompositeConnect(location);
- }
-
- /**
- * Creates a normal transport.
- *
- * @param location
- * @return the transport
- * @throws Exception
- */
- public static Transport connect(URI location) throws Exception {
- TransportFactory tf = findTransportFactory(location);
- return tf.doConnect(location);
- }
-
- /**
- * Creates a normal transport.
- *
- * @param location
- * @param ex
- * @return the transport
- * @throws Exception
- */
- public static Transport connect(URI location, Executor ex) throws Exception {
- TransportFactory tf = findTransportFactory(location);
- return tf.doConnect(location, ex);
- }
-
- /**
- * Creates a slimmed down transport that is more efficient so that it can be
- * used by composite transports like reliable and HA.
- *
- * @param location
- * @return the Transport
- * @throws Exception
- */
- public static Transport compositeConnect(URI location) throws Exception {
- TransportFactory tf = findTransportFactory(location);
- return tf.doCompositeConnect(location);
- }
-
- /**
- * Creates a slimmed down transport that is more efficient so that it can be
- * used by composite transports like reliable and HA.
- *
- * @param location
- * @param ex
- * @return the Transport
- * @throws Exception
- */
- public static Transport compositeConnect(URI location, Executor ex) throws Exception {
- TransportFactory tf = findTransportFactory(location);
- return tf.doCompositeConnect(location, ex);
- }
-
- public static TransportServer bind(URI location) throws IOException {
- TransportFactory tf = findTransportFactory(location);
- return tf.doBind(location);
- }
-
- /**
- * @deprecated
- */
- public static TransportServer bind(String brokerId, URI location) throws IOException {
- return bind(location);
- }
-
-// public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
-// TransportFactory tf = findTransportFactory(location);
-// if( brokerService!=null && tf instanceof BrokerServiceAware ) {
-// ((BrokerServiceAware)tf).setBrokerService(brokerService);
-// }
-// try {
-// if( brokerService!=null ) {
-// SslContext.setCurrentSslContext(brokerService.getSslContext());
-// }
-// return tf.doBind(location);
-// } finally {
-// SslContext.setCurrentSslContext(null);
-// }
-// }
-
- public Transport doConnect(URI location) throws Exception {
- try {
- Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
- WireFormat wf = createWireFormat(options);
- Transport transport = createTransport(location, wf);
- Transport rc = configure(transport, wf, options);
- if (!options.isEmpty()) {
- // Release the transport resource as we are erroring out...
- try {
- rc.stop();
- } catch (Throwable cleanup) {
- }
- throw new IllegalArgumentException("Invalid connect parameters: " + options);
- }
- return rc;
- } catch (URISyntaxException e) {
- throw IOExceptionSupport.create(e);
- }
- }
+ private static final ConcurrentHashMap<String, TransportFactorySPI> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactorySPI>();
- public Transport doCompositeConnect(URI location) throws Exception {
- try {
- Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
- WireFormat wf = createWireFormat(options);
- Transport transport = createTransport(location, wf);
- Transport rc = compositeConfigure(transport, wf, options);
- if (!options.isEmpty()) {
- throw new IllegalArgumentException("Invalid connect parameters: " + options);
- }
- return rc;
-
- } catch (URISyntaxException e) {
- throw IOExceptionSupport.create(e);
- }
+ public interface TransportFactorySPI {
+ public TransportServer bind(URI location) throws Exception;
+ public Transport connect(URI location) throws Exception;
}
- /**
- * Allow registration of a transport factory without wiring via META-INF classes
- * @param scheme
- * @param tf
- */
- public static void registerTransportFactory(String scheme, TransportFactory tf) {
- TRANSPORT_FACTORYS.put(scheme, tf);
- }
-
/**
- * Factory method to create a new transport
- *
- * @throws IOException
- * @throws UnknownHostException
*/
- protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException {
- throw new IOException("createTransport() method not implemented!");
- }
-
- /**
- * @param location
- * @return
- * @throws IOException
- */
- private static TransportFactory findTransportFactory(URI location) throws IOException {
+ private static TransportFactorySPI factory(URI location) throws IOException {
String scheme = location.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + location + "]");
}
- TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
+ TransportFactorySPI tf = TRANSPORT_FACTORYS.get(scheme);
if (tf == null) {
// Try to load if from a META-INF property.
try {
- tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
+ tf = (TransportFactorySPI)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
TRANSPORT_FACTORYS.put(scheme, tf);
} catch (Throwable e) {
throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
@@ -209,94 +58,29 @@ public abstract class TransportFactory {
return tf;
}
- protected WireFormat createWireFormat(Map<String, String> options) throws IOException {
- WireFormatFactory factory = createWireFormatFactory(options);
- if( factory == null ) {
- return null;
- }
- WireFormat format = factory.createWireFormat();
- return format;
- }
-
- protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
- String wireFormat = (String)options.remove("wireFormat");
- if (wireFormat == null) {
- wireFormat = getDefaultWireFormatType();
- }
- if( "null".equals(wireFormat) ) {
- return null;
- }
-
- try {
- WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
- IntrospectionSupport.setProperties(wff, options, "wireFormat.");
- return wff;
- } catch (Throwable e) {
- throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
- }
- }
- protected String getDefaultWireFormatType() {
- return "default";
- }
-
- /**
- * Fully configures and adds all need transport filters so that the
- * transport can be used by the JMS client.
- *
- * @param transport
- * @param wf
- * @param options
- * @return
- * @throws Exception
+ /**
+ * Allow registration of a transport factory without wiring via META-INF classes
+ * @param scheme
+ * @param tf
*/
- public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
- transport = compositeConfigure(transport, wf, options);
-
- transport = new MutexTransport(transport);
-
- return transport;
+ public static void registerTransportFactory(String scheme, TransportFactorySPI tf) {
+ TRANSPORT_FACTORYS.put(scheme, tf);
}
/**
- * Fully configures and adds all need transport filters so that the
- * transport can be used by the ActiveMQ message broker. The main difference
- * between this and the configure() method is that the broker does not issue
- * requests to the client so the ResponseCorrelator is not needed.
- *
- * @param transport
- * @param format
- * @param options
- * @return
- * @throws Exception
+ * Creates a client transport.
*/
- public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
- if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
- transport = new WriteTimeoutFilter(transport);
- String soWriteTimeout = (String)options.get(WRITE_TIMEOUT_FILTER);
- if (soWriteTimeout!=null) ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
- }
- if (options.containsKey(THREAD_NAME_FILTER)) {
- transport = new ThreadNameFilter(transport);
- }
- transport = compositeConfigure(transport, format, options);
- transport = new MutexTransport(transport);
- return transport;
+ public static Transport connect(URI location) throws Exception {
+ return factory(location).connect(location);
}
/**
- * Similar to configure(...) but this avoid adding in the MutexTransport and
- * ResponseCorrelator transport layers so that the resulting transport can
- * more efficiently be used as part of a composite transport.
- *
- * @param transport
- * @param format
- * @param options
- * @return
+ * Creates a transport server.
*/
- public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
- IntrospectionSupport.setProperties(transport, options);
- return transport;
+ public static TransportServer bind(URI location) throws Exception {
+ return factory(location).bind(location);
}
+
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java Wed Jul 7 03:24:02 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class TransportFactorySupport{
+
+ private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
+
+ private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
+ private static final String THREAD_NAME_FILTER = "threadName";
+
+ static public Transport configure(Transport transport, Map<String, String> options) throws IOException {
+ WireFormat wf = createWireFormat(options);
+ transport.setWireformat(wf);
+ IntrospectionSupport.setProperties(transport, options);
+ return transport;
+ }
+
+ public static Transport verify(Transport transport, Map<String, String> options) {
+ if (!options.isEmpty()) {
+ // Release the transport resource as we are erroring out...
+ try {
+ transport.stop();
+ } catch (Throwable cleanup) {
+ }
+ throw new IllegalArgumentException("Invalid connect parameters: " + options);
+ }
+ return transport;
+ }
+
+ static public WireFormat createWireFormat(Map<String, String> options) throws IOException {
+ WireFormatFactory factory = createWireFormatFactory(options);
+ if( factory == null ) {
+ return null;
+ }
+ WireFormat format = factory.createWireFormat();
+ return format;
+ }
+
+ static public WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
+ String wireFormat = (String)options.remove("wireFormat");
+ if (wireFormat == null) {
+ wireFormat = getDefaultWireFormatType();
+ }
+ if( "null".equals(wireFormat) ) {
+ return null;
+ }
+
+ try {
+ WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
+ IntrospectionSupport.setProperties(wff, options, "wireFormat.");
+ return wff;
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
+ }
+ }
+
+ static protected String getDefaultWireFormatType() {
+ return "default";
+ }
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul 7 03:24:02 2010
@@ -20,31 +20,60 @@ import java.io.IOException;
import java.net.URI;
import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtdispatch.DispatchQueue;
/**
* @version $Revision: 1.5 $
*/
public class TransportFilter implements TransportListener, Transport {
- protected final Transport next;
+
+ protected Transport next;
protected TransportListener transportListener;
public TransportFilter(Transport next) {
this.next = next;
}
+ /**
+ * @return Returns the next transport.
+ */
+ public Transport getNext() {
+ return next;
+ }
+
+ public void setNext(Transport next) {
+ this.next = next;
+ }
+
public TransportListener getTransportListener() {
return transportListener;
}
- public void setTransportListener(TransportListener channelListener) {
- this.transportListener = channelListener;
- if (channelListener == null) {
+ public void setTransportListener(TransportListener listener) {
+ this.transportListener = listener;
+ if (listener == null) {
next.setTransportListener(null);
} else {
next.setTransportListener(this);
}
}
+ public DispatchQueue getDispatchQueue() {
+ return next.getDispatchQueue();
+ }
+
+ public void setDispatchQueue(DispatchQueue queue) {
+ next.setDispatchQueue(queue);
+ }
+
+ public void suspend() {
+ next.suspend();
+ }
+
+ public void resume() {
+ next.resume();
+ }
+
/**
* @see org.apache.activemq.Service#start()
* @throws IOException
@@ -52,7 +81,7 @@ public class TransportFilter implements
*/
public void start() throws Exception {
if (next == null) {
- throw new IOException("The next channel has not been set.");
+ throw new IOException("The next transport has not been set.");
}
if (transportListener == null) {
throw new IOException("The command listener has not been set.");
@@ -71,43 +100,31 @@ public class TransportFilter implements
transportListener.onCommand(command);
}
- /**
- * @return Returns the next.
- */
- public Transport getNext() {
- return next;
- }
public String toString() {
return next.toString();
}
- public void oneway(Object command) throws IOException {
- next.oneway(command);
+ @Deprecated
+ public void oneway(Object command) {
+ oneway(command, null);
}
- public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
- return next.asyncRequest(command, null);
+ public void oneway(Object command, CompletionCallback callback) {
+ next.oneway(command, callback);
}
- public Object request(Object command) throws IOException {
- return next.request(command);
- }
-
- public Object request(Object command, int timeout) throws IOException {
- return next.request(command, timeout);
- }
public void onException(IOException error) {
transportListener.onException(error);
}
- public void transportInterupted() {
- transportListener.transportInterupted();
+ public void onDisconnected() {
+ transportListener.onDisconnected();
}
- public void transportResumed() {
- transportListener.transportResumed();
+ public void onConnected() {
+ transportListener.onConnected();
}
public <T> T narrow(Class<T> target) {
@@ -137,15 +154,16 @@ public class TransportFilter implements
return next.isConnected();
}
- public void reconnect(URI uri) throws IOException {
- next.reconnect(uri);
- }
-
- public boolean isUseInactivityMonitor() {
- return next.isUseInactivityMonitor();
+ public void reconnect(URI uri, CompletionCallback callback) {
+ next.reconnect(uri, callback);
}
public WireFormat getWireformat() {
return next.getWireformat();
}
+ public void setWireformat(WireFormat wireformat) {
+ next.setWireformat(wireformat);
+ }
+
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java Wed Jul 7 03:24:02 2010
@@ -37,16 +37,14 @@ public interface TransportListener {
void onException(IOException error);
/**
- * The transport has suffered an interuption from which it hopes to recover
- *
+ * The transport has been connected.
*/
- void transportInterupted();
-
-
+ public void onConnected();
+
/**
- * The transport has resumed after an interuption
- *
+ * The transport has suffered a disconnection from
+ * which it hopes to recover
*/
- void transportResumed();
-
+ public void onDisconnected();
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java Wed Jul 7 03:24:02 2010
@@ -20,6 +20,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.activemq.Service;
+import org.fusesource.hawtdispatch.DispatchQueue;
/**
* A TransportServer asynchronously accepts {@see Transport} objects and then
@@ -46,4 +47,28 @@ public interface TransportServer extends
*/
InetSocketAddress getSocketAddress();
+ /**
+ * Returns the dispatch queue used by the transport
+ *
+ * @return
+ */
+ DispatchQueue getDispatchQueue();
+
+ /**
+ * Sets the dispatch queue used by the transport
+ *
+ * @param queue
+ */
+ void setDispatchQueue(DispatchQueue queue);
+
+ /**
+ * suspend accepting new transports
+ */
+ void suspend();
+
+ /**
+ * resume accepting new transports
+ */
+ void resume();
+
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul 7 03:24:02 2010
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.pipe;
+
+import org.apache.activemq.transport.CompletionCallback;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtdispatch.CustomDispatchSource;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.EventAggregators;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class PipeTransport implements Transport {
+ static private final Object EOF_TOKEN = new Object();
+
+ final private PipeTransportServer server;
+ PipeTransport peer;
+ private TransportListener listener;
+ private String remoteAddress;
+ private AtomicBoolean stopping = new AtomicBoolean();
+ private String name;
+ private WireFormat wireformat;
+ private boolean marshal;
+ private boolean trace;
+
+ private DispatchQueue dispatchQueue;
+ private CustomDispatchSource<Object,LinkedList<Object>> dispatchSource;
+ private boolean connected;
+
+ public PipeTransport(PipeTransportServer server) {
+ this.server = server;
+ }
+
+ public DispatchQueue getDispatchQueue() {
+ return dispatchQueue;
+ }
+ public void setDispatchQueue(DispatchQueue queue) {
+ if( dispatchQueue!=null ) {
+ dispatchQueue.release();
+ }
+ this.dispatchQueue = queue;
+ if( dispatchQueue!=null ) {
+ dispatchQueue.retain();
+ }
+ }
+
+ public void start() throws Exception {
+ if (dispatchQueue == null) {
+ throw new IllegalArgumentException("dispatchQueue is not set");
+ }
+ server.dispatchQueue.dispatchAsync(new Runnable(){
+ public void run() {
+ dispatchSource = Dispatch.createSource(EventAggregators.linkedList(), dispatchQueue);
+ dispatchSource.setEventHandler(new Runnable(){
+ public void run() {
+ try {
+ final LinkedList<Object> commands = dispatchSource.getData();
+ for (Object o : commands) {
+
+ if(o == EOF_TOKEN) {
+ throw new EOFException();
+ }
+
+ if (wireformat != null && marshal) {
+ listener.onCommand(wireformat.unmarshal((Buffer) o));
+ } else {
+ listener.onCommand(o);
+ }
+ }
+
+ // let the peer know that they have been processed.
+ peer.dispatchQueue.dispatchAsync(new Runnable() {
+ public void run() {
+ outbound -= commands.size();
+ drainInbound();
+ }
+ });
+ } catch (IOException e) {
+ listener.onException(e);
+ }
+
+ }
+ });
+ if( peer.dispatchSource != null ) {
+ fireConnected();
+ peer.fireConnected();
+ }
+ }
+ });
+ }
+
+ private void fireConnected() {
+ dispatchQueue.dispatchAsync(new Runnable() {
+ public void run() {
+ connected = true;
+ dispatchSource.resume();
+ listener.onConnected();
+ drainInbound();
+ }
+ });
+ }
+
+ public void stop() throws Exception {
+ if( connected ) {
+ peer.dispatchSource.merge(EOF_TOKEN);
+ }
+ if( dispatchSource!=null ) {
+ dispatchSource.release();
+ dispatchSource = null;
+ }
+ setDispatchQueue(null);
+ }
+
+ static final class OneWay {
+ final Object command;
+ final CompletionCallback callback;
+
+ public OneWay(Object command, CompletionCallback callback) {
+ this.callback = callback;
+ this.command = command;
+ }
+ }
+
+ final LinkedList<OneWay> inbound = new LinkedList<OneWay>();
+ int outbound = 0;
+ int maxOutbound = 100;
+
+ @Deprecated
+ public void oneway(Object command) {
+ oneway(command, null);
+ }
+
+ public void oneway(Object command, CompletionCallback callback) {
+ if( !connected ) {
+ throw new IllegalStateException("Not connected.");
+ }
+ if( outbound < maxOutbound ) {
+ transmit(command, callback);
+ } else {
+ inbound.add(new OneWay(command, callback));
+ }
+ }
+
+ private void drainInbound() {
+ while( outbound < maxOutbound && !inbound.isEmpty() ) {
+ OneWay oneWay = inbound.poll();
+ transmit(oneWay.command, oneWay.callback);
+ }
+ }
+
+ private void transmit(Object command, CompletionCallback callback) {
+ outbound++;
+ peer.dispatchSource.merge(command);
+ if( callback!=null ) {
+ callback.onCompletion();
+ }
+ }
+
+ public String getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ public <T> T narrow(Class<T> target) {
+ if (target.isAssignableFrom(getClass())) {
+ return target.cast(this);
+ }
+ return null;
+ }
+
+ public void suspend() {
+ dispatchSource.suspend();
+ }
+
+ public void resume() {
+ dispatchSource.resume();
+ }
+ public void reconnect(URI uri, CompletionCallback callback) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void setRemoteAddress(String remoteAddress) {
+ this.remoteAddress = remoteAddress;
+ if (name == null) {
+ name = remoteAddress;
+ }
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public TransportListener getTransportListener() {
+ return listener;
+ }
+ public void setTransportListener(TransportListener listener) {
+ this.listener = listener;
+ }
+
+ public WireFormat getWireformat() {
+ return wireformat;
+ }
+ public void setWireformat(WireFormat wireformat) {
+ this.wireformat = wireformat;
+ }
+
+
+ public boolean isTrace() {
+ return trace;
+ }
+
+ public void setTrace(boolean trace) {
+ this.trace = trace;
+ }
+
+ public boolean isMarshal() {
+ return marshal;
+ }
+ public void setMarshal(boolean marshall) {
+ this.marshal = marshall;
+ }
+
+ public boolean isConnected() {
+ return !stopping.get();
+ }
+ public boolean isDisposed() {
+ return false;
+ }
+ public boolean isFaultTolerant() {
+ return false;
+ }
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Wed Jul 7 03:24:02 2010
@@ -1,378 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.transport.pipe;
-import java.io.EOFException;
import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
-import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.dispatch.DispatchPriority;
-import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.Dispatcher;
-import org.apache.activemq.dispatch.internal.RunnableCountDownLatch;
-import org.apache.activemq.transport.DispatchableTransport;
-import org.apache.activemq.transport.FutureResponse;
-import org.apache.activemq.transport.ResponseCallback;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportAcceptListener;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.pipe.Pipe.ReadReadyListener;
-import org.apache.activemq.util.IOExceptionSupport;
+
+import static org.apache.activemq.transport.TransportFactorySupport.*;
+import org.apache.activemq.transport.*;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
-import org.apache.activemq.util.buffer.Buffer;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.wireformat.WireFormatFactory;
-
-import static org.apache.activemq.dispatch.DispatchOption.*;
-public class PipeTransportFactory extends TransportFactory {
- static private final Object EOF_TOKEN = new Object();
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class PipeTransportFactory implements TransportFactory.TransportFactorySPI {
static protected final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
- protected static class PipeTransport implements DispatchableTransport, Runnable, ReadReadyListener<Object> {
-
- private final Pipe<Object> pipe;
- private TransportListener listener;
- private String remoteAddress;
- private AtomicBoolean stopping = new AtomicBoolean();
- private Thread thread;
- private String name;
- private WireFormat wireFormat;
- private boolean marshal;
- private boolean trace;
- private DispatchQueue dispatchQueue;
- private Runnable dispatchTask;
-
- public PipeTransport(Pipe<Object> pipe) {
- this.pipe = pipe;
- }
-
- public void start() throws Exception {
- if (dispatchQueue != null) {
- pipe.setMode(Pipe.ASYNC);
- dispatchQueue.dispatchAsync(dispatchTask);
- } else {
- thread = new Thread(this, getRemoteAddress());
- thread.start();
- }
- }
-
- public void stop() throws Exception {
- pipe.write(EOF_TOKEN);
- if (dispatchQueue != null) {
- RunnableCountDownLatch done = new RunnableCountDownLatch(1);
- dispatchQueue.addShutdownWatcher(done);
- dispatchQueue.release();
- done.await();
- } else {
- stopping.set(true);
- if( thread!=null ) {
- thread.join();
- }
- }
- }
-
- public void setDispatcher(Dispatcher dispatcher) {
- dispatchQueue = dispatcher.createSerialQueue(name, STICK_TO_CALLER_THREAD);
- dispatchTask = new Runnable(){
- public void run() {
- dispatch();
- }
- };
- }
-
- public void onReadReady(Pipe<Object> pipe) {
- if (dispatchQueue != null) {
- dispatchQueue.dispatchAsync(dispatchTask);
- }
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public void oneway(Object command) throws IOException {
-
- try {
- if (wireFormat != null && marshal) {
- pipe.write(wireFormat.marshal(command));
- } else {
- pipe.write(command);
- }
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- /*
- * try { while( !stopping.get() ) { if( pipe.offer(command, 500,
- * TimeUnit.MILLISECONDS) ) { break; } } } catch
- * (InterruptedException e) { throw new InterruptedIOException(); }
- */
- }
+ public TransportServer bind(URI uri) throws URISyntaxException, IOException {
- public void dispatch() {
- while (true) {
- try {
- Object o = pipe.poll();
- if (o == null) {
- pipe.setReadReadyListener(this);
- return;
- } else {
- if(o == EOF_TOKEN) {
- throw new EOFException();
- }
- if (wireFormat != null && marshal) {
- listener.onCommand(wireFormat.unmarshal((Buffer) o));
- } else {
- listener.onCommand(o);
- }
- dispatchQueue.dispatchAsync(dispatchTask);
- return;
- }
- } catch (IOException e) {
- listener.onException(e);
- }
+ Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
+ String node = uri.getHost();
+ synchronized(servers) {
+ if (servers.containsKey(node)) {
+ throw new IOException("Server already bound: " + node);
}
- }
+ PipeTransportServer server = new PipeTransportServer();
+ server.setConnectURI(uri);
+ server.setName(node);
+ server.setWireFormatFactory(TransportFactorySupport.createWireFormatFactory(options));
+ IntrospectionSupport.setProperties(server, options);
- public void run() {
-
- try {
- while (!stopping.get()) {
- Object value = pipe.poll(500, TimeUnit.MILLISECONDS);
- if (value != null) {
- if(value == EOF_TOKEN) {
- throw new EOFException();
- } else {
- if (wireFormat != null && marshal) {
- listener.onCommand(wireFormat.unmarshal((Buffer)value));
- } else {
- listener.onCommand(value);
- }
- }
- }
- }
- } catch (IOException e) {
- listener.onException(e);
- } catch (InterruptedException e) {
+ if (!options.isEmpty()) {
+ throw new IllegalArgumentException("Invalid bind parameters: " + options);
}
+ servers.put(node, server);
+ return server;
}
- public String getRemoteAddress() {
- return remoteAddress;
- }
-
- public TransportListener getTransportListener() {
- return listener;
- }
-
- public boolean isConnected() {
- return !stopping.get();
- }
-
- public boolean isDisposed() {
- return false;
- }
-
- public boolean isFaultTolerant() {
- return false;
- }
-
- public boolean isUseInactivityMonitor() {
- return false;
- }
-
- public <T> T narrow(Class<T> target) {
- if (target.isAssignableFrom(getClass())) {
- return target.cast(this);
- }
- return null;
- }
-
- public void reconnect(URI uri) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- public Object request(Object command) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- public Object request(Object command, int timeout) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- public void setTransportListener(TransportListener listener) {
- this.listener = listener;
- }
-
- public void setRemoteAddress(String remoteAddress) {
- this.remoteAddress = remoteAddress;
- if (name == null) {
- name = remoteAddress;
- }
- }
-
- public void setWireFormat(WireFormat wireFormat) {
- this.wireFormat = wireFormat;
- }
-
- public void setDispatchPriority(DispatchPriority priority) {
-// TODO:
-// readContext.updatePriority(priority);
- }
-
- public WireFormat getWireformat() {
- return wireFormat;
- }
-
- public boolean isTrace() {
- return trace;
- }
-
- public void setTrace(boolean trace) {
- this.trace = trace;
- }
-
- public boolean isMarshal() {
- return marshal;
- }
-
- public void setMarshal(boolean marshall) {
- this.marshal = marshall;
- }
}
- protected class PipeTransportServer implements TransportServer {
- protected URI connectURI;
- protected TransportAcceptListener listener;
- protected String name;
- protected WireFormatFactory wireFormatFactory;
- protected boolean marshal;
- protected final AtomicInteger connectionCounter = new AtomicInteger();
-
- public URI getConnectURI() {
- return connectURI;
- }
-
- public InetSocketAddress getSocketAddress() {
- return null;
- }
-
- public void setAcceptListener(TransportAcceptListener listener) {
- this.listener = listener;
- }
-
- public void start() throws Exception {
- }
-
- public void stop() throws Exception {
- unbind(this);
- }
-
- public void setConnectURI(URI connectURI) {
- this.connectURI = connectURI;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-
- public PipeTransport connect() {
- int connectionId = connectionCounter.incrementAndGet();
- String remoteAddress = connectURI.toString() + "#" + connectionId;
- assert listener != null : "Server does not have an accept listener";
- Pipe<Object> pipe = new Pipe<Object>();
- PipeTransport rc = createClientTransport(pipe);
- rc.setRemoteAddress(remoteAddress);
- PipeTransport serverSide = cerateServerTransport(pipe);
- serverSide.setMarshal(marshal);
- serverSide.setRemoteAddress(remoteAddress);
- if (wireFormatFactory != null) {
- rc.setWireFormat(wireFormatFactory.createWireFormat());
- serverSide.setWireFormat(wireFormatFactory.createWireFormat());
+ public Transport connect(URI location) throws IOException, URISyntaxException {
+ String name = location.getHost();
+ synchronized(servers) {
+ PipeTransportServer server = lookup(name);
+ if (server == null) {
+ throw new IOException("Server is not bound: " + name);
}
- listener.onAccept(serverSide);
- return rc;
- }
-
- protected PipeTransport createClientTransport(Pipe<Object> pipe) {
- return new PipeTransport(pipe);
- }
-
- protected PipeTransport cerateServerTransport(Pipe<Object> pipe) {
- return new PipeTransport(pipe.connect());
- }
-
- public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
- this.wireFormatFactory = wireFormatFactory;
- }
+ PipeTransport transport = server.connect();
- public boolean isMarshal() {
- return marshal;
- }
-
- public void setMarshal(boolean marshal) {
- this.marshal = marshal;
- }
- }
-
- @Override
- public TransportServer doBind(URI uri) throws IOException {
- try {
- Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
- String node = uri.getHost();
- synchronized(servers) {
- if (servers.containsKey(node)) {
- throw new IOException("Server already bound: " + node);
- }
- PipeTransportServer server = createTransportServer();
- server.setConnectURI(uri);
- server.setName(node);
- server.setWireFormatFactory(createWireFormatFactory(options));
- IntrospectionSupport.setProperties(server, options);
-
- if (!options.isEmpty()) {
- throw new IllegalArgumentException("Invalid bind parameters: " + options);
- }
-
-
- servers.put(node, server);
- return server;
- }
- } catch (URISyntaxException e) {
- throw IOExceptionSupport.create(e);
+ Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
+ return verify( configure(transport, options), options);
}
}
- protected PipeTransportServer createTransportServer() {
- return new PipeTransportServer();
- }
-
-
static public PipeTransportServer lookup(String name) {
synchronized(servers) {
return servers.get(name);
}
}
-
+
static public Map<String, PipeTransportServer> getServers() {
synchronized(servers) {
return new HashMap<String, PipeTransportServer>(servers);
@@ -384,47 +89,4 @@ public class PipeTransportFactory extend
servers.remove(server.getName());
}
}
-
- public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-
- PipeTransport pipeTransport = (PipeTransport)transport.narrow(PipeTransport.class);
- IntrospectionSupport.setProperties(pipeTransport, options);
-
- if (pipeTransport.isTrace()) {
- throw new UnsupportedOperationException("Trace not implemented");
-// try {
-// transport = TransportLoggerFactory.getInstance().createTransportLogger(transport, pipeTransport.getLogWriterName(),
-// pipeTransport.isDynamicManagement(), pipeTransport.isStartLogging(), pipeTransport.getJmxPort());
-// } catch (Throwable e) {
-// LOG.error("Could not create TransportLogger object for: " + pipeTransport.getLogWriterName() + ", reason: " + e, e);
-// }
- }
-
- transport = format.createTransportFilters(transport, options);
- return transport;
- }
-
- protected String getOption(Map options, String key, String def) {
- String rc = (String) options.remove(key);
- if( rc == null ) {
- rc = def;
- }
- return rc;
- }
-
- protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
- String name = location.getHost();
- synchronized(servers) {
- PipeTransportServer server = lookup(name);
- if (server == null) {
- throw new IOException("Server is not bound: " + name);
- }
- PipeTransport transport = server.connect();
- transport.setWireFormat(wf);
- return transport;
- }
- }
-
-
-
}