You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/05/30 21:39:27 UTC
svn commit: r780312 - in /activemq/sandbox/activemq-flow:
activemq-broker/src/main/java/org/apache/activemq/transport/
activemq-broker/src/main/java/org/apache/activemq/wireformat/
activemq-broker/src/main/resources/META-INF/services/org/apache/activem...
Author: chirino
Date: Sat May 30 19:39:26 2009
New Revision: 780312
URL: http://svn.apache.org/viewvc?rev=780312&view=rev
Log:
removing dependencies on activemq-core
Added:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/FutureResponse.java (with props)
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/MutexTransport.java (with props)
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ResponseCallback.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ThreadNameFilter.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/Transport.java (with props)
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java (with props)
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFactory.java (with props)
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFilter.java (with props)
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportListener.java (with props)
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportServer.java (with props)
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormat.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/package.html
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
- copied unchanged from r780233, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
- copied unchanged from r780233, activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOExceptionSupport.java (with props)
activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntSequenceGenerator.java
activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java (with props)
activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/URISupport.java (with props)
Removed:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/
activemq/sandbox/activemq-flow/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/broker/store/
activemq/sandbox/activemq-flow/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/wireformat/default
activemq/sandbox/activemq-flow/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/wireformat/openwire
activemq/sandbox/activemq-flow/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
Modified:
activemq/sandbox/activemq-flow/activemq-util/pom.xml
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/FutureResponse.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/FutureResponse.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/FutureResponse.java Sat May 30 19:39:26 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class FutureResponse<T> {
+ private static final Log LOG = LogFactory.getLog(FutureResponse.class);
+
+ private final ResponseCallback<T> responseCallback;
+ private final ArrayBlockingQueue<T> responseSlot = new ArrayBlockingQueue<T>(1);
+
+ public FutureResponse(ResponseCallback<T> responseCallback) {
+ this.responseCallback = responseCallback;
+ }
+
+ public T getResult() throws IOException {
+ try {
+ return responseSlot.take();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Operation interupted: " + e, e);
+ }
+ throw new InterruptedIOException("Interrupted.");
+ }
+ }
+
+ public T getResult(int timeout) throws IOException {
+ try {
+ return responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted.");
+ }
+ }
+
+ public void set(T result) {
+ if (responseSlot.offer(result)) {
+ if (responseCallback != null) {
+ responseCallback.onCompletion(this);
+ }
+ }
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/FutureResponse.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/MutexTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/MutexTransport.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/MutexTransport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/MutexTransport.java Sat May 30 19:39:26 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+
+/**
+ * @version $Revision$
+ */
+public class MutexTransport extends TransportFilter {
+
+ private final Object writeMutex = new Object();
+
+ public MutexTransport(Transport next) {
+ super(next);
+ }
+
+ public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
+ synchronized (writeMutex) {
+ return next.asyncRequest(command, null);
+ }
+ }
+
+ public void oneway(Object command) throws IOException {
+ synchronized (writeMutex) {
+ next.oneway(command);
+ }
+ }
+
+ public Object request(Object command) throws IOException {
+ synchronized (writeMutex) {
+ return next.request(command);
+ }
+ }
+
+ public Object request(Object command, int timeout) throws IOException {
+ synchronized (writeMutex) {
+ return next.request(command, timeout);
+ }
+ }
+
+ public String toString() {
+ return next.toString();
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/MutexTransport.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ResponseCallback.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ResponseCallback.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ResponseCallback.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ResponseCallback.java Sat May 30 19:39:26 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+/**
+ * @version $Revision$
+ */
+public interface ResponseCallback<T> {
+ void onCompletion(FutureResponse<T> resp);
+}
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ThreadNameFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ThreadNameFilter.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ThreadNameFilter.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/ThreadNameFilter.java Sat May 30 19:39:26 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+
+/**
+ * The thread name filter, modifies the name of the thread during the invocation to a transport.
+ * It appends the remote address, so that a call stuck in a transport method such as socketWrite0
+ * will have the destination information in the thread name.
+ * This is extremely useful for thread dumps when debugging.
+ * To enable this transport, in the transport URI, simpley add<br/>
+ * <code>transport.threadName</code>.<br/>
+ * For example:</br>
+ * <pre><code>
+ * <transportConnector
+ * name="tcp1"
+ * uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.threadName"
+ * />
+ * </code></pre>
+ * @author Filip Hanik
+ *
+ */
+public class ThreadNameFilter extends TransportFilter {
+
+ public ThreadNameFilter(Transport next) {
+ super(next);
+ }
+
+ @Override
+ public void oneway(Object command) throws IOException {
+ String address =(next!=null?next.getRemoteAddress():null);
+ if (address!=null) {
+ String name = Thread.currentThread().getName();
+ try {
+ String sendname = name + " - SendTo:"+address;
+ Thread.currentThread().setName(sendname);
+ super.oneway(command);
+ }finally {
+ Thread.currentThread().setName(name);
+ }
+ } else {
+ super.oneway(command);
+ }
+ }
+
+
+
+}
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/Transport.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/Transport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/Transport.java Sat May 30 19:39:26 2009
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activemq.Service;
+
+/**
+ * Represents the client side of a transport allowing messages to be sent
+ * synchronously, asynchronously and consumed.
+ *
+ * @version $Revision: 1.5 $
+ */
+public interface Transport extends Service {
+
+ /**
+ * A one way asynchronous send
+ *
+ * @param command
+ * @throws IOException
+ */
+ void oneway(Object command) throws IOException;
+
+ /**
+ * An asynchronous request response where the Receipt will be returned in
+ * the future. If responseCallback is not null, then it will be called when
+ * the response has been completed.
+ *
+ * @param command
+ * @param responseCallback TODO
+ * @return the FutureResponse
+ * @throws IOException
+ */
+ <T> FutureResponse<T> asyncRequest(Object command, ResponseCallback<T> responseCallback) throws IOException;
+
+ /**
+ * A synchronous request response
+ *
+ * @param command
+ * @return the response
+ * @throws IOException
+ */
+ Object request(Object command) throws IOException;
+
+ /**
+ * A synchronous request response
+ *
+ * @param command
+ * @param timeout
+ * @return the repsonse or null if timeout
+ * @throws IOException
+ */
+ Object request(Object command, int timeout) throws IOException;
+
+ // /**
+ // * A one way asynchronous send
+ // * @param command
+ // * @throws IOException
+ // */
+ // void oneway(Command command) throws IOException;
+ //
+ // /**
+ // * An asynchronous request response where the Receipt will be returned
+ // * in the future. If responseCallback is not null, then it will be called
+ // * when the response has been completed.
+ // *
+ // * @param command
+ // * @param responseCallback TODO
+ // * @return the FutureResponse
+ // * @throws IOException
+ // */
+ // FutureResponse asyncRequest(Command command, ResponseCallback
+ // responseCallback) throws IOException;
+ //
+ // /**
+ // * A synchronous request response
+ // * @param command
+ // * @return the response
+ // * @throws IOException
+ // */
+ // Response request(Command command) throws IOException;
+ //
+ // /**
+ // * A synchronous request response
+ // * @param command
+ // * @param timeout
+ // * @return the repsonse or null if timeout
+ // * @throws IOException
+ // */
+ // Response request(Command command, int timeout) throws IOException;
+
+ /**
+ * Returns the current transport listener
+ *
+ * @return
+ */
+ TransportListener getTransportListener();
+
+ /**
+ * Registers an inbound command listener
+ *
+ * @param commandListener
+ */
+ void setTransportListener(TransportListener commandListener);
+
+ /**
+ * @param target
+ * @return the target
+ */
+ <T> T narrow(Class<T> target);
+
+ /**
+ * @return the remote address for this connection
+ */
+ String getRemoteAddress();
+
+ /**
+ * Indicates if the transport can handle faults
+ *
+ * @return true if fault tolerant
+ */
+ boolean isFaultTolerant();
+
+ /**
+ * @return true if the transport is disposed
+ */
+ boolean isDisposed();
+
+ /**
+ * @return true if the transport is connected
+ */
+ boolean isConnected();
+
+ /**
+ * reconnect to another location
+ * @param uri
+ * @throws IOException on failure of if not supported
+ */
+ void reconnect(URI uri) throws IOException;
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/Transport.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java Sat May 30 19:39:26 2009
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+
+public interface TransportAcceptListener {
+
+ void onAccept(Transport transport);
+
+ void onAcceptError(Exception error);
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportAcceptListener.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFactory.java Sat May 30 19:39:26 2009
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+public abstract class TransportFactory {
+
+ private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
+ private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
+ private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
+
+ private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
+ private static final String THREAD_NAME_FILTER = "threadName";
+
+ public abstract TransportServer doBind(URI location) throws IOException;
+
+ public Transport doConnect(URI location, Executor ex) throws Exception {
+ return doConnect(location);
+ }
+
+ public Transport doCompositeConnect(URI location, Executor ex) throws Exception {
+ return doCompositeConnect(location);
+ }
+
+ /**
+ * Creates a normal transport.
+ *
+ * @param location
+ * @return the transport
+ * @throws Exception
+ */
+ public static Transport connect(URI location) throws Exception {
+ TransportFactory tf = findTransportFactory(location);
+ return tf.doConnect(location);
+ }
+
+ /**
+ * Creates a normal transport.
+ *
+ * @param location
+ * @param ex
+ * @return the transport
+ * @throws Exception
+ */
+ public static Transport connect(URI location, Executor ex) throws Exception {
+ TransportFactory tf = findTransportFactory(location);
+ return tf.doConnect(location, ex);
+ }
+
+ /**
+ * Creates a slimmed down transport that is more efficient so that it can be
+ * used by composite transports like reliable and HA.
+ *
+ * @param location
+ * @return the Transport
+ * @throws Exception
+ */
+ public static Transport compositeConnect(URI location) throws Exception {
+ TransportFactory tf = findTransportFactory(location);
+ return tf.doCompositeConnect(location);
+ }
+
+ /**
+ * Creates a slimmed down transport that is more efficient so that it can be
+ * used by composite transports like reliable and HA.
+ *
+ * @param location
+ * @param ex
+ * @return the Transport
+ * @throws Exception
+ */
+ public static Transport compositeConnect(URI location, Executor ex) throws Exception {
+ TransportFactory tf = findTransportFactory(location);
+ return tf.doCompositeConnect(location, ex);
+ }
+
+ public static TransportServer bind(URI location) throws IOException {
+ TransportFactory tf = findTransportFactory(location);
+ return tf.doBind(location);
+ }
+
+ /**
+ * @deprecated
+ */
+ public static TransportServer bind(String brokerId, URI location) throws IOException {
+ return bind(location);
+ }
+
+// public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
+// TransportFactory tf = findTransportFactory(location);
+// if( brokerService!=null && tf instanceof BrokerServiceAware ) {
+// ((BrokerServiceAware)tf).setBrokerService(brokerService);
+// }
+// try {
+// if( brokerService!=null ) {
+// SslContext.setCurrentSslContext(brokerService.getSslContext());
+// }
+// return tf.doBind(location);
+// } finally {
+// SslContext.setCurrentSslContext(null);
+// }
+// }
+
+ public Transport doConnect(URI location) throws Exception {
+ try {
+ Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
+ WireFormat wf = createWireFormat(options);
+ Transport transport = createTransport(location, wf);
+ Transport rc = configure(transport, wf, options);
+ if (!options.isEmpty()) {
+ throw new IllegalArgumentException("Invalid connect parameters: " + options);
+ }
+ return rc;
+ } catch (URISyntaxException e) {
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
+ public Transport doCompositeConnect(URI location) throws Exception {
+ try {
+ Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
+ WireFormat wf = createWireFormat(options);
+ Transport transport = createTransport(location, wf);
+ Transport rc = compositeConfigure(transport, wf, options);
+ if (!options.isEmpty()) {
+ throw new IllegalArgumentException("Invalid connect parameters: " + options);
+ }
+ return rc;
+
+ } catch (URISyntaxException e) {
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
+ /**
+ * Allow registration of a transport factory without wiring via META-INF classes
+ * @param scheme
+ * @param tf
+ */
+ public static void registerTransportFactory(String scheme, TransportFactory tf) {
+ TRANSPORT_FACTORYS.put(scheme, tf);
+ }
+
+ /**
+ * Factory method to create a new transport
+ *
+ * @throws IOException
+ * @throws UnknownHostException
+ */
+ protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException {
+ throw new IOException("createTransport() method not implemented!");
+ }
+
+ /**
+ * @param location
+ * @return
+ * @throws IOException
+ */
+ private static TransportFactory findTransportFactory(URI location) throws IOException {
+ String scheme = location.getScheme();
+ if (scheme == null) {
+ throw new IOException("Transport not scheme specified: [" + location + "]");
+ }
+ TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
+ if (tf == null) {
+ // Try to load if from a META-INF property.
+ try {
+ tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
+ TRANSPORT_FACTORYS.put(scheme, tf);
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
+ }
+ }
+ return tf;
+ }
+
+ protected WireFormat createWireFormat(Map<String, String> options) throws IOException {
+ WireFormatFactory factory = createWireFormatFactory(options);
+ WireFormat format = factory.createWireFormat();
+ return format;
+ }
+
+ protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
+ String wireFormat = (String)options.remove("wireFormat");
+ if (wireFormat == null) {
+ wireFormat = getDefaultWireFormatType();
+ }
+
+ try {
+ WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
+ IntrospectionSupport.setProperties(wff, options, "wireFormat.");
+ return wff;
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
+ }
+ }
+
+ protected String getDefaultWireFormatType() {
+ return "default";
+ }
+
+ /**
+ * Fully configures and adds all need transport filters so that the
+ * transport can be used by the JMS client.
+ *
+ * @param transport
+ * @param wf
+ * @param options
+ * @return
+ * @throws Exception
+ */
+ public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
+ transport = compositeConfigure(transport, wf, options);
+
+ transport = new MutexTransport(transport);
+ transport = new ResponseCorrelator(transport);
+
+ return transport;
+ }
+
+ /**
+ * Fully configures and adds all need transport filters so that the
+ * transport can be used by the ActiveMQ message broker. The main difference
+ * between this and the configure() method is that the broker does not issue
+ * requests to the client so the ResponseCorrelator is not needed.
+ *
+ * @param transport
+ * @param format
+ * @param options
+ * @return
+ * @throws Exception
+ */
+ public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+ if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
+ transport = new WriteTimeoutFilter(transport);
+ String soWriteTimeout = (String)options.get(WRITE_TIMEOUT_FILTER);
+ if (soWriteTimeout!=null) ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
+ }
+ if (options.containsKey(THREAD_NAME_FILTER)) {
+ transport = new ThreadNameFilter(transport);
+ }
+ transport = compositeConfigure(transport, format, options);
+ transport = new MutexTransport(transport);
+ return transport;
+ }
+
+ /**
+ * Similar to configure(...) but this avoid adding in the MutexTransport and
+ * ResponseCorrelator transport layers so that the resulting transport can
+ * more efficiently be used as part of a composite transport.
+ *
+ * @param transport
+ * @param format
+ * @param options
+ * @return
+ */
+ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+ IntrospectionSupport.setProperties(transport, options);
+ return transport;
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFactory.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFilter.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFilter.java Sat May 30 19:39:26 2009
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * @version $Revision: 1.5 $
+ */
+public class TransportFilter implements TransportListener, Transport {
+ protected final Transport next;
+ protected TransportListener transportListener;
+
+ public TransportFilter(Transport next) {
+ this.next = next;
+ }
+
+ public TransportListener getTransportListener() {
+ return transportListener;
+ }
+
+ public void setTransportListener(TransportListener channelListener) {
+ this.transportListener = channelListener;
+ if (channelListener == null) {
+ next.setTransportListener(null);
+ } else {
+ next.setTransportListener(this);
+ }
+ }
+
+ /**
+ * @see org.apache.activemq.Service#start()
+ * @throws IOException if the next channel has not been set.
+ */
+ public void start() throws Exception {
+ if (next == null) {
+ throw new IOException("The next channel has not been set.");
+ }
+ if (transportListener == null) {
+ throw new IOException("The command listener has not been set.");
+ }
+ next.start();
+ }
+
+ /**
+ * @see org.apache.activemq.Service#stop()
+ */
+ public void stop() throws Exception {
+ next.stop();
+ }
+
+ public void onCommand(Object command) {
+ transportListener.onCommand(command);
+ }
+
+ /**
+ * @return Returns the next.
+ */
+ public Transport getNext() {
+ return next;
+ }
+
+ public String toString() {
+ return next.toString();
+ }
+
+ public void oneway(Object command) throws IOException {
+ next.oneway(command);
+ }
+
+ public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
+ return next.asyncRequest(command, null);
+ }
+
+ public Object request(Object command) throws IOException {
+ return next.request(command);
+ }
+
+ public Object request(Object command, int timeout) throws IOException {
+ return next.request(command, timeout);
+ }
+
+ public void onException(IOException error) {
+ transportListener.onException(error);
+ }
+
+ public void transportInterupted() {
+ transportListener.transportInterupted();
+ }
+
+ public void transportResumed() {
+ transportListener.transportResumed();
+ }
+
+ public <T> T narrow(Class<T> target) {
+ if (target.isAssignableFrom(getClass())) {
+ return target.cast(this);
+ }
+ return next.narrow(target);
+ }
+
+ public String getRemoteAddress() {
+ return next.getRemoteAddress();
+ }
+
+ /**
+ * @return
+ * @see org.apache.activemq.transport.Transport#isFaultTolerant()
+ */
+ public boolean isFaultTolerant() {
+ return next.isFaultTolerant();
+ }
+
+ public boolean isDisposed() {
+ return next.isDisposed();
+ }
+
+ public boolean isConnected() {
+ return next.isConnected();
+ }
+
+ public void reconnect(URI uri) throws IOException {
+ next.reconnect(uri);
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportFilter.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportListener.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportListener.java Sat May 30 19:39:26 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+
+
+/**
+ * An asynchronous listener of commands
+ *
+ * @version $Revision$
+ */
+public interface TransportListener {
+
+ /**
+ * called to process a command
+ * @param command
+ */
+ void onCommand(Object command);
+ /**
+ * An unrecoverable exception has occured on the transport
+ * @param error
+ */
+ void onException(IOException error);
+
+ /**
+ * The transport has suffered an interuption from which it hopes to recover
+ *
+ */
+ void transportInterupted();
+
+
+ /**
+ * The transport has resumed after an interuption
+ *
+ */
+ void transportResumed();
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportListener.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportServer.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportServer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportServer.java Sat May 30 19:39:26 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+
+import org.apache.activemq.Service;
+
+/**
+ * A TransportServer asynchronously accepts {@see Transport} objects and then
+ * delivers those objects to a {@see TransportAcceptListener}.
+ *
+ * @version $Revision: 1.4 $
+ */
+public interface TransportServer extends Service {
+
+ /**
+ * Registers an {@see TransportAcceptListener} which is notified of accepted
+ * channels.
+ *
+ * @param acceptListener
+ */
+ void setAcceptListener(TransportAcceptListener acceptListener);
+
+ URI getConnectURI();
+
+ /**
+ * @return The socket address that this transport is accepting connections
+ * on or null if this does not or is not currently accepting
+ * connections on a socket.
+ */
+ InetSocketAddress getSocketAddress();
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/transport/TransportServer.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Sat May 30 19:39:26 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
+
+/**
+ * A simple implementation which uses Object Stream serialization.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class ObjectStreamWireFormat implements WireFormat {
+
+ public ByteSequence marshal(Object command) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream ds = new DataOutputStream(baos);
+ marshal(command, ds);
+ ds.close();
+ return baos.toByteSequence();
+ }
+
+ public Object unmarshal(ByteSequence packet) throws IOException {
+ return unmarshal(new DataInputStream(new ByteArrayInputStream(packet)));
+ }
+
+ public void marshal(Object command, DataOutput ds) throws IOException {
+ ObjectOutputStream out = new ObjectOutputStream((OutputStream)ds);
+ out.writeObject(command);
+ out.flush();
+ out.reset();
+ }
+
+ public Object unmarshal(DataInput ds) throws IOException {
+ try {
+ ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream((InputStream)ds);
+ Object command;
+ command = in.readObject();
+ in.close();
+ return command;
+ } catch (ClassNotFoundException e) {
+ throw (IOException)new IOException("unmarshal failed: " + e).initCause(e);
+ }
+ }
+
+ public void setVersion(int version) {
+ }
+
+ public int getVersion() {
+ return 0;
+ }
+
+ public boolean inReceive() {
+ // TODO implement the inactivity monitor
+ return false;
+ }
+
+
+
+}
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormat.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormat.java Sat May 30 19:39:26 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.util.ByteSequence;
+
+
+/**
+ * Provides a mechanism to marshal commands into and out of packets
+ * or into and out of streams, Channels and Datagrams.
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface WireFormat {
+
+ /**
+ * Packet based marshaling
+ */
+ ByteSequence marshal(Object command) throws IOException;
+
+ /**
+ * Packet based un-marshaling
+ */
+ Object unmarshal(ByteSequence packet) throws IOException;
+
+ /**
+ * Stream based marshaling
+ */
+ void marshal(Object command, DataOutput out) throws IOException;
+
+ /**
+ * Packet based un-marshaling
+ */
+ Object unmarshal(DataInput in) throws IOException;
+
+ /**
+ * @param the version of the wire format
+ */
+ void setVersion(int version);
+
+ /**
+ * @return the version of the wire format
+ */
+ int getVersion();
+
+ /**
+ * @return true if message is being received
+ */
+ boolean inReceive();
+
+}
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java Sat May 30 19:39:26 2009
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.wireformat;
+
+public interface WireFormatFactory {
+ WireFormat createWireFormat();
+}
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/package.html?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/package.html (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/wireformat/package.html Sat May 30 19:39:26 2009
@@ -0,0 +1,27 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+An API for WireFormats which are used to turn object into bytes and bytes into objects.
+</p>
+
+</body>
+</html>
Modified: activemq/sandbox/activemq-flow/activemq-util/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/pom.xml?rev=780312&r1=780311&r2=780312&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-util/pom.xml Sat May 30 19:39:26 2009
@@ -32,7 +32,7 @@
<name>ActiveMQ :: Util</name>
<dependencies>
-
+
<!-- Testing Dependencies -->
<dependency>
<groupId>junit</groupId>
Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java Sat May 30 19:39:26 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+
+public class ClassLoadingAwareObjectInputStream extends ObjectInputStream {
+
+ private static final ClassLoader FALLBACK_CLASS_LOADER = ClassLoadingAwareObjectInputStream.class.getClassLoader();
+ /** <p>Maps primitive type names to corresponding class objects.</p> */
+ private static final HashMap<String, Class> primClasses = new HashMap<String, Class>(8, 1.0F);
+ public ClassLoadingAwareObjectInputStream(InputStream in) throws IOException {
+ super(in);
+ }
+
+ protected Class resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ return load(classDesc.getName(), cl);
+ }
+
+ protected Class resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ Class[] cinterfaces = new Class[interfaces.length];
+ for (int i = 0; i < interfaces.length; i++) {
+ cinterfaces[i] = load(interfaces[i], cl);
+ }
+
+ try {
+ return Proxy.getProxyClass(cinterfaces[0].getClassLoader(), cinterfaces);
+ } catch (IllegalArgumentException e) {
+ throw new ClassNotFoundException(null, e);
+ }
+ }
+
+ private Class load(String className, ClassLoader cl)
+ throws ClassNotFoundException {
+ try {
+ return Class.forName(className, false, cl);
+ } catch (ClassNotFoundException e) {
+ final Class clazz = (Class) primClasses.get(className);
+ if (clazz != null) {
+ return clazz;
+ } else {
+ return Class.forName(className, false, FALLBACK_CLASS_LOADER);
+ }
+ }
+ }
+
+
+
+ static {
+ primClasses.put("boolean", boolean.class);
+ primClasses.put("byte", byte.class);
+ primClasses.put("char", char.class);
+ primClasses.put("short", short.class);
+ primClasses.put("int", int.class);
+ primClasses.put("long", long.class);
+ primClasses.put("float", float.class);
+ primClasses.put("double", double.class);
+ primClasses.put("void", void.class);
+ }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOExceptionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOExceptionSupport.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOExceptionSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOExceptionSupport.java Sat May 30 19:39:26 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.IOException;
+
+public final class IOExceptionSupport {
+
+ private IOExceptionSupport() {
+ }
+
+ public static IOException create(String msg, Throwable cause) {
+ IOException exception = new IOException(msg);
+ exception.initCause(cause);
+ return exception;
+ }
+
+ public static IOException create(String msg, Exception cause) {
+ IOException exception = new IOException(msg);
+ exception.initCause(cause);
+ return exception;
+ }
+
+ public static IOException create(Throwable cause) {
+ IOException exception = new IOException(cause.getMessage());
+ exception.initCause(cause);
+ return exception;
+ }
+
+ public static IOException create(Exception cause) {
+ IOException exception = new IOException(cause.getMessage());
+ exception.initCause(cause);
+ return exception;
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOExceptionSupport.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntSequenceGenerator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntSequenceGenerator.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntSequenceGenerator.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntSequenceGenerator.java Sat May 30 19:39:26 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+public class IntSequenceGenerator {
+
+ private int lastSequenceId;
+
+ public synchronized int getNextSequenceId() {
+ return ++lastSequenceId;
+ }
+
+ public synchronized int getLastSequenceId() {
+ return lastSequenceId;
+ }
+
+ public synchronized void setLastSequenceId(int l) {
+ lastSequenceId = l;
+ }
+}
Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java Sat May 30 19:39:26 2009
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.beans.PropertyEditor;
+import java.beans.PropertyEditorManager;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+
+
+
+public final class IntrospectionSupport {
+
+ static {
+ // Add Spring and ActiveMQ specific property editors
+ String[] additionalPath = new String[] {
+ "org.springframework.beans.propertyeditors",
+ "org.apache.activemq.util" };
+
+ String[] searchPath = (String[]) Array.newInstance(String.class,
+ PropertyEditorManager.getEditorSearchPath().length
+ + additionalPath.length);
+ System.arraycopy(PropertyEditorManager.getEditorSearchPath(), 0,
+ searchPath, 0,
+ PropertyEditorManager.getEditorSearchPath().length);
+ System.arraycopy(additionalPath, 0, searchPath, PropertyEditorManager
+ .getEditorSearchPath().length, additionalPath.length);
+ PropertyEditorManager.setEditorSearchPath(searchPath);
+ }
+
+ private IntrospectionSupport() {
+ }
+
+ public static boolean getProperties(Object target, Map props, String optionPrefix) {
+
+ boolean rc = false;
+ if (target == null) {
+ throw new IllegalArgumentException("target was null.");
+ }
+ if (props == null) {
+ throw new IllegalArgumentException("props was null.");
+ }
+
+ if (optionPrefix == null) {
+ optionPrefix = "";
+ }
+
+ Class clazz = target.getClass();
+ Method[] methods = clazz.getMethods();
+ for (int i = 0; i < methods.length; i++) {
+ Method method = methods[i];
+ String name = method.getName();
+ Class type = method.getReturnType();
+ Class params[] = method.getParameterTypes();
+ if ((name.startsWith("is") || name.startsWith("get")) && params.length == 0 && type != null && isSettableType(type)) {
+
+ try {
+
+ Object value = method.invoke(target, new Object[] {});
+ if (value == null) {
+ continue;
+ }
+
+ String strValue = convertToString(value, type);
+ if (strValue == null) {
+ continue;
+ }
+ if (name.startsWith("get")) {
+ name = name.substring(3, 4).toLowerCase()
+ + name.substring(4);
+ } else {
+ name = name.substring(2, 3).toLowerCase()
+ + name.substring(3);
+ }
+ props.put(optionPrefix + name, strValue);
+ rc = true;
+
+ } catch (Throwable ignore) {
+ }
+
+ }
+ }
+
+ return rc;
+ }
+
+ public static boolean setProperties(Object target, Map<String, ?> props, String optionPrefix) {
+ boolean rc = false;
+ if (target == null) {
+ throw new IllegalArgumentException("target was null.");
+ }
+ if (props == null) {
+ throw new IllegalArgumentException("props was null.");
+ }
+
+ for (Iterator<String> iter = props.keySet().iterator(); iter.hasNext();) {
+ String name = iter.next();
+ if (name.startsWith(optionPrefix)) {
+ Object value = props.get(name);
+ name = name.substring(optionPrefix.length());
+ if (setProperty(target, name, value)) {
+ iter.remove();
+ rc = true;
+ }
+ }
+ }
+ return rc;
+ }
+
+ public static Map<String, Object> extractProperties(Map props, String optionPrefix) {
+ if (props == null) {
+ throw new IllegalArgumentException("props was null.");
+ }
+
+ HashMap<String, Object> rc = new HashMap<String, Object>(props.size());
+
+ for (Iterator iter = props.keySet().iterator(); iter.hasNext();) {
+ String name = (String)iter.next();
+ if (name.startsWith(optionPrefix)) {
+ Object value = props.get(name);
+ name = name.substring(optionPrefix.length());
+ rc.put(name, value);
+ iter.remove();
+ }
+ }
+
+ return rc;
+ }
+
+ public static boolean setProperties(Object target, Map props) {
+ boolean rc = false;
+
+ if (target == null) {
+ throw new IllegalArgumentException("target was null.");
+ }
+ if (props == null) {
+ throw new IllegalArgumentException("props was null.");
+ }
+
+ for (Iterator iter = props.entrySet().iterator(); iter.hasNext();) {
+ Map.Entry entry = (Entry)iter.next();
+ if (setProperty(target, (String)entry.getKey(), entry.getValue())) {
+ iter.remove();
+ rc = true;
+ }
+ }
+
+ return rc;
+ }
+
+ public static boolean setProperty(Object target, String name, Object value) {
+ try {
+ Class clazz = target.getClass();
+ Method setter = findSetterMethod(clazz, name);
+ if (setter == null) {
+ return false;
+ }
+
+ // If the type is null or it matches the needed type, just use the
+ // value directly
+ if (value == null || value.getClass() == setter.getParameterTypes()[0]) {
+ setter.invoke(target, new Object[] {value});
+ } else {
+ // We need to convert it
+ setter.invoke(target, new Object[] {convert(value, setter.getParameterTypes()[0])});
+ }
+ return true;
+ } catch (Throwable ignore) {
+ return false;
+ }
+ }
+
+ private static Object convert(Object value, Class type) {
+ PropertyEditor editor = PropertyEditorManager.findEditor(type);
+ if (editor != null) {
+ editor.setAsText(value.toString());
+ return editor.getValue();
+ }
+ return null;
+ }
+
+ public static String convertToString(Object value, Class type) {
+ PropertyEditor editor = PropertyEditorManager.findEditor(type);
+ if (editor != null) {
+ editor.setValue(value);
+ return editor.getAsText();
+ }
+ return null;
+ }
+
+ private static Method findSetterMethod(Class clazz, String name) {
+ // Build the method name.
+ name = "set" + name.substring(0, 1).toUpperCase() + name.substring(1);
+ Method[] methods = clazz.getMethods();
+ for (int i = 0; i < methods.length; i++) {
+ Method method = methods[i];
+ Class params[] = method.getParameterTypes();
+ if (method.getName().equals(name) && params.length == 1 ) {
+ return method;
+ }
+ }
+ return null;
+ }
+
+ private static boolean isSettableType(Class clazz) {
+ if (PropertyEditorManager.findEditor(clazz) != null) {
+ return true;
+ }
+
+ return false;
+ }
+
+ public static String toString(Object target) {
+ return toString(target, Object.class, null);
+ }
+
+ public static String toString(Object target, Class stopClass) {
+ return toString(target, stopClass, null);
+ }
+
+ public static String toString(Object target, Class stopClass, Map<String, Object> overrideFields) {
+ LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
+ addFields(target, target.getClass(), stopClass, map);
+ if (overrideFields != null) {
+ for(String key : overrideFields.keySet()) {
+ Object value = overrideFields.get(key);
+ map.put(key, value);
+ }
+
+ }
+ StringBuffer buffer = new StringBuffer(simpleName(target.getClass()));
+ buffer.append(" {");
+ Set entrySet = map.entrySet();
+ boolean first = true;
+ for (Iterator iter = entrySet.iterator(); iter.hasNext();) {
+ Map.Entry entry = (Map.Entry)iter.next();
+ if (first) {
+ first = false;
+ } else {
+ buffer.append(", ");
+ }
+ buffer.append(entry.getKey());
+ buffer.append(" = ");
+ appendToString(buffer, entry.getValue());
+ }
+ buffer.append("}");
+ return buffer.toString();
+ }
+
+ protected static void appendToString(StringBuffer buffer, Object value) {
+// if (value instanceof ActiveMQDestination) {
+// ActiveMQDestination destination = (ActiveMQDestination)value;
+// buffer.append(destination.getQualifiedName());
+// } else {
+ buffer.append(value);
+// }
+ }
+
+ public static String simpleName(Class clazz) {
+ String name = clazz.getName();
+ int p = name.lastIndexOf(".");
+ if (p >= 0) {
+ name = name.substring(p + 1);
+ }
+ return name;
+ }
+
+ private static void addFields(Object target, Class startClass, Class<Object> stopClass, LinkedHashMap<String, Object> map) {
+
+ if (startClass != stopClass) {
+ addFields(target, startClass.getSuperclass(), stopClass, map);
+ }
+
+ Field[] fields = startClass.getDeclaredFields();
+ for (int i = 0; i < fields.length; i++) {
+ Field field = fields[i];
+ if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())
+ || Modifier.isPrivate(field.getModifiers())) {
+ continue;
+ }
+
+ try {
+ field.setAccessible(true);
+ Object o = field.get(target);
+ if (o != null && o.getClass().isArray()) {
+ try {
+ o = Arrays.asList((Object[])o);
+ } catch (Throwable e) {
+ }
+ }
+ map.put(field.getName(), o);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/URISupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/URISupport.java?rev=780312&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/URISupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/URISupport.java Sat May 30 19:39:26 2009
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @version $Revision$
+ */
+public class URISupport {
+
+ public static class CompositeData {
+ private String host;
+ private String scheme;
+ private String path;
+ private URI components[];
+ private Map<String, String> parameters;
+ private String fragment;
+
+ public URI[] getComponents() {
+ return components;
+ }
+
+ public String getFragment() {
+ return fragment;
+ }
+
+ public Map<String, String> getParameters() {
+ return parameters;
+ }
+
+ public String getScheme() {
+ return scheme;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public URI toURI() throws URISyntaxException {
+ StringBuffer sb = new StringBuffer();
+ if (scheme != null) {
+ sb.append(scheme);
+ sb.append(':');
+ }
+
+ if (host != null && host.length() != 0) {
+ sb.append(host);
+ } else {
+ sb.append('(');
+ for (int i = 0; i < components.length; i++) {
+ if (i != 0) {
+ sb.append(',');
+ }
+ sb.append(components[i].toString());
+ }
+ sb.append(')');
+ }
+
+ if (path != null) {
+ sb.append('/');
+ sb.append(path);
+ }
+ if (!parameters.isEmpty()) {
+ sb.append("?");
+ sb.append(createQueryString(parameters));
+ }
+ if (fragment != null) {
+ sb.append("#");
+ sb.append(fragment);
+ }
+ return new URI(sb.toString());
+ }
+ }
+
+ public static Map<String, String> parseQuery(String uri) throws URISyntaxException {
+ try {
+ Map<String, String> rc = new HashMap<String, String>();
+ if (uri != null) {
+ String[] parameters = uri.split("&");
+ for (int i = 0; i < parameters.length; i++) {
+ int p = parameters[i].indexOf("=");
+ if (p >= 0) {
+ String name = URLDecoder.decode(parameters[i].substring(0, p), "UTF-8");
+ String value = URLDecoder.decode(parameters[i].substring(p + 1), "UTF-8");
+ rc.put(name, value);
+ } else {
+ rc.put(parameters[i], null);
+ }
+ }
+ }
+ return rc;
+ } catch (UnsupportedEncodingException e) {
+ throw (URISyntaxException)new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
+ }
+ }
+
+ public static Map<String, String> parseParamters(URI uri) throws URISyntaxException {
+ return uri.getQuery() == null ? emptyMap() : parseQuery(stripPrefix(uri.getQuery(), "?"));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Map<String, String> emptyMap() {
+ return Collections.EMPTY_MAP;
+ }
+
+ /**
+ * Removes any URI query from the given uri
+ */
+ public static URI removeQuery(URI uri) throws URISyntaxException {
+ return createURIWithQuery(uri, null);
+ }
+
+ /**
+ * Creates a URI with the given query
+ */
+ public static URI createURIWithQuery(URI uri, String query) throws URISyntaxException {
+ return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uri.getPath(),
+ query, uri.getFragment());
+ }
+
+ public static CompositeData parseComposite(URI uri) throws URISyntaxException {
+
+ CompositeData rc = new CompositeData();
+ rc.scheme = uri.getScheme();
+ String ssp = stripPrefix(uri.getSchemeSpecificPart().trim(), "//").trim();
+
+ parseComposite(uri, rc, ssp);
+
+ rc.fragment = uri.getFragment();
+ return rc;
+ }
+
+ /**
+ * @param uri
+ * @param rc
+ * @param ssp
+ * @param p
+ * @throws URISyntaxException
+ */
+ private static void parseComposite(URI uri, CompositeData rc, String ssp) throws URISyntaxException {
+ String componentString;
+ String params;
+
+ if (!checkParenthesis(ssp)) {
+ throw new URISyntaxException(uri.toString(), "Not a matching number of '(' and ')' parenthesis");
+ }
+
+ int p;
+ int intialParen = ssp.indexOf("(");
+ if (intialParen == 0) {
+ rc.host = ssp.substring(0, intialParen);
+ p = rc.host.indexOf("/");
+ if (p >= 0) {
+ rc.path = rc.host.substring(p);
+ rc.host = rc.host.substring(0, p);
+ }
+ p = ssp.lastIndexOf(")");
+ componentString = ssp.substring(intialParen + 1, p);
+ params = ssp.substring(p + 1).trim();
+
+ } else {
+ componentString = ssp;
+ params = "";
+ }
+
+ String components[] = splitComponents(componentString);
+ rc.components = new URI[components.length];
+ for (int i = 0; i < components.length; i++) {
+ rc.components[i] = new URI(components[i].trim());
+ }
+
+ p = params.indexOf("?");
+ if (p >= 0) {
+ if (p > 0) {
+ rc.path = stripPrefix(params.substring(0, p), "/");
+ }
+ rc.parameters = parseQuery(params.substring(p + 1));
+ } else {
+ if (params.length() > 0) {
+ rc.path = stripPrefix(params, "/");
+ }
+ rc.parameters = emptyMap();
+ }
+ }
+
+ /**
+ * @param componentString
+ * @return
+ */
+ private static String[] splitComponents(String str) {
+ List<String> l = new ArrayList<String>();
+
+ int last = 0;
+ int depth = 0;
+ char chars[] = str.toCharArray();
+ for (int i = 0; i < chars.length; i++) {
+ switch (chars[i]) {
+ case '(':
+ depth++;
+ break;
+ case ')':
+ depth--;
+ break;
+ case ',':
+ if (depth == 0) {
+ String s = str.substring(last, i);
+ l.add(s);
+ last = i + 1;
+ }
+ break;
+ default:
+ }
+ }
+
+ String s = str.substring(last);
+ if (s.length() != 0) {
+ l.add(s);
+ }
+
+ String rc[] = new String[l.size()];
+ l.toArray(rc);
+ return rc;
+ }
+
+ public static String stripPrefix(String value, String prefix) {
+ if (value.startsWith(prefix)) {
+ return value.substring(prefix.length());
+ }
+ return value;
+ }
+
+ public static URI stripScheme(URI uri) throws URISyntaxException {
+ return new URI(stripPrefix(uri.getSchemeSpecificPart().trim(), "//"));
+ }
+
+ public static String createQueryString(Map options) throws URISyntaxException {
+ try {
+ if (options.size() > 0) {
+ StringBuffer rc = new StringBuffer();
+ boolean first = true;
+ for (Iterator iter = options.keySet().iterator(); iter.hasNext();) {
+ if (first) {
+ first = false;
+ } else {
+ rc.append("&");
+ }
+ String key = (String)iter.next();
+ String value = (String)options.get(key);
+ rc.append(URLEncoder.encode(key, "UTF-8"));
+ rc.append("=");
+ rc.append(URLEncoder.encode(value, "UTF-8"));
+ }
+ return rc.toString();
+ } else {
+ return "";
+ }
+ } catch (UnsupportedEncodingException e) {
+ throw (URISyntaxException)new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
+ }
+ }
+
+ /**
+ * Creates a URI from the original URI and the remaining paramaters
+ *
+ * @throws URISyntaxException
+ */
+ public static URI createRemainingURI(URI originalURI, Map params) throws URISyntaxException {
+ String s = createQueryString(params);
+ if (s.length() == 0) {
+ s = null;
+ }
+ return createURIWithQuery(originalURI, s);
+ }
+
+ public static URI changeScheme(URI bindAddr, String scheme) throws URISyntaxException {
+ return new URI(scheme, bindAddr.getUserInfo(), bindAddr.getHost(), bindAddr.getPort(), bindAddr
+ .getPath(), bindAddr.getQuery(), bindAddr.getFragment());
+ }
+
+ public static boolean checkParenthesis(String str) {
+ boolean result = true;
+ if (str != null) {
+ int open = 0;
+ int closed = 0;
+
+ int i = 0;
+ while ((i = str.indexOf('(', i)) >= 0) {
+ i++;
+ open++;
+ }
+ i = 0;
+ while ((i = str.indexOf(')', i)) >= 0) {
+ i++;
+ closed++;
+ }
+ result = open == closed;
+ }
+ return result;
+ }
+
+ public int indexOfParenthesisMatch(String str) {
+ int result = -1;
+
+ return result;
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/URISupport.java
------------------------------------------------------------------------------
svn:executable = *