You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:44 UTC
[20/27] Initial drop of donated AMQP Client Code.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java
new file mode 100644
index 0000000..ee01add
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+
+/**
+ * Base interface for the JMS meta objects that represent JMS resources such as a
+ * Connection, Session, etc.
+ *
+ * A resource is processed by handing a JmsResourceVistor to its visit method.
+ */
+public interface JmsResource {
+
+ /**
+ * Allows a visitor object to walk the resources and process them.
+ *
+ * Implementations are expected to call back the visitor method that matches
+ * their own resource type, passing themselves as the argument.
+ *
+ * @param visitor
+ * The visitor instance that is processing this resource.
+ *
+ * @throws Exception if an error occurs while visiting this resource.
+ */
+ void visit(JmsResourceVistor visitor) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
new file mode 100644
index 0000000..32ee783
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+/**
+ * Base for all Id type classes used in the JMS Framework.
+ *
+ * Besides identifying a resource, an Id can carry two opaque Provider owned values:
+ * a hint and a Provider specific Id.
+ */
+public interface JmsResourceId {
+
+ /**
+ * Allows a Provider to embed a hint in this Id value for later use. The
+ * hint can allow the Provider to more easily locate state data for a resource.
+ *
+ * @param hint
+ * The value to add into this Id.
+ */
+ void setProviderHint(Object hint);
+
+ /**
+ * Return the previously stored Provider hint object.
+ *
+ * @return the previously stored Provider hint object.
+ */
+ Object getProviderHint();
+
+ /**
+ * Allows a Provider to set its own internal Id object for this resource
+ * in the case where the JMS framework Id cannot be used directly by the
+ * Provider implementation.
+ *
+ * @param id
+ * The Provider specific Id value to store in this Id.
+ */
+ void setProviderId(Object id);
+
+ /**
+ * Returns the previously stored Provider Id value.
+ *
+ * @return the previously stored Provider Id value.
+ */
+ Object getProviderId();
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceVistor.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceVistor.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceVistor.java
new file mode 100644
index 0000000..1e963d9
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceVistor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+import org.apache.qpid.jms.JmsDestination;
+
+/**
+ * Visitor interface to make processing JmsResources simpler.
+ *
+ * There is one callback per resource type. A resource's visit method invokes the
+ * callback matching its own type and passes itself as the argument; for example
+ * JmsSessionInfo calls processSessionInfo and JmsTransactionInfo calls
+ * processTransactionInfo. Every callback is declared to throw Exception so that an
+ * implementation can report any failure encountered while processing.
+ *
+ * NOTE(review): the type name is spelled "Vistor" (sic). It is part of the public
+ * API that other classes reference, so it is left unchanged here.
+ */
+public interface JmsResourceVistor {
+
+ void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception;
+
+ void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception;
+
+ void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception;
+
+ void processProducerInfo(JmsProducerInfo producerInfo) throws Exception;
+
+ void processTransactionInfo(JmsTransactionInfo transactionInfo) throws Exception;
+
+ void processDestination(JmsDestination destination) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionId.java
new file mode 100644
index 0000000..de0d452
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionId.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+public final class JmsSessionId extends JmsAbstractResourceId implements Comparable<JmsSessionId> {
+
+ private final String connectionId;
+ private final long value;
+
+ protected transient String key;
+ protected transient JmsConnectionId parentId;
+
+ public JmsSessionId(String connectionId, long value) {
+ this.connectionId = connectionId;
+ this.value = value;
+ }
+
+ public JmsSessionId(JmsConnectionId connectionId, long sessionId) {
+ this.connectionId = connectionId.getValue();
+ this.value = sessionId;
+ this.parentId = connectionId;
+ }
+
+ public JmsSessionId(JmsSessionId id) {
+ this.connectionId = id.getConnectionId();
+ this.value = id.getValue();
+ }
+
+ public JmsSessionId(JmsProducerId id) {
+ this.connectionId = id.getConnectionId();
+ this.value = id.getSessionId();
+ }
+
+ public JmsSessionId(JmsConsumerId id) {
+ this.connectionId = id.getConnectionId();
+ this.value = id.getSessionId();
+ }
+
+ public JmsConnectionId getParentId() {
+ if (parentId == null) {
+ parentId = new JmsConnectionId(this);
+ }
+ return parentId;
+ }
+
+ @Override
+ public int hashCode() {
+ if (hashCode == 0) {
+ hashCode = connectionId.hashCode() ^ (int)value;
+ }
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || o.getClass() != JmsSessionId.class) {
+ return false;
+ }
+ JmsSessionId id = (JmsSessionId)o;
+ return value == id.value && connectionId.equals(id.connectionId);
+ }
+
+ public String getConnectionId() {
+ return connectionId;
+ }
+
+ public long getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ if (key == null) {
+ key = connectionId + ":" + value;
+ }
+ return key;
+ }
+
+ @Override
+ public int compareTo(JmsSessionId other) {
+ return toString().compareTo(other.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
new file mode 100644
index 0000000..04f57d8
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+import javax.jms.Session;
+
+/**
+ * Meta object describing a JMS Session: its Id, its acknowledgement mode and the
+ * sendAcksAsync flag.
+ *
+ * NOTE(review): the acknowledgement mode field defaults to 0, which is the value of
+ * Session.SESSION_TRANSACTED, so isTransacted() reports true until
+ * setAcknowledgementMode is called with another mode.
+ */
+public final class JmsSessionInfo implements JmsResource {
+
+    private final JmsSessionId sessionId;
+    private int acknowledgementMode;
+    private boolean sendAcksAsync;
+
+    /**
+     * Creates the info object for a new session of the given connection.
+     *
+     * @param connectionMeta
+     *        The info object of the owning Connection; its Id becomes the parent Id.
+     * @param sessionId
+     *        The numeric value of the new session.
+     */
+    public JmsSessionInfo(JmsConnectionInfo connectionMeta, long sessionId) {
+        this(new JmsSessionId(connectionMeta.getConnectionId(), sessionId));
+    }
+
+    /**
+     * Creates the info object for an existing session Id.
+     *
+     * @param sessionId
+     *        The Id of the session being described.
+     */
+    public JmsSessionInfo(JmsSessionId sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    /**
+     * @return the Id of the session described by this object.
+     */
+    public JmsSessionId getSessionId() {
+        return this.sessionId;
+    }
+
+    /**
+     * Hands this object to the visitor's session callback.
+     */
+    @Override
+    public void visit(JmsResourceVistor vistor) throws Exception {
+        vistor.processSessionInfo(this);
+    }
+
+    /**
+     * @return the acknowledgement mode currently stored for the session.
+     */
+    public int getAcknowledgementMode() {
+        return this.acknowledgementMode;
+    }
+
+    /**
+     * @param acknowledgementMode
+     *        The acknowledgement mode to store for the session.
+     */
+    public void setAcknowledgementMode(int acknowledgementMode) {
+        this.acknowledgementMode = acknowledgementMode;
+    }
+
+    /**
+     * @return true when the stored acknowledgement mode is Session.SESSION_TRANSACTED.
+     */
+    public boolean isTransacted() {
+        return Session.SESSION_TRANSACTED == this.acknowledgementMode;
+    }
+
+    /**
+     * @return the current value of the sendAcksAsync flag.
+     */
+    public boolean isSendAcksAsync() {
+        return this.sendAcksAsync;
+    }
+
+    /**
+     * @param sendAcksAsync
+     *        The new value of the sendAcksAsync flag.
+     */
+    public void setSendAcksAsync(boolean sendAcksAsync) {
+        this.sendAcksAsync = sendAcksAsync;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionId.java
new file mode 100644
index 0000000..226aedf
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionId.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+public final class JmsTransactionId extends JmsAbstractResourceId implements Comparable<JmsTransactionId> {
+
+ private final JmsConnectionId connectionId;
+ private final long value;
+
+ private transient String transactionKey;
+
+ public JmsTransactionId(JmsConnectionId connectionId, long transactionId) {
+ this.connectionId = connectionId;
+ this.value = transactionId;
+ }
+
+ public String getTransactionKey() {
+ if (transactionKey == null) {
+ transactionKey = "TX:" + connectionId + ":" + value;
+ }
+ return transactionKey;
+ }
+
+ @Override
+ public String toString() {
+ return getTransactionKey();
+ }
+
+ @Override
+ public int hashCode() {
+ if (hashCode == 0) {
+ hashCode = connectionId.hashCode() ^ (int)value;
+ }
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null || other.getClass() != JmsTransactionId.class) {
+ return false;
+ }
+
+ JmsTransactionId tx = (JmsTransactionId) other;
+
+ return value == tx.value && connectionId.equals(tx.connectionId);
+ }
+
+ @Override
+ public int compareTo(JmsTransactionId o) {
+ int result = connectionId.compareTo(o.connectionId);
+ if (result == 0) {
+ result = (int)(value - o.value);
+ }
+ return result;
+ }
+
+ public long getValue() {
+ return value;
+ }
+
+ public JmsConnectionId getConnectionId() {
+ return connectionId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java
new file mode 100644
index 0000000..5fca50f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+/**
+ * Meta object describing a transaction: the Id of the Session it belongs to and the
+ * (replaceable) Id of the transaction itself.
+ *
+ * Equality, hash code and ordering are based on the transaction Id only. A null
+ * transaction Id is tolerated: it hashes to 0, equals only another null Id and
+ * sorts before any non-null Id.
+ */
+public final class JmsTransactionInfo implements JmsResource, Comparable<JmsTransactionInfo> {
+
+    protected final JmsSessionId sessionId;
+    protected JmsTransactionId transactionId;
+
+    /**
+     * Creates a new transaction info object.
+     *
+     * @param sessionId
+     *        The Id of the Session that owns the transaction.
+     * @param transactionId
+     *        The Id of the transaction; may be replaced later via setTransactionId.
+     */
+    public JmsTransactionInfo(JmsSessionId sessionId, JmsTransactionId transactionId) {
+        this.sessionId = sessionId;
+        this.transactionId = transactionId;
+    }
+
+    /**
+     * @return a new info object referring to the same session Id and transaction Id.
+     */
+    public JmsTransactionInfo copy() {
+        return new JmsTransactionInfo(sessionId, transactionId);
+    }
+
+    /**
+     * @return the Id of the Session that owns the transaction.
+     */
+    public JmsSessionId getSessionId() {
+        return sessionId;
+    }
+
+    /**
+     * @return the current transaction Id, possibly null.
+     */
+    public JmsTransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    /**
+     * @param transactionId
+     *        The transaction Id to store in this object.
+     */
+    public void setTransactionId(JmsTransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    /**
+     * @return the Id of the owning Session; same value as getSessionId().
+     */
+    public JmsSessionId getParentId() {
+        return this.sessionId;
+    }
+
+    @Override
+    public int hashCode() {
+        return (transactionId == null) ? 0 : transactionId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        JmsTransactionInfo other = (JmsTransactionInfo) obj;
+
+        // Handle a null Id on this side first; the previous code dereferenced it when
+        // both Ids were null and threw a NullPointerException.
+        if (transactionId == null) {
+            return other.transactionId == null;
+        }
+        return transactionId.equals(other.transactionId);
+    }
+
+    /**
+     * Orders info objects by their transaction Ids, with a null Id sorting first so
+     * that the ordering stays consistent with equals.
+     *
+     * @param other
+     *        The info object to compare against; must not be null.
+     */
+    @Override
+    public int compareTo(JmsTransactionInfo other) {
+        JmsTransactionId theirs = other.transactionId;
+        if (this.transactionId == null) {
+            return (theirs == null) ? 0 : -1;
+        }
+        if (theirs == null) {
+            return 1;
+        }
+        return this.transactionId.compareTo(theirs);
+    }
+
+    @Override
+    public String toString() {
+        return "JmsTransactionInfo { " + this.transactionId + " }";
+    }
+
+    /**
+     * Hands this object to the visitor's transaction callback.
+     */
+    @Override
+    public void visit(JmsResourceVistor visitor) throws Exception {
+        visitor.processTransactionInfo(this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/package.html
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/package.html b/qpid-jms-client/src/main/java/org/apache/qpid/jms/package.html
new file mode 100644
index 0000000..3d678ce
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/package.html
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+The core AMQP JMS client implementation classes.
+
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
new file mode 100644
index 0000000..9d743d2
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+
+/**
+ * Base class used to implement the most common features of a Provider instance.
+ *
+ * It holds the remote URI, the closed flag, the registered ProviderListener and a
+ * single threaded scheduled executor ("serializer") whose worker is a daemon thread.
+ *
+ * NOTE(review): earlier documentation stated that optional operations such as
+ * transaction commit and rollback are implemented here to throw an
+ * UnsupportedOperationException; no such methods are defined in this class.
+ */
+public abstract class AbstractProvider implements Provider {
+
+    protected final URI remoteURI;
+    protected final AtomicBoolean closed = new AtomicBoolean();
+    protected final ScheduledExecutorService serializer;
+
+    protected ProviderListener listener;
+
+    /**
+     * Stores the remote URI and creates the serializer executor.
+     *
+     * @param remoteURI
+     *        The URI of the remote peer this Provider works with.
+     */
+    public AbstractProvider(URI remoteURI) {
+        this.remoteURI = remoteURI;
+
+        this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
+            @Override
+            public Thread newThread(Runnable runner) {
+                Thread serial = new Thread(runner);
+                serial.setDaemon(true);
+                // Qualify the call: an unqualified toString() resolves to this anonymous
+                // ThreadFactory, which named the thread "...AbstractProvider$1@hash"
+                // instead of after the Provider.
+                serial.setName(AbstractProvider.this.toString());
+                return serial;
+            }
+        });
+    }
+
+    /**
+     * Verifies that the Provider can be started: it must not be closed and a
+     * ProviderListener must have been registered.
+     *
+     * @throws IOException if the closed check fails (checkClosed throws
+     *         ProviderClosedException, presumably an IOException subtype - TODO confirm).
+     * @throws IllegalStateException if no ProviderListener has been registered.
+     */
+    @Override
+    public void start() throws IOException, IllegalStateException {
+        checkClosed();
+
+        if (listener == null) {
+            throw new IllegalStateException("No ProviderListener registered.");
+        }
+    }
+
+    @Override
+    public void setProviderListener(ProviderListener listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    public ProviderListener getProviderListener() {
+        return listener;
+    }
+
+    @Override
+    public URI getRemoteURI() {
+        return remoteURI;
+    }
+
+    /**
+     * Reports an error to the registered listener, if any, as a connection failure.
+     * The error is converted with IOExceptionSupport.create before being passed on.
+     *
+     * @param ex
+     *        The error to report.
+     */
+    public void fireProviderException(Throwable ex) {
+        // Read the field once so the null check and the call use the same reference.
+        ProviderListener listener = this.listener;
+        if (listener != null) {
+            listener.onConnectionFailure(IOExceptionSupport.create(ex));
+        }
+    }
+
+    /**
+     * Fails when the closed flag has been set.
+     *
+     * @throws ProviderClosedException if this Provider is already closed.
+     */
+    protected void checkClosed() throws ProviderClosedException {
+        if (closed.get()) {
+            throw new ProviderClosedException("The Provider is already closed");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AsyncResult.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AsyncResult.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AsyncResult.java
new file mode 100644
index 0000000..b79a3cc
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AsyncResult.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+/**
+ * Defines a result interface for asynchronous operations.
+ *
+ * The party performing the operation reports the outcome through onFailure or
+ * onSuccess; isComplete tells whether an outcome has been reported.
+ */
+public interface AsyncResult {
+
+ /**
+ * If the operation fails this method is invoked with the error
+ * that caused the failure.
+ *
+ * @param result
+ * The error that resulted in this asynchronous operation failing.
+ */
+ void onFailure(Throwable result);
+
+ /**
+ * If the operation succeeds this method is invoked. No result value is
+ * passed; how waiting parties are signaled is up to the implementation.
+ */
+ void onSuccess();
+
+ /**
+ * Returns true if the AsyncResult has completed. The task is considered complete
+ * regardless of whether it succeeded or failed.
+ *
+ * @return true if the asynchronous operation has completed.
+ */
+ boolean isComplete();
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
new file mode 100644
index 0000000..e0d3e01
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * ProviderListener whose callbacks all do nothing.
+ *
+ * Extend this class and override only the callbacks of interest.
+ */
+public class DefaultProviderListener implements ProviderListener {
+
+    /** Ignores the inbound message. */
+    @Override
+    public void onMessage(JmsInboundMessageDispatch envelope) {
+        // No-op by default.
+    }
+
+    /** Ignores the interruption notice. */
+    @Override
+    public void onConnectionInterrupted(URI remoteURI) {
+        // No-op by default.
+    }
+
+    /** Ignores the failure notice. */
+    @Override
+    public void onConnectionFailure(IOException ex) {
+        // No-op by default.
+    }
+
+    /** Ignores the recovery notice. */
+    @Override
+    public void onConnectionRecovery(Provider provider) {
+        // No-op by default.
+    }
+
+    /** Ignores the recovered notice. */
+    @Override
+    public void onConnectionRecovered(Provider provider) {
+        // No-op by default.
+    }
+
+    /** Ignores the restored notice. */
+    @Override
+    public void onConnectionRestored(URI remoteURI) {
+        // No-op by default.
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
new file mode 100644
index 0000000..b8634ea
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.net.URI;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+
+/**
+ * Defines the interface that an Implementation of a Specific wire level protocol
+ * provider must implement. This Provider interface requires that the implementation
+ * methods all operate in an asynchronous manner.
+ */
+public interface Provider {
+
+ /**
+ * Performs the initial low level connection for this provider such as TCP or
+ * SSL connection to a remote Broker. If this operation fails then the Provider
+ * is considered to be unusable and no further operations should be attempted
+ * using this Provider.
+ *
+ * @throws IOException if the remote resource can not be contacted.
+ */
+ void connect() throws IOException;
+
+ /**
+ * Starts the Provider. The start method provides a place for the Provider to perform
+ * any pre-start configuration checks to ensure that the current state is valid and that
+ * all contracts have been met prior to starting.
+ *
+ * @throws IOException if an error occurs during start processing.
+ * @throws IllegalStateException if the Provider is improperly configured.
+ */
+ void start() throws IOException, IllegalStateException;
+
+ /**
+ * Closes this Provider terminating all connections and canceling any pending
+ * operations. The Provider is considered unusable after this call. This call
+ * is a blocking call and will not return until the Provider has closed or an
+ * error occurs.
+ */
+ void close();
+
+ /**
+ * Returns the URI used to configure this Provider and specify the remote address of the
+ * Broker it connects to.
+ *
+ * @return the URI used to configure this Provider.
+ */
+ URI getRemoteURI();
+
+ /**
+ * Create the Provider version of the given JmsResource.
+ *
+ * For each JMS Resource type the Provider implementation must create its own internal
+ * representation and upon successful creation provide the caller with a response. The
+ * Provider should examine the given JmsResource to determine if the given configuration
+ * is supported or can be simulated, or is not supported in which case an exception should be
+ * thrown.
+ *
+ * It is possible for a Provider to indicate that it cannot complete a requested create
+ * either due to some mis-configuration such as bad login credentials on connection create
+ * by throwing a JMSException. If the Provider does not support creating of the indicated
+ * resource such as a Temporary Queue etc the provider may throw an UnsupportedOperationException
+ * to indicate this.
+ *
+ * @param resource
+ * The JmsResource instance that indicates what is being created.
+ * @param request
+ * The request object that should be signaled when this operation completes.
+ *
+ * @throws IOException if an error occurs or the Provider is already closed.
+ * @throws JMSException if an error occurs due to JMS violation such as bad credentials.
+ */
+ void create(JmsResource resource, AsyncResult request) throws IOException, JMSException;
+
+ /**
+ * Starts the Provider version of the given JmsResource.
+ *
+ * For some JMS Resources it is necessary or advantageous to have a started state that
+ * must be triggered prior to its normal use.
+ *
+ * An example of this would be a MessageConsumer which should not receive any incoming
+ * messages until the JMS layer is in a state to handle them. One such time would be
+ * after connection recovery. A JMS consumer should normally recover with its prefetch
+ * value set to zero, or an AMQP link credit of zero and only open up the credit window
+ * once all Connection resources are restored.
+ *
+ * The provider is required to implement this method and not throw any error other than
+ * an IOException if a communication error occurs. The start operation is not required to
+ * have any effect on the provider resource but must not throw UnsupportedOperation etc.
+ *
+ * @param resource
+ * The JmsResource instance that indicates what is being started.
+ * @param request
+ * The request object that should be signaled when this operation completes.
+ *
+ * @throws IOException if an error occurs or the Provider is already closed.
+ * @throws JMSException if an error occurs due to JMS violation such as already closed resource.
+ */
+ void start(JmsResource resource, AsyncResult request) throws IOException, JMSException;
+
+ /**
+ * Instruct the Provider to dispose of a given JmsResource.
+ *
+ * The provider is given a JmsResource which it should use to remove any associated
+ * resources and inform the remote Broker instance of the removal of this resource.
+ *
+ * If the Provider cannot destroy the resource due to a non-communication error such as
+ * the logged in user not having role access to destroy the given resource it may throw an
+ * instance of JMSException to indicate such an error.
+ *
+ * @param resourceId
+ * The JmsResource that identifies a previously created JmsResource.
+ * @param request
+ * The request object that should be signaled when this operation completes.
+ *
+ * @throws IOException if an error occurs or the Provider is already closed.
+ * @throws JMSException if an error occurs due to JMS violation such as not authorized.
+ */
+ void destroy(JmsResource resourceId, AsyncResult request) throws IOException, JMSException;
+
+ /**
+ * Sends the JmsMessage contained in the outbound dispatch envelope.
+ *
+ * @param envelope
+ * the message envelope containing the JmsMessage to send.
+ * @param request
+ * The request object that should be signaled when this operation completes.
+ *
+ * @throws IOException if an error occurs or the Provider is already closed.
+ * @throws JMSException if an error that maps to JMS occurs such as not authorized.
+ */
+ void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException;
+
+ /**
+ * Called to acknowledge all messages that have been delivered in a given session.
+ *
+ * This method is typically used by a Session that is configured for client acknowledge
+ * mode. The acknowledgment should only be applied to Messages that have been marked
+ * as delivered and not those still awaiting dispatch.
+ *
+ * @param sessionId
+ * the ID of the Session whose delivered messages should be acknowledged.
+ * @param request
+ * The request object that should be signaled when this operation completes.
+ *
+ * @throws IOException if an error occurs or the Provider is already closed.
+ * @throws JMSException if an error occurs due to JMS violation such as unmatched ack.
+ */
+ void acknowledge(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException;
+
+ /**
+ * Called to acknowledge a JmsMessage has been delivered, consumed, re-delivered...etc.
+ *
+ * The provider should perform an acknowledgment for the message based on the configured
+ * mode of the consumer that it was dispatched to and the capabilities of the protocol.
+ *
+ * @param envelope
+ * The message dispatch envelope containing the Message delivery information.
+ * @param ackType
+ * The type of acknowledgment being done.
+ * @param request
+ * The request object that should be signaled when this operation completes.
+ *
+ * @throws IOException if an error occurs or the Provider is already closed.
+ * @throws JMSException if an error occurs due to JMS violation such as unmatched ack.
+ */
+ void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType, AsyncResult request)
+ throws IOException, JMSException;
+
+ /**
+ * Called to commit an open transaction.
+ *
+ * If the provider is unable to support transactions then it should throw an
+ * UnsupportedOperationException to indicate this. The Provider may also throw a
+ * JMSException to indicate a transaction was already rolled back etc.
+ *
+ * @param sessionId
+ * the Id of the JmsSession that is committing the current transaction.
+ * @param request
+ * The request object that should be signaled when this operation completes.
+ *
+ * @throws IOException if an error occurs or the Provider is already closed.
+ * @throws JMSException if an error occurs due to JMS violation such as not authorized.
+ */
+ void commit(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException;
+
+ /**
+ * Called to roll back an open transaction.
+ *
+ * @param sessionId
+ * the Id of the JmsSession that is rolling back the current transaction.
+ * @param request
+ * The request object that should be signaled when this operation completes.
+ *
+ * @throws IOException if an error occurs or the Provider is already closed.
+ * @throws JMSException if an error occurs due to JMS violation such as not authorized.
+ */
+ void rollback(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException;
+
+ /**
+ * Called to recover all unacknowledged messages for a Session in client Ack mode.
+ *
+ * @param sessionId
+ * the Id of the JmsSession that is recovering unacknowledged messages.
+ * @param request
+ * The request object that should be signaled when this operation completes.
+ *
+ * @throws IOException if an error occurs or the Provider is already closed.
+ */
+ void recover(JmsSessionId sessionId, AsyncResult request) throws IOException;
+
+ /**
+ * Remove a durable topic subscription by name.
+ *
+ * A provider can throw an instance of JMSException to indicate that it cannot perform the
+ * un-subscribe operation due to bad security credentials etc.
+ *
+ * @param subscription
+ * the name of the durable subscription that is to be removed.
+ * @param request
+ * The request object that should be signaled when this operation completes.
+ *
+ * @throws IOException if an error occurs or the Provider is already closed.
+ * @throws JMSException if an error occurs due to JMS violation such as not authorized.
+ */
+ void unsubscribe(String subscription, AsyncResult request) throws IOException, JMSException;
+
+ /**
+ * Request a remote peer send a Message to this client. A message pull request is
+ * usually only needed in the case where the client sets a zero prefetch limit on the
+ * consumer. If the consumer has a set prefetch that's greater than zero this method
+ * should just return without performing any action.
+ *
+ * @param consumerId
+ * the ID of the consumer on whose behalf the pull is requested.
+ * @param timeout
+ * the amount of time to tell the remote peer to keep this pull request valid.
+ * @param request
+ * The request object that should be signaled when this operation completes.
+ *
+ * @throws IOException if an error occurs or the Provider is already closed.
+ */
+ void pull(JmsConsumerId consumerId, long timeout, AsyncResult request) throws IOException;
+
+ /**
+ * Gets the Provider specific Message factory for use in the JMS layer when a Session
+ * is asked to create a Message type. The Provider should implement its own internal
+ * JmsMessage core to optimize read / write and marshal operations for the connection.
+ *
+ * @return a JmsMessageFactory instance for use by the JMS layer.
+ */
+ JmsMessageFactory getMessageFactory();
+
+ /**
+ * Sets the listener of events from this Provider instance.
+ *
+ * @param listener
+ * The listener instance that will receive all event callbacks.
+ */
+ void setProviderListener(ProviderListener listener);
+
+ /**
+ * Gets the currently set ProviderListener instance.
+ *
+ * @return the currently set ProviderListener instance.
+ */
+ ProviderListener getProviderListener();
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java
new file mode 100644
index 0000000..1e00b58
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+
+/**
+ * IOException subclass that carries only a detail message.
+ *
+ * NOTE(review): judging by the name this is presumably what Provider operations throw once
+ * the Provider has been closed; no throw site is visible here, so confirm against callers.
+ */
+public class ProviderClosedException extends IOException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates the exception with the given detail message.
+ *
+ * @param message
+ * the detail message, passed unchanged to the IOException constructor.
+ */
+ public ProviderClosedException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
new file mode 100644
index 0000000..4e3f90c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+/**
+ * Holder for the constants shared by users of the Provider API. The class only groups
+ * nested constant types and cannot be instantiated.
+ */
+public final class ProviderConstants {
+
+    /** Hidden constructor: this class is a pure constant holder. */
+    private ProviderConstants() {
+    }
+
+    /**
+     * The kinds of acknowledgment that can be requested, each paired with a fixed
+     * integer value that is exposed through {@link #getValue()}.
+     */
+    public enum ACK_TYPE {
+        DELIVERED(0),
+        CONSUMED(1),
+        REDELIVERED(2),
+        POISONED(3),
+        EXPIRED(4);
+
+        private final int value;
+
+        // Enum constructors are implicitly private, so no modifier is needed.
+        ACK_TYPE(int code) {
+            value = code;
+        }
+
+        /**
+         * @return the integer value assigned to this acknowledgment type.
+         */
+        public int getValue() {
+            return value;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
new file mode 100644
index 0000000..c07518e
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.qpid.jms.util.FactoryFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for factories that create {@link Provider} instances, together with the
+ * static helpers used to locate the factory that matches a URI scheme and to create and
+ * connect a Provider from it.
+ */
+public abstract class ProviderFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProviderFactory.class);
+
+    // Finder configured with the resource path prefix
+    // "META-INF/services/<package of ProviderFactory, dots replaced by slashes>/";
+    // findProviderFactory asks it for an instance keyed by the URI scheme.
+    private static final FactoryFinder<ProviderFactory> PROVIDER_FACTORY_FINDER =
+        new FactoryFinder<ProviderFactory>(ProviderFactory.class,
+            "META-INF/services/" + ProviderFactory.class.getPackage().getName().replace(".", "/") + "/");
+
+    /**
+     * Creates an instance of the Provider this factory is responsible for and configures
+     * it using the properties set on the given remote broker URI.
+     *
+     * @param remoteURI
+     *        The URI used to connect to a remote Broker.
+     *
+     * @return a new Provider instance.
+     *
+     * @throws Exception if an error occurs while creating the Provider instance.
+     */
+    public abstract Provider createAsyncProvider(URI remoteURI) throws Exception;
+
+    /**
+     * @return the name of this JMS Provider, e.g. STOMP, AMQP, MQTT...etc
+     */
+    public abstract String getName();
+
+    /**
+     * Static create method that performs the ProviderFactory search, creates the Provider
+     * and calls {@link Provider#connect()} on it before returning.
+     *
+     * Any failure along the way is logged (the scheme at error level, the full exception
+     * at trace level) and then rethrown unchanged to the caller.
+     *
+     * @param remoteURI
+     *        the URI of the remote peer.
+     *
+     * @return a new Provider instance on which connect() has completed.
+     *
+     * @throws Exception if an error occurs while locating the factory, creating the
+     *         Provider instance or connecting it.
+     */
+    public static Provider createAsync(URI remoteURI) throws Exception {
+        Provider result = null;
+
+        try {
+            ProviderFactory factory = findProviderFactory(remoteURI);
+            result = factory.createAsyncProvider(remoteURI);
+            result.connect();
+        } catch (Exception ex) {
+            // The message previously named a "BlockingProvider", a type this factory never creates.
+            LOG.error("Failed to create Provider instance for: {}", remoteURI.getScheme());
+            LOG.trace("Error: ", ex);
+            throw ex;
+        }
+
+        return result;
+    }
+
+    /**
+     * Searches for a ProviderFactory by using the scheme from the given URI.
+     *
+     * The lookup is delegated to the FactoryFinder held by this class; this method keeps
+     * no cache of its own, so any caching of factories is the finder's responsibility.
+     *
+     * @param location
+     *        The URI whose scheme will be used to locate a ProviderFactory.
+     *
+     * @return a provider factory instance matching the URI's scheme.
+     *
+     * @throws IOException if the URI has no scheme, or if the finder fails for any reason
+     *         (the original failure is attached as the cause).
+     */
+    public static ProviderFactory findProviderFactory(URI location) throws IOException {
+        String scheme = location.getScheme();
+        if (scheme == null) {
+            throw new IOException("No Provider scheme specified: [" + location + "]");
+        }
+
+        ProviderFactory factory = null;
+        try {
+            factory = PROVIDER_FACTORY_FINDER.newInstance(scheme);
+        } catch (Throwable e) {
+            // Every lookup failure is reported to the caller as an unrecognized scheme.
+            throw new IOException("Provider scheme NOT recognized: [" + scheme + "]", e);
+        }
+
+        return factory;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
new file mode 100644
index 0000000..7a52ad3
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+
+/**
+ * Asynchronous Provider Future class.
+ *
+ * An AsyncResult that a caller can block on: the result callbacks release a single-count
+ * latch, and the sync methods wait on that latch and then rethrow any recorded failure as
+ * an IOException. An optional watcher AsyncResult is notified of the same outcome.
+ */
+public class ProviderFuture implements AsyncResult {
+
+    protected final CountDownLatch latch = new CountDownLatch(1);
+    protected Throwable error;
+    protected final AsyncResult watcher;
+
+    /**
+     * Creates a future with no watcher.
+     */
+    public ProviderFuture() {
+        this.watcher = null;
+    }
+
+    /**
+     * Creates a future that forwards its outcome to the given watcher.
+     *
+     * @param watcher
+     *        the AsyncResult to notify after this future completes, may be null.
+     */
+    public ProviderFuture(AsyncResult watcher) {
+        this.watcher = watcher;
+    }
+
+    /**
+     * @return true once onSuccess or onFailure has been called.
+     */
+    @Override
+    public boolean isComplete() {
+        return latch.getCount() == 0;
+    }
+
+    /**
+     * Records the failure, releases any thread blocked in sync and then notifies the
+     * watcher, if one was supplied.
+     *
+     * @param result
+     *        the cause of the failure.
+     */
+    @Override
+    public void onFailure(Throwable result) {
+        // Store the error before releasing the latch so a woken waiter observes it.
+        error = result;
+        latch.countDown();
+        if (watcher != null) {
+            watcher.onFailure(error);
+        }
+    }
+
+    /**
+     * Releases any thread blocked in sync and then notifies the watcher, if one was
+     * supplied.
+     */
+    @Override
+    public void onSuccess() {
+        latch.countDown();
+        if (watcher != null) {
+            watcher.onSuccess();
+        }
+    }
+
+    /**
+     * Timed wait for a response to a Provider operation.
+     *
+     * If the wait period elapses before the operation completes this method returns
+     * normally, provided no failure has been recorded; callers that need to tell a
+     * timeout from a success should check {@link #isComplete()} afterwards.
+     *
+     * @param amount
+     *        The amount of time to wait before abandoning the wait.
+     * @param unit
+     *        The unit to use for this wait period.
+     *
+     * @throws IOException if the operation failed, or if the calling thread is interrupted
+     *         while waiting (the thread's interrupt status is restored first).
+     */
+    public void sync(long amount, TimeUnit unit) throws IOException {
+        try {
+            latch.await(amount, unit);
+        } catch (InterruptedException e) {
+            // Restore the interrupt status; Thread.interrupted() would clear it instead.
+            Thread.currentThread().interrupt();
+            throw IOExceptionSupport.create(e);
+        }
+        failOnError();
+    }
+
+    /**
+     * Waits, without a time limit, for a response to some Provider requested operation.
+     *
+     * @throws IOException if the operation failed, or if the calling thread is interrupted
+     *         while waiting (the thread's interrupt status is restored first).
+     */
+    public void sync() throws IOException {
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            // Restore the interrupt status; Thread.interrupted() would clear it instead.
+            Thread.currentThread().interrupt();
+            throw IOExceptionSupport.create(e);
+        }
+        failOnError();
+    }
+
+    /**
+     * Throws the recorded failure, converted by IOExceptionSupport, if there is one.
+     */
+    private void failOnError() throws IOException {
+        Throwable cause = error;
+        if (cause != null) {
+            throw IOExceptionSupport.create(cause);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
new file mode 100644
index 0000000..18938d5
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Events interface used to update the listener with changes in provider state.
+ */
+public interface ProviderListener {
+
+ /**
+ * Called when a new Message has arrived for a registered consumer.
+ *
+ * @param envelope
+ * The dispatch object containing the message and delivery information.
+ */
+ void onMessage(JmsInboundMessageDispatch envelope);
+
+ /**
+ * Called from a fault tolerant Provider instance to signal that the underlying
+ * connection to the Broker has been lost. The Provider will attempt to reconnect
+ * following this event unless closed.
+ *
+ * It is considered a programming error to allow any exceptions to be thrown from
+ * this notification method.
+ *
+ * @param remoteURI
+ * The URI of the Broker whose connection was lost.
+ */
+ void onConnectionInterrupted(URI remoteURI);
+
+ /**
+ * Called to indicate that a connection to the Broker has been reestablished and
+ * that the notified listener should start to recover its state. The provider will
+ * not transition to the recovered state until the listener notifies the provider
+ * that recovery is complete.
+ *
+ * @param provider
+ * The new Provider instance that will become active after the state
+ * has been recovered.
+ *
+ * @throws Exception if an error occurs during recovery attempt, this will fail
+ * the Provider that's being used for recovery.
+ */
+ void onConnectionRecovery(Provider provider) throws Exception;
+
+ /**
+ * Called to indicate that a connection to the Broker has been reestablished and
+ * that all recovery operations have succeeded and the connection will now be
+ * transitioned to a recovered state. This method gives the listener a chance
+ * to send any necessary post recovery commands such as consumer start or message
+ * pull for a zero prefetch consumer etc.
+ *
+ * @param provider
+ * The new Provider instance that will become active after the state
+ * has been recovered.
+ *
+ * @throws Exception if an error occurs during recovery attempt, this will fail
+ * the Provider that's being used for recovery.
+ */
+ void onConnectionRecovered(Provider provider) throws Exception;
+
+ /**
+ * Called to signal that all recovery operations are now complete and the Provider
+ * is again in a normal connected state.
+ *
+ * It is considered a programming error to allow any exceptions to be thrown from
+ * this notification method.
+ *
+ * @param remoteURI
+ * The URI of the Broker that the client has now connected to.
+ */
+ void onConnectionRestored(URI remoteURI);
+
+ /**
+ * Called to indicate that the underlying connection to the Broker has been lost and
+ * the Provider will not perform any reconnect. Following this call the provider is
+ * in a failed state and further calls to it will throw an Exception.
+ *
+ * @param ex
+ * The exception that indicates the cause of this Provider failure.
+ */
+ void onConnectionFailure(IOException ex);
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
new file mode 100644
index 0000000..1381dba
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.net.URI;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+
+/**
+ * Allows one Provider instance to wrap around another and provide some additional
+ * features beyond the normal Provider interface.
+ *
+ * This wrapper is meant primarily for Providers that are adding some additional feature
+ * on-top of an existing provider such as a discovery based provider that only needs to
+ * pass along discovered remote peer information.
+ *
+ * Every Provider operation is delegated to the wrapped instance, and every
+ * ProviderListener event raised by the wrapped instance is forwarded to the listener set
+ * on this wrapper. The event forwarding methods do not check for a null listener, so a
+ * listener must be set before the wrapped Provider starts raising events.
+ */
+public class ProviderWrapper<E extends Provider> implements Provider, ProviderListener {
+
+    protected final E next;
+    protected ProviderListener listener;
+
+    /**
+     * Wraps the given Provider and registers this wrapper as its ProviderListener so
+     * that its events flow through this instance.
+     *
+     * @param next
+     *        the Provider to wrap, must not be null.
+     */
+    public ProviderWrapper(E next) {
+        this.next = next;
+        this.next.setProviderListener(this);
+    }
+
+    @Override
+    public void connect() throws IOException {
+        next.connect();
+    }
+
+    /**
+     * Starts the wrapped Provider.
+     *
+     * @throws IOException if the wrapped Provider fails to start.
+     * @throws IllegalStateException if no ProviderListener has been set on this wrapper.
+     */
+    @Override
+    public void start() throws IOException, IllegalStateException {
+        if (this.listener == null) {
+            throw new IllegalStateException("Cannot start with null ProviderListener");
+        }
+        next.start();
+    }
+
+    @Override
+    public void close() {
+        next.close();
+    }
+
+    @Override
+    public URI getRemoteURI() {
+        return next.getRemoteURI();
+    }
+
+    @Override
+    public void create(JmsResource resource, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        next.create(resource, request);
+    }
+
+    @Override
+    public void start(JmsResource resource, AsyncResult request) throws IOException, JMSException {
+        next.start(resource, request);
+    }
+
+    @Override
+    public void destroy(JmsResource resourceId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        next.destroy(resourceId, request);
+    }
+
+    @Override
+    public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+        next.send(envelope, request);
+    }
+
+    @Override
+    public void acknowledge(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException {
+        next.acknowledge(sessionId, request);
+    }
+
+    @Override
+    public void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType, AsyncResult request) throws IOException, JMSException {
+        next.acknowledge(envelope, ackType, request);
+    }
+
+    @Override
+    public void commit(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        next.commit(sessionId, request);
+    }
+
+    @Override
+    public void rollback(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        next.rollback(sessionId, request);
+    }
+
+    @Override
+    public void recover(JmsSessionId sessionId, AsyncResult request) throws IOException, UnsupportedOperationException {
+        next.recover(sessionId, request);
+    }
+
+    @Override
+    public void unsubscribe(String subscription, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        next.unsubscribe(subscription, request);
+    }
+
+    @Override
+    public void pull(JmsConsumerId consumerId, long timeout, AsyncResult request) throws IOException, UnsupportedOperationException {
+        next.pull(consumerId, timeout, request);
+    }
+
+    @Override
+    public JmsMessageFactory getMessageFactory() {
+        return next.getMessageFactory();
+    }
+
+    /**
+     * Sets the listener that receives the events forwarded by this wrapper. The wrapped
+     * Provider's own listener stays set to this wrapper.
+     *
+     * @param listener
+     *        the listener to forward events to.
+     */
+    @Override
+    public void setProviderListener(ProviderListener listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    public ProviderListener getProviderListener() {
+        return this.listener;
+    }
+
+    @Override
+    public void onMessage(JmsInboundMessageDispatch envelope) {
+        this.listener.onMessage(envelope);
+    }
+
+    @Override
+    public void onConnectionInterrupted(URI remoteURI) {
+        this.listener.onConnectionInterrupted(remoteURI);
+    }
+
+    @Override
+    public void onConnectionRecovery(Provider provider) throws Exception {
+        this.listener.onConnectionRecovery(provider);
+    }
+
+    @Override
+    public void onConnectionRecovered(Provider provider) throws Exception {
+        this.listener.onConnectionRecovered(provider);
+    }
+
+    @Override
+    public void onConnectionRestored(URI remoteURI) {
+        this.listener.onConnectionRestored(remoteURI);
+    }
+
+    /**
+     * Forwards a terminal connection failure, with its cause, to the listener.
+     *
+     * @param ex
+     *        the exception that caused the wrapped Provider to fail.
+     */
+    @Override
+    public void onConnectionFailure(IOException ex) {
+        // Forward as a failure, not as an interruption: reporting it through
+        // onConnectionInterrupted would drop the cause and signal a recoverable event.
+        this.listener.onConnectionFailure(ex);
+    }
+
+    /**
+     * @return the wrapped Provider.
+     */
+    public Provider getNext() {
+        return this.next;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java
new file mode 100644
index 0000000..1175b34
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base for all AmqpResource implementations to extend.
+ *
+ * This abstract class wraps up the basic state management bits so that the concrete
+ * object don't have to reproduce it. Provides hooks for the subclasses to initialize
+ * and shutdown.
+ */
+public abstract class AbstractAmqpResource<R extends JmsResource, E extends Endpoint> implements AmqpResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractAmqpResource.class);
+
+ protected AsyncResult openRequest;
+ protected AsyncResult closeRequest;
+
+ protected E endpoint;
+ protected R info;
+
+ /**
+ * Creates a new AbstractAmqpResource instance with the JmsResource provided, and
+ * sets the Endpoint to null.
+ *
+ * @param info
+ * The JmsResource instance that this AmqpResource is managing.
+ */
+ public AbstractAmqpResource(R info) {
+ this(info, null);
+ }
+
+ /**
+ * Creates a new AbstractAmqpResource instance with the JmsResource provided, and
+ * sets the Endpoint to the given value.
+ *
+ * @param info
+ * The JmsResource instance that this AmqpResource is managing.
+ * @param endpoint
+ * The Proton Endpoint instance that this object maps to.
+ */
+ public AbstractAmqpResource(R info, E endpoint) {
+ this.info = info;
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public void open(AsyncResult request) {
+ this.openRequest = request;
+ doOpen();
+ this.endpoint.setContext(this);
+ this.endpoint.open();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return this.endpoint.getRemoteState() == EndpointState.ACTIVE;
+ }
+
+ @Override
+ public boolean isAwaitingOpen() {
+ return this.openRequest != null;
+ }
+
+ @Override
+ public void opened() {
+ if (this.openRequest != null) {
+ this.openRequest.onSuccess();
+ this.openRequest = null;
+ }
+ }
+
+ @Override
+ public void close(AsyncResult request) {
+ // If already closed signal success or else the caller might never get notified.
+ if (endpoint.getLocalState() == EndpointState.CLOSED) {
+ request.onSuccess();
+ return;
+ }
+
+ this.closeRequest = request;
+ doClose();
+ this.endpoint.close();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return this.endpoint.getLocalState() == EndpointState.CLOSED;
+ }
+
+ @Override
+ public boolean isAwaitingClose() {
+ return this.closeRequest != null;
+ }
+
+ @Override
+ public void closed() {
+ if (this.closeRequest != null) {
+ this.closeRequest.onSuccess();
+ this.closeRequest = null;
+ }
+
+ this.endpoint.close();
+ this.endpoint.free();
+ }
+
+ @Override
+ public void failed() {
+ failed(new JMSException("Remote request failed."));
+ }
+
+ @Override
+ public void failed(Exception cause) {
+ if (openRequest != null) {
+ openRequest.onFailure(cause);
+ openRequest = null;
+ }
+
+ if (closeRequest != null) {
+ closeRequest.onFailure(cause);
+ closeRequest = null;
+ }
+ }
+
+ public E getEndpoint() {
+ return this.endpoint;
+ }
+
+ public R getJmsResource() {
+ return this.info;
+ }
+
+ public EndpointState getLocalState() {
+ if (endpoint == null) {
+ return EndpointState.UNINITIALIZED;
+ }
+ return this.endpoint.getLocalState();
+ }
+
+ public EndpointState getRemoteState() {
+ if (endpoint == null) {
+ return EndpointState.UNINITIALIZED;
+ }
+ return this.endpoint.getRemoteState();
+ }
+
+ @Override
+ public Exception getRemoteError() {
+ String message = getRemoteErrorMessage();
+ Exception remoteError = null;
+ Symbol error = endpoint.getRemoteCondition().getCondition();
+ if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
+ remoteError = new JMSSecurityException(message);
+ } else {
+ remoteError = new JMSException(message);
+ }
+
+ return remoteError;
+ }
+
+ @Override
+ public String getRemoteErrorMessage() {
+ String message = "Received unkown error from remote peer";
+ if (endpoint.getRemoteCondition() != null) {
+ ErrorCondition error = endpoint.getRemoteCondition();
+ if (error.getDescription() != null && !error.getDescription().isEmpty()) {
+ message = error.getDescription();
+ }
+ }
+
+ return message;
+ }
+
+ @Override
+ public void processStateChange() throws IOException {
+ EndpointState remoteState = endpoint.getRemoteState();
+
+ if (remoteState == EndpointState.ACTIVE) {
+ if (isAwaitingOpen()) {
+ LOG.debug("{} is now open: ", this);
+ opened();
+ }
+
+ // Should not receive an ACTIVE event if not awaiting the open state.
+ } else if (remoteState == EndpointState.CLOSED) {
+ if (isAwaitingClose()) {
+ LOG.debug("{} is now closed: ", this);
+ closed();
+ } else if (isAwaitingOpen()) {
+ // Error on Open, create exception and signal failure.
+ LOG.warn("Open of {} failed: ", this);
+ Exception remoteError = this.getRemoteError();
+ failed(remoteError);
+ } else {
+ // TODO - Handle remote asynchronous close.
+ LOG.warn("{} was closed remotely.", this);
+ }
+ }
+ }
+
+ @Override
+ public void processDeliveryUpdates() throws IOException {
+ }
+
+ @Override
+ public void processFlowUpdates() throws IOException {
+ }
+
+ protected abstract void doOpen();
+
+ protected abstract void doClose();
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
new file mode 100644
index 0000000..7da3143
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.util.IdGenerator;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Producer implementation backing anonymous JMS MessageProducers.
+ *
+ * Each send attempt creates a dedicated {@link AmqpFixedProducer} for the destination
+ * carried by the outbound message. That producer is opened, used for the single send
+ * and closed again once the send has succeeded.
+ */
+public class AmqpAnonymousProducer extends AmqpProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousProducer.class);
+    private static final IdGenerator producerIdGenerator = new IdGenerator();
+
+    /** Key shared by the ids of every short lived producer created by this instance. */
+    private final String producerIdKey = producerIdGenerator.generateId();
+
+    /** Sequence value used for the next short lived producer id. */
+    private long producerIdCount;
+
+    /**
+     * Creates the Anonymous Producer object.
+     *
+     * @param session
+     *        the session that owns this producer
+     * @param info
+     *        the JmsProducerInfo for this producer.
+     */
+    public AmqpAnonymousProducer(AmqpSession session, JmsProducerInfo info) {
+        super(session, info);
+    }
+
+    /**
+     * Starts the open / send / close chain for one message.
+     *
+     * @param envelope
+     *        the outbound dispatch; its destination becomes the target of the short lived producer.
+     * @param request
+     *        the original send request, completed when the chain finishes or fails.
+     *
+     * @return always true.
+     */
+    @Override
+    public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+
+        LOG.trace("Started send chain for anonymous producer: {}", getProducerId());
+
+        // The short lived producer gets its own info object that points at the
+        // destination of this particular message.
+        final JmsProducerInfo sendInfo = new JmsProducerInfo(getNextProducerId());
+        sendInfo.setDestination(envelope.getDestination());
+
+        // Opening the fixed producer kicks off the chain: open success triggers the
+        // send, send success triggers the close, and close success completes the
+        // original request.
+        final AmqpFixedProducer sender = new AmqpFixedProducer(session, sendInfo);
+        sender.setPresettle(isPresettle());
+        sender.open(new AnonymousOpenRequest(request, sender, envelope));
+
+        return true;
+    }
+
+    /**
+     * Completes the request right away; nothing is sent to the Broker until a send
+     * occurs, so the client must not be left blocking.
+     */
+    @Override
+    public void open(AsyncResult request) {
+        request.onSuccess();
+    }
+
+    /**
+     * Completes the request right away; the internal producers that are in a send
+     * track their own state and close as the send completes or fails.
+     */
+    @Override
+    public void close(AsyncResult request) {
+        request.onSuccess();
+    }
+
+    @Override
+    protected void doOpen() {
+        // Nothing to do, open() never reaches this hook.
+    }
+
+    @Override
+    protected void doClose() {
+        // Nothing to do, close() never reaches this hook.
+    }
+
+    @Override
+    public boolean isAnonymous() {
+        return true;
+    }
+
+    /**
+     * @return always ACTIVE.
+     */
+    @Override
+    public EndpointState getLocalState() {
+        return EndpointState.ACTIVE;
+    }
+
+    /**
+     * @return always ACTIVE.
+     */
+    @Override
+    public EndpointState getRemoteState() {
+        return EndpointState.ACTIVE;
+    }
+
+    /**
+     * Builds the id for the next short lived producer and advances the sequence.
+     */
+    private JmsProducerId getNextProducerId() {
+        final long sequence = producerIdCount++;
+        return new JmsProducerId(producerIdKey, -1, sequence);
+    }
+
+    /**
+     * Common base of the steps in the send chain; carries the original send request,
+     * the short lived producer and the message between the steps.
+     */
+    private abstract class AnonymousRequest implements AsyncResult {
+
+        protected final AsyncResult sendResult;
+        protected final AmqpProducer producer;
+        protected final JmsOutboundMessageDispatch envelope;
+
+        public AnonymousRequest(AsyncResult sendResult, AmqpProducer producer, JmsOutboundMessageDispatch envelope) {
+            this.sendResult = sendResult;
+            this.producer = producer;
+            this.envelope = envelope;
+        }
+
+        /**
+         * @return the completion state of the original send request.
+         */
+        @Override
+        public boolean isComplete() {
+            return sendResult.isComplete();
+        }
+
+        /**
+         * A failure at any step of the chain fails the original send request.
+         */
+        @Override
+        public void onFailure(Throwable result) {
+            LOG.debug("Send failed during {} step in chain: {}", getClass().getName(), AmqpAnonymousProducer.this.getProducerId());
+            sendResult.onFailure(result);
+        }
+    }
+
+    /**
+     * First step of the chain: once the producer is open the message is sent.
+     */
+    private final class AnonymousOpenRequest extends AnonymousRequest {
+
+        public AnonymousOpenRequest(AsyncResult sendResult, AmqpProducer producer, JmsOutboundMessageDispatch envelope) {
+            super(sendResult, producer, envelope);
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Open phase of anonymous send complete: {} ", AmqpAnonymousProducer.this.getProducerId());
+            final AnonymousSendRequest nextStep = new AnonymousSendRequest(this);
+            try {
+                producer.send(envelope, nextStep);
+            } catch (Exception sendError) {
+                sendResult.onFailure(sendError);
+            }
+        }
+    }
+
+    /**
+     * Second step of the chain: once the message is sent the producer is closed.
+     */
+    private final class AnonymousSendRequest extends AnonymousRequest {
+
+        public AnonymousSendRequest(AnonymousOpenRequest open) {
+            super(open.sendResult, open.producer, open.envelope);
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Send phase of anonymous send complete: {} ", AmqpAnonymousProducer.this.getProducerId());
+            producer.close(new AnonymousCloseRequest(this));
+        }
+    }
+
+    /**
+     * Last step of the chain: once the producer is closed the original send request succeeds.
+     */
+    private final class AnonymousCloseRequest extends AnonymousRequest {
+
+        public AnonymousCloseRequest(AnonymousSendRequest send) {
+            super(send.sendResult, send.producer, send.envelope);
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Close phase of anonymous send complete: {} ", AmqpAnonymousProducer.this.getProducerId());
+            sendResult.onSuccess();
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org