You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:42 UTC
[18/27] Initial drop of donated AMQP Client Code.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
new file mode 100644
index 0000000..b1d0782
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.util.PropertyUtil;
+
+/**
+ * Factory for creating the AMQP provider.
+ */
+public class AmqpProviderFactory extends ProviderFactory {
+
+ @Override
+ public Provider createAsyncProvider(URI remoteURI) throws Exception {
+
+ Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery());
+ Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "provider.");
+
+ remoteURI = PropertyUtil.replaceQuery(remoteURI, map);
+
+ Provider result = new AmqpProvider(remoteURI);
+
+ if (!PropertyUtil.setProperties(result, providerOptions)) {
+ String msg = ""
+ + " Not all provider options could be set on the AMQP Provider."
+ + " Check the options are spelled correctly."
+ + " Given parameters=[" + providerOptions + "]."
+ + " This provider instance cannot be started.";
+ throw new IllegalArgumentException(msg);
+ }
+
+ return result;
+ }
+
+ @Override
+ public String getName() {
+ return "AMQP";
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
new file mode 100644
index 0000000..e23b4c0
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Queue Browser implementation for AMQP
+ */
+public class AmqpQueueBrowser extends AmqpConsumer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpQueueBrowser.class);
+
+ /**
+ * @param session
+ * @param info
+ */
+ public AmqpQueueBrowser(AmqpSession session, JmsConsumerInfo info) {
+ super(session, info);
+ }
+
+ /**
+ * Starts the QueueBrowser by activating drain mode with the initial credits.
+ */
+ @Override
+ public void start(AsyncResult request) {
+ this.endpoint.flow(info.getPrefetchSize());
+ request.onSuccess();
+ }
+
+ /**
+ * QueueBrowser will attempt to initiate a pull whenever there are no pending Messages.
+ *
+ * We need to initiate a drain to see if there are any messages and if the remote sender
+ * indicates it is drained then we can send end of browse. We only do this when there
+ * are no pending incoming deliveries and all delivered messages have become settled
+ * in order to give the remote a chance to dispatch more messages once all deliveries
+ * have been settled.
+ *
+ * @param timeout
+ * ignored in this context.
+ */
+ @Override
+ public void pull(long timeout) {
+ if (!endpoint.getDrain() && endpoint.current() == null && endpoint.getUnsettled() == 0) {
+ LOG.trace("QueueBrowser {} will try to drain remote.", getConsumerId());
+ this.endpoint.drain(info.getPrefetchSize());
+ } else {
+ endpoint.setDrain(false);
+ }
+ }
+
+ @Override
+ public void processFlowUpdates() throws IOException {
+ if (endpoint.getDrain() && endpoint.getCredit() == endpoint.getRemoteCredit()) {
+ JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch();
+ browseDone.setConsumerId(getConsumerId());
+ try {
+ deliver(browseDone);
+ } catch (Exception e) {
+ throw IOExceptionSupport.create(e);
+ }
+ } else {
+ endpoint.setDrain(false);
+ }
+
+ super.processFlowUpdates();
+ }
+
+ @Override
+ public void processDeliveryUpdates() throws IOException {
+ if (endpoint.getDrain() && endpoint.current() != null) {
+ LOG.trace("{} incoming delivery, cancel drain.", getConsumerId());
+ endpoint.setDrain(false);
+ }
+
+ super.processDeliveryUpdates();
+
+ if (endpoint.getDrain() && endpoint.getCredit() == endpoint.getRemoteCredit()) {
+ JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch();
+ browseDone.setConsumerId(getConsumerId());
+ try {
+ deliver(browseDone);
+ } catch (Exception e) {
+ throw IOExceptionSupport.create(e);
+ }
+ } else {
+ endpoint.setDrain(false);
+ }
+ }
+
+ @Override
+ protected void configureSource(Source source) {
+ if (info.isBrowser()) {
+ source.setDistributionMode(COPY);
+ }
+
+ super.configureSource(source);
+ }
+
+ @Override
+ public boolean isBrowser() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
new file mode 100644
index 0000000..46f8130
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import org.apache.qpid.jms.provider.AsyncResult;
+
+/**
+ * AmqpResource specification.
+ *
+ * All AMQP types should implement this interface to allow for control of state
+ * and configuration details.
+ */
+public interface AmqpResource {
+
+ /**
+ * Perform all the work needed to open this resource and store the request
+ * until such time as the remote peer indicates the resource has become active.
+ *
+ * @param request
+ * The initiating request that triggered this open call.
+ */
+ void open(AsyncResult request);
+
+ /**
+ * @return if the resource has moved to the opened state on the remote.
+ */
+ boolean isOpen();
+
+ /**
+ * @return true if the resource is awaiting the remote end to signal opened.
+ */
+ boolean isAwaitingOpen();
+
+ /**
+ * Called to indicate that this resource is now remotely opened. Once opened a
+ * resource can start accepting incoming requests.
+ */
+ void opened();
+
+ /**
+ * Perform all work needed to close this resource and store the request
+ * until such time as the remote peer indicates the resource has been closed.
+ *
+ * @param request
+ * The initiating request that triggered this close call.
+ */
+ void close(AsyncResult request);
+
+ /**
+ * @return if the resource has moved to the closed state on the remote.
+ */
+ boolean isClosed();
+
+ /**
+ * @return true if the resource is awaiting the remote end to signal closed.
+ */
+ boolean isAwaitingClose();
+
+ /**
+ * Called to indicate that this resource is now remotely closed. Once closed a
+ * resource can not accept any incoming requests.
+ */
+ void closed();
+
+ /**
+ * Sets the failed state for this Resource and triggers a failure signal for
+ * any pending ProduverRequest.
+ */
+ void failed();
+
+ /**
+ * Sets the failed state for this Resource and triggers a failure signal for
+ * any pending ProduverRequest.
+ *
+ * @param cause
+ * The Exception that triggered the failure.
+ */
+ void failed(Exception cause);
+
+ /**
+ * Called when the Proton Engine signals that the state of the given resource has
+ * changed on the remote side.
+ *
+ * @throws IOException if an error occurs while processing the update.
+ */
+ void processStateChange() throws IOException;
+
+ /**
+ * Called when the Proton Engine signals an Delivery related event has been triggered
+ * for the given endpoint.
+ *
+ * @throws IOException if an error occurs while processing the update.
+ */
+ void processDeliveryUpdates() throws IOException;
+
+ /**
+ * Called when the Proton Engine signals an Flow related event has been triggered
+ * for the given endpoint.
+ *
+ * @throws IOException if an error occurs while processing the update.
+ */
+ void processFlowUpdates() throws IOException;
+
+ /**
+ * @return an Exception derived from the error state of the endpoint's Remote Condition.
+ */
+ Exception getRemoteError();
+
+ /**
+ * @return an Error message derived from the error state of the endpoint's Remote Condition.
+ */
+ String getRemoteErrorMessage();
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
new file mode 100644
index 0000000..3416c22
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import javax.jms.JMSSecurityException;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.sasl.Mechanism;
+import org.apache.qpid.jms.sasl.SaslMechanismFinder;
+import org.apache.qpid.proton.engine.Sasl;
+
+/**
+ * Manage the SASL authentication process
+ */
+public class AmqpSaslAuthenticator {
+
+ private final Sasl sasl;
+ private final JmsConnectionInfo info;
+ private Mechanism mechanism;
+
+ /**
+ * Create the authenticator and initialize it.
+ *
+ * @param sasl
+ * The Proton SASL entry point this class will use to manage the authentication.
+ * @param info
+ * The Connection information used to provide credentials to the remote peer.
+ */
+ public AmqpSaslAuthenticator(Sasl sasl, JmsConnectionInfo info) {
+ this.sasl = sasl;
+ this.info = info;
+ }
+
+ /**
+ * Process the SASL authentication cycle until such time as an outcome is determine. This
+ * method must be called by the managing entity until the return value is true indicating a
+ * successful authentication or a JMSSecurityException is thrown indicating that the
+ * handshake failed.
+ *
+ * @throws JMSSecurityException
+ */
+ public boolean authenticate() throws JMSSecurityException {
+ switch (sasl.getState()) {
+ case PN_SASL_IDLE:
+ handleSaslInit();
+ break;
+ case PN_SASL_STEP:
+ handleSaslStep();
+ break;
+ case PN_SASL_FAIL:
+ handleSaslFail();
+ break;
+ case PN_SASL_PASS:
+ return true;
+ default:
+ }
+
+ return false;
+ }
+
+ private void handleSaslInit() throws JMSSecurityException {
+ try {
+ String[] remoteMechanisms = sasl.getRemoteMechanisms();
+ if (remoteMechanisms != null && remoteMechanisms.length != 0) {
+ mechanism = SaslMechanismFinder.findMatchingMechanism(remoteMechanisms);
+ if (mechanism != null) {
+ mechanism.setUsername(info.getUsername());
+ mechanism.setPassword(info.getPassword());
+ // TODO - set additional options from URI.
+ // TODO - set a host value.
+
+ sasl.setMechanisms(mechanism.getName());
+ byte[] response = mechanism.getInitialResponse();
+ if (response != null && response.length != 0) {
+ sasl.send(response, 0, response.length);
+ }
+ } else {
+ // TODO - Better error message.
+ throw new JMSSecurityException("Could not find a matching SASL mechanism for the remote peer.");
+ }
+ }
+ } catch (SaslException se) {
+ // TODO - Better error message.
+ JMSSecurityException jmsse = new JMSSecurityException("Exception while processing SASL init.");
+ jmsse.setLinkedException(se);
+ jmsse.initCause(se);
+ throw jmsse;
+ }
+ }
+
+ private void handleSaslStep() throws JMSSecurityException {
+ try {
+ if (sasl.pending() != 0) {
+ byte[] challenge = new byte[sasl.pending()];
+ sasl.recv(challenge, 0, challenge.length);
+ byte[] response = mechanism.getChallengeResponse(challenge);
+ sasl.send(response, 0, response.length);
+ }
+ } catch (SaslException se) {
+ // TODO - Better error message.
+ JMSSecurityException jmsse = new JMSSecurityException("Exception while processing SASL step.");
+ jmsse.setLinkedException(se);
+ jmsse.initCause(se);
+ throw jmsse;
+ }
+ }
+
+ private void handleSaslFail() throws JMSSecurityException {
+ // TODO - Better error message.
+ throw new JMSSecurityException("Client failed to authenticate");
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
new file mode 100644
index 0000000..44e864a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.IllegalStateException;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.proton.engine.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
+
+ private final AmqpConnection connection;
+ private final AmqpTransactionContext txContext;
+
+ private final Map<JmsConsumerId, AmqpConsumer> consumers = new HashMap<JmsConsumerId, AmqpConsumer>();
+ private final Map<JmsProducerId, AmqpProducer> producers = new HashMap<JmsProducerId, AmqpProducer>();
+
+ public AmqpSession(AmqpConnection connection, JmsSessionInfo info) {
+ super(info, connection.getProtonConnection().session());
+ this.connection = connection;
+
+ this.info.getSessionId().setProviderHint(this);
+ if (this.info.isTransacted()) {
+ txContext = new AmqpTransactionContext(this);
+ } else {
+ txContext = null;
+ }
+ }
+
+ @Override
+ public void opened() {
+ if (this.txContext != null) {
+ this.txContext.open(openRequest);
+ } else {
+ super.opened();
+ }
+ }
+
+ @Override
+ protected void doOpen() {
+ this.endpoint.setIncomingCapacity(Integer.MAX_VALUE);
+ this.connection.addSession(this);
+ }
+
+ @Override
+ protected void doClose() {
+ this.connection.removeSession(this);
+ }
+
+ /**
+ * Perform an acknowledge of all delivered messages for all consumers active in this
+ * Session.
+ */
+ public void acknowledge() {
+ for (AmqpConsumer consumer : consumers.values()) {
+ consumer.acknowledge();
+ }
+ }
+
+ /**
+ * Perform re-send of all delivered but not yet acknowledged messages for all consumers
+ * active in this Session.
+ *
+ * @throws Exception if an error occurs while performing the recover.
+ */
+ public void recover() throws Exception {
+ for (AmqpConsumer consumer : consumers.values()) {
+ consumer.recover();
+ }
+ }
+
+ public AmqpProducer createProducer(JmsProducerInfo producerInfo) {
+ AmqpProducer producer = null;
+
+ // TODO - There seems to be an issue with Proton not allowing links with a Target
+ // that has no address. Otherwise we could just ensure that messages sent
+ // to these anonymous targets have their 'to' value set to the destination.
+ if (producerInfo.getDestination() != null) {
+ LOG.debug("Creating fixed Producer for: {}", producerInfo.getDestination());
+ producer = new AmqpFixedProducer(this, producerInfo);
+ } else {
+ LOG.debug("Creating an Anonymous Producer: ");
+ producer = new AmqpAnonymousProducer(this, producerInfo);
+ }
+
+ producer.setPresettle(connection.isPresettleProducers());
+
+ return producer;
+ }
+
+ public AmqpProducer getProducer(JmsProducerInfo producerInfo) {
+ return getProducer(producerInfo.getProducerId());
+ }
+
+ public AmqpProducer getProducer(JmsProducerId producerId) {
+ if (producerId.getProviderHint() instanceof AmqpProducer) {
+ return (AmqpProducer) producerId.getProviderHint();
+ }
+ return this.producers.get(producerId);
+ }
+
+ public AmqpConsumer createConsumer(JmsConsumerInfo consumerInfo) {
+ AmqpConsumer result = null;
+
+ if (consumerInfo.isBrowser()) {
+ result = new AmqpQueueBrowser(this, consumerInfo);
+ } else {
+ result = new AmqpConsumer(this, consumerInfo);
+ }
+
+ result.setPresettle(connection.isPresettleConsumers());
+ return result;
+ }
+
+ public AmqpConsumer getConsumer(JmsConsumerInfo consumerInfo) {
+ return getConsumer(consumerInfo.getConsumerId());
+ }
+
+ public AmqpConsumer getConsumer(JmsConsumerId consumerId) {
+ if (consumerId.getProviderHint() instanceof AmqpConsumer) {
+ return (AmqpConsumer) consumerId.getProviderHint();
+ }
+ return this.consumers.get(consumerId);
+ }
+
+ public AmqpTransactionContext getTransactionContext() {
+ return this.txContext;
+ }
+
+ /**
+ * Begins a new Transaction using the given Transaction Id as the identifier. The AMQP
+ * binary Transaction Id will be stored in the provider hint value of the given transaction.
+ *
+ * @param txId
+ * The JMS Framework's assigned Transaction Id for the new TX.
+ * @param request
+ * The request that will be signaled on completion of this operation.
+ *
+ * @throws Exception if an error occurs while performing the operation.
+ */
+ public void begin(JmsTransactionId txId, AsyncResult request) throws Exception {
+ if (!this.info.isTransacted()) {
+ throw new IllegalStateException("Non-transacted Session cannot start a TX.");
+ }
+
+ getTransactionContext().begin(txId, request);
+ }
+
+ /**
+ * Commit the currently running Transaction.
+ *
+ * @param request
+ * The request that will be signaled on completion of this operation.
+ *
+ * @throws Exception if an error occurs while performing the operation.
+ */
+ public void commit(AsyncResult request) throws Exception {
+ if (!this.info.isTransacted()) {
+ throw new IllegalStateException("Non-transacted Session cannot start a TX.");
+ }
+
+ getTransactionContext().commit(request);
+ }
+
+ /**
+ * Roll back the currently running Transaction
+ *
+ * @param request
+ * The request that will be signaled on completion of this operation.
+ *
+ * @throws Exception if an error occurs while performing the operation.
+ */
+ public void rollback(AsyncResult request) throws Exception {
+ if (!this.info.isTransacted()) {
+ throw new IllegalStateException("Non-transacted Session cannot start a TX.");
+ }
+
+ getTransactionContext().rollback(request);
+ }
+
+ void addResource(AmqpConsumer consumer) {
+ consumers.put(consumer.getConsumerId(), consumer);
+ }
+
+ void removeResource(AmqpConsumer consumer) {
+ consumers.remove(consumer.getConsumerId());
+ }
+
+ void addResource(AmqpProducer producer) {
+ producers.put(producer.getProducerId(), producer);
+ }
+
+ void removeResource(AmqpProducer producer) {
+ producers.remove(producer.getProducerId());
+ }
+
+ /**
+ * Adds Topic or Queue qualifiers to the destination target. We don't add qualifiers to
+ * Temporary Topics and Queues since AMQP works a bit differently.
+ *
+ * @param destination
+ * The destination to Qualify.
+ *
+ * @return the qualified destination name.
+ */
+ public String getQualifiedName(JmsDestination destination) {
+ if (destination == null) {
+ return null;
+ }
+
+ String result = destination.getName();
+
+ if (!destination.isTemporary()) {
+ if (destination.isTopic()) {
+ result = connection.getTopicPrefix() + destination.getName();
+ } else {
+ result = connection.getQueuePrefix() + destination.getName();
+ }
+ }
+
+ return result;
+ }
+
+ public AmqpProvider getProvider() {
+ return this.connection.getProvider();
+ }
+
+ public AmqpConnection getConnection() {
+ return this.connection;
+ }
+
+ public JmsSessionId getSessionId() {
+ return this.info.getSessionId();
+ }
+
+ public Session getProtonSession() {
+ return this.endpoint;
+ }
+
+ boolean isTransacted() {
+ return this.info.isTransacted();
+ }
+
+ boolean isAsyncAck() {
+ return this.info.isSendAcksAsync() || isTransacted();
+ }
+
+ @Override
+ public String toString() {
+ return "AmqpSession { " + getSessionId() + " }";
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java
new file mode 100644
index 0000000..af7fe7f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.JmsSslContext;
+import org.apache.qpid.jms.transports.SslTransport;
+import org.apache.qpid.jms.transports.Transport;
+
+/**
+ * AmqpProvider extension that enables SSL based transports.
+ */
+public class AmqpSslProvider extends AmqpProvider {
+
+ private final JmsSslContext sslContext;
+
+ public AmqpSslProvider(URI remoteURI) {
+ super(remoteURI);
+ this.sslContext = JmsSslContext.getCurrentSslContext();
+ }
+
+ public AmqpSslProvider(URI remoteURI, Map<String, String> extraOptions) {
+ super(remoteURI, extraOptions);
+ this.sslContext = JmsSslContext.getCurrentSslContext();
+ }
+
+ @Override
+ protected Transport createTransport(URI remoteLocation) {
+ return new SslTransport(this, remoteLocation, sslContext);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
new file mode 100644
index 0000000..7606792
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.net.URI;
+
+import org.apache.qpid.jms.provider.Provider;
+
+/**
+ * Extends the AmqpProviderFactory to create an SSL based Provider instance.
+ */
+public class AmqpSslProviderFactory extends AmqpProviderFactory {
+
+ @Override
+ public Provider createAsyncProvider(URI remoteURI) throws Exception {
+ return new AmqpSslProvider(remoteURI);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
new file mode 100644
index 0000000..d76e3d7
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Sender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages a Temporary Destination linked to a given Connection.
+ *
+ * In order to create a temporary destination and keep it active for the life of the connection
+ * we must create a sender with a dynamic target value. Once the sender is open we can read
+ * the actual name assigned by the broker from the target and that is the real temporary
+ * destination that we will return.
+ *
+ * The open of the Sender instance will also allow us to catch any security errors from
+ * the broker in the case where the user does not have authorization to access temporary
+ * destinations.
+ */
+public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestination, Sender> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpTemporaryDestination.class);
+
+ private final AmqpConnection connection;
+ private final AmqpSession session;
+
+ public AmqpTemporaryDestination(AmqpSession session, JmsDestination destination) {
+ super(destination);
+ this.session = session;
+ this.connection = session.getConnection();
+ }
+
+ @Override
+ public void processStateChange() {
+ // TODO - We might want to check on our producer to see if it becomes closed
+ // which might indicate that the broker purged the temporary destination.
+
+ EndpointState remoteState = endpoint.getRemoteState();
+ if (remoteState == EndpointState.ACTIVE) {
+ LOG.trace("Temporary Destination: {} is now open", this.info);
+ opened();
+ } else if (remoteState == EndpointState.CLOSED) {
+ LOG.trace("Temporary Destination: {} is now closed", this.info);
+ closed();
+ }
+ }
+
+ @Override
+ public void opened() {
+
+ // Once our producer is opened we can read the updated name from the target address.
+ String oldDestinationName = info.getName();
+ String destinationName = this.endpoint.getRemoteTarget().getAddress();
+
+ this.info.setName(destinationName);
+
+ LOG.trace("Updated temp destination to: {} from: {}", info, oldDestinationName);
+
+ super.opened();
+ }
+
+ @Override
+ protected void doOpen() {
+
+ String sourceAddress = info.getName();
+ if (info.isQueue()) {
+ sourceAddress = connection.getTempQueuePrefix() + sourceAddress;
+ } else {
+ sourceAddress = connection.getTempQueuePrefix() + sourceAddress;
+ }
+ Source source = new Source();
+ source.setAddress(sourceAddress);
+ Target target = new Target();
+ target.setDynamic(true);
+
+ String senderName = sourceAddress;
+ endpoint = session.getProtonSession().sender(senderName);
+ endpoint.setSource(source);
+ endpoint.setTarget(target);
+ endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ this.connection.addTemporaryDestination(this);
+ }
+
+ @Override
+ protected void doClose() {
+ this.connection.removeTemporaryDestination(this);
+ }
+
+ public AmqpConnection getConnection() {
+ return this.connection;
+ }
+
+ public AmqpSession getSession() {
+ return this.session;
+ }
+
+ public Sender getProtonSender() {
+ return this.endpoint;
+ }
+
+ public JmsDestination getJmsDestination() {
+ return this.info;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + " { " + info + "}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
new file mode 100644
index 0000000..1cc6fd1
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import javax.jms.IllegalStateException;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
+import org.apache.qpid.proton.amqp.transaction.TxnCapability;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the operations surrounding AMQP transaction control.
+ *
+ * The Transaction will carry a JmsTransactionId while the Transaction is open, once a
+ * transaction has been committed or rolled back the Transaction Id is cleared.
+ */
+public class AmqpTransactionContext extends AbstractAmqpResource<JmsSessionInfo, Sender> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class);
+
+ private static final Boolean ROLLBACK_MARKER = Boolean.FALSE;
+ private static final Boolean COMMIT_MARKER = Boolean.TRUE;
+
+ private final AmqpSession session;
+ private JmsTransactionId current;
+ private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator();
+ private final Set<AmqpConsumer> txConsumers = new LinkedHashSet<AmqpConsumer>();
+
+ private Delivery pendingDelivery;
+ private AsyncResult pendingRequest;
+
+ /**
+ * Creates a new AmqpTransaction instance.
+ *
+ * @param session
+ * The session that owns this transaction
+ * @param info
+ * The JmsTransactionInfo that defines this Transaction.
+ */
+ public AmqpTransactionContext(AmqpSession session) {
+ super(session.getJmsResource());
+ this.session = session;
+ }
+
+ @Override
+ public void processDeliveryUpdates() throws IOException {
+ try {
+ if (pendingDelivery != null && pendingDelivery.remotelySettled()) {
+ DeliveryState state = pendingDelivery.getRemoteState();
+ if (state instanceof Declared) {
+ Declared declared = (Declared) state;
+ current.setProviderHint(declared.getTxnId());
+ pendingDelivery.settle();
+ LOG.info("New TX started: {}", current.getProviderHint());
+ AsyncResult request = this.pendingRequest;
+ this.pendingRequest = null;
+ this.pendingDelivery = null;
+ request.onSuccess();
+ } else if (state instanceof Rejected) {
+ LOG.info("Last TX request failed: {}", current.getProviderHint());
+ pendingDelivery.settle();
+ Rejected rejected = (Rejected) state;
+ TransactionRolledBackException ex =
+ new TransactionRolledBackException(rejected.getError().getDescription());
+ AsyncResult request = this.pendingRequest;
+ this.current = null;
+ this.pendingRequest = null;
+ this.pendingDelivery = null;
+ postRollback();
+ request.onFailure(ex);
+ } else {
+ LOG.info("Last TX request succeeded: {}", current.getProviderHint());
+ pendingDelivery.settle();
+ AsyncResult request = this.pendingRequest;
+ if (pendingDelivery.getContext() != null) {
+ if (pendingDelivery.getContext().equals(COMMIT_MARKER)) {
+ postCommit();
+ } else {
+ postRollback();
+ }
+ }
+ this.current = null;
+ this.pendingRequest = null;
+ this.pendingDelivery = null;
+ request.onSuccess();
+ }
+ }
+ } catch (Exception e) {
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
+ @Override
+ protected void doOpen() {
+ Coordinator coordinator = new Coordinator();
+ coordinator.setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_TXNS_PER_SSN);
+ Source source = new Source();
+
+ String coordinatorName = info.getSessionId().toString();
+ endpoint = session.getProtonSession().sender(coordinatorName);
+ endpoint.setSource(source);
+ endpoint.setTarget(coordinator);
+ endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ }
+
+ @Override
+ protected void doClose() {
+ }
+
+ public void begin(JmsTransactionId txId, AsyncResult request) throws Exception {
+ if (current != null) {
+ throw new IOException("Begin called while a TX is still Active.");
+ }
+
+ Message message = Message.Factory.create();
+ Declare declare = new Declare();
+ message.setBody(new AmqpValue(declare));
+
+ pendingDelivery = endpoint.delivery(tagGenerator.getNextTag());
+ pendingRequest = request;
+ current = txId;
+
+ sendTxCommand(message);
+ }
+
+ public void commit(AsyncResult request) throws Exception {
+ if (current == null) {
+ throw new IllegalStateException("Rollback called with no active Transaction.");
+ }
+
+ preCommit();
+
+ Message message = Message.Factory.create();
+ Discharge discharge = new Discharge();
+ discharge.setFail(false);
+ discharge.setTxnId((Binary) current.getProviderHint());
+ message.setBody(new AmqpValue(discharge));
+
+ pendingDelivery = endpoint.delivery(tagGenerator.getNextTag());
+ pendingDelivery.setContext(COMMIT_MARKER);
+ pendingRequest = request;
+
+ sendTxCommand(message);
+ }
+
+ public void rollback(AsyncResult request) throws Exception {
+ if (current == null) {
+ throw new IllegalStateException("Rollback called with no active Transaction.");
+ }
+
+ preRollback();
+
+ Message message = Message.Factory.create();
+ Discharge discharge = new Discharge();
+ discharge.setFail(true);
+ discharge.setTxnId((Binary) current.getProviderHint());
+ message.setBody(new AmqpValue(discharge));
+
+ pendingDelivery = endpoint.delivery(tagGenerator.getNextTag());
+ pendingDelivery.setContext(ROLLBACK_MARKER);
+ pendingRequest = request;
+
+ sendTxCommand(message);
+ }
+
+ public void registerTxConsumer(AmqpConsumer consumer) {
+ this.txConsumers.add(consumer);
+ }
+
+ public AmqpSession getSession() {
+ return this.session;
+ }
+
+ public JmsTransactionId getTransactionId() {
+ return this.current;
+ }
+
+ public Binary getAmqpTransactionId() {
+ Binary result = null;
+ if (current != null) {
+ result = (Binary) current.getProviderHint();
+ }
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return this.session.getSessionId() + ": txContext";
+ }
+
+ private void preCommit() throws Exception {
+ for (AmqpConsumer consumer : txConsumers) {
+ consumer.preCommit();
+ }
+ }
+
+ private void preRollback() throws Exception {
+ for (AmqpConsumer consumer : txConsumers) {
+ consumer.preRollback();
+ }
+ }
+
+ private void postCommit() throws Exception {
+ for (AmqpConsumer consumer : txConsumers) {
+ consumer.postCommit();
+ }
+ }
+
+ private void postRollback() throws Exception {
+ for (AmqpConsumer consumer : txConsumers) {
+ consumer.postRollback();
+ }
+ }
+
+ private void sendTxCommand(Message message) throws IOException {
+ int encodedSize = 0;
+ byte[] buffer = new byte[4 * 1024];
+ while (true) {
+ try {
+ encodedSize = message.encode(buffer, 0, buffer.length);
+ break;
+ } catch (BufferOverflowException e) {
+ buffer = new byte[buffer.length * 2];
+ }
+ }
+
+ this.endpoint.send(buffer, 0, encodedSize);
+ this.endpoint.advance();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransferTagGenerator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransferTagGenerator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransferTagGenerator.java
new file mode 100644
index 0000000..309561c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransferTagGenerator.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * Utility class that can generate and if enabled pool the binary tag values
+ * used to identify transfers over an AMQP link.
+ */
+public final class AmqpTransferTagGenerator {
+
+ public static final int DEFAULT_TAG_POOL_SIZE = 1024;
+
+ private long nextTagId;
+ private int maxPoolSize = DEFAULT_TAG_POOL_SIZE;
+
+ private final Set<byte[]> tagPool;
+
+ public AmqpTransferTagGenerator() {
+ this(false);
+ }
+
+ public AmqpTransferTagGenerator(boolean pool) {
+ if (pool) {
+ this.tagPool = new LinkedHashSet<byte[]>();
+ } else {
+ this.tagPool = null;
+ }
+ }
+
+ /**
+ * Retrieves the next available tag.
+ *
+ * @return a new or unused tag depending on the pool option.
+ */
+ public byte[] getNextTag() {
+ byte[] rc;
+ if (tagPool != null && !tagPool.isEmpty()) {
+ final Iterator<byte[]> iterator = tagPool.iterator();
+ rc = iterator.next();
+ iterator.remove();
+ } else {
+ try {
+ rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ // This should never happen since we control the input.
+ throw new RuntimeException(e);
+ }
+ }
+ return rc;
+ }
+
+ /**
+ * When used as a pooled cache of tags the unused tags should always be returned once
+ * the transfer has been settled.
+ *
+ * @param tag
+ * a previously borrowed tag that is no longer in use.
+ */
+ public void returnTag(byte[] data) {
+ if (tagPool != null && tagPool.size() < maxPoolSize) {
+ tagPool.add(data);
+ }
+ }
+
+ /**
+ * Gets the current max pool size value.
+ *
+ * @return the current max tag pool size.
+ */
+ public int getMaxPoolSize() {
+ return maxPoolSize;
+ }
+
+ /**
+ * Sets the max tag pool size. If the size is smaller than the current number
+ * of pooled tags the pool will drain over time until it matches the max.
+ *
+ * @param maxPoolSize
+ * the maximum number of tags to hold in the pool.
+ */
+ public void setMaxPoolSize(int maxPoolSize) {
+ this.maxPoolSize = maxPoolSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
new file mode 100644
index 0000000..1cefc2a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_BYTES_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+
+import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * A JmsBytesMessageFacade that wraps around Proton AMQP Message instances to provide
+ * access to the underlying bytes contained in the message.
+ */
+public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements JmsBytesMessageFacade {
+
+ private static final String CONTENT_TYPE = "application/octet-stream";
+ private static final Buffer EMPTY_BUFFER = new Buffer(new byte[0]);
+ private static final Data EMPTY_DATA = new Data(new Binary(new byte[0]));
+
+ /**
+ * Creates a new facade instance
+ *
+ * @param connection
+ */
+ public AmqpJmsBytesMessageFacade(AmqpConnection connection) {
+ super(connection);
+ setAnnotation(JMS_MSG_TYPE, JMS_BYTES_MESSAGE);
+ }
+
+ /**
+ * Creates a new Facade around an incoming AMQP Message for dispatch to the
+ * JMS Consumer instance.
+ *
+ * @param connection
+ * the connection that created this Facade.
+ * @param message
+ * the incoming Message instance that is being wrapped.
+ */
+ public AmqpJmsBytesMessageFacade(AmqpConnection connection, Message message) {
+ super(connection, message);
+ }
+
+ @Override
+ public JmsBytesMessageFacade copy() {
+ AmqpJmsBytesMessageFacade copy = new AmqpJmsBytesMessageFacade(connection);
+ copyInto(copy);
+
+ copy.setContent(getContent().deepCopy());
+
+ return copy;
+ }
+
+ @Override
+ public byte getJmsMsgType() {
+ return JMS_BYTES_MESSAGE;
+ }
+
+ @Override
+ public String getContentType() {
+ return CONTENT_TYPE;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ Binary payload = getBinaryFromBody();
+ return payload != null && payload.getLength() > 0;
+ }
+
+ @Override
+ public Buffer getContent() {
+ Buffer result = EMPTY_BUFFER;
+ Binary payload = getBinaryFromBody();
+ if (payload != null && payload.getLength() > 0) {
+ result = new Buffer(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ }
+
+ return result;
+ }
+
+ @Override
+ public void setContent(Buffer content) {
+ Data body = EMPTY_DATA;
+ if (content != null) {
+ body = new Data(new Binary(content.data, content.offset, content.length));
+ }
+
+ getAmqpMessage().setBody(body);
+ }
+
+ private Binary getBinaryFromBody() {
+ Section body = getAmqpMessage().getBody();
+ Binary result = null;
+
+ if (body == null) {
+ return result;
+ }
+
+ if (body instanceof Data) {
+ Binary payload = ((Data) body).getValue();
+ if (payload != null && payload.getLength() != 0) {
+ result = payload;
+ }
+ } else if(body instanceof AmqpValue) {
+ Object value = ((AmqpValue) body).getValue();
+ if (value == null) {
+ return result;
+ }
+
+ if (value instanceof Binary) {
+ Binary payload = (Binary)value;
+ if (payload != null && payload.getLength() != 0) {
+ result = payload;
+ }
+ } else {
+ throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
+ }
+ } else {
+ throw new IllegalStateException("Unexpected body content type: " + body.getClass().getSimpleName());
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
new file mode 100644
index 0000000..511d1e3
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MAP_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.qpid.jms.message.facade.JmsMapMessageFacade;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Wrapper around an AMQP Message instance that will be treated as a JMS MapMessage
+ * type.
+ */
+public class AmqpJmsMapMessageFacade extends AmqpJmsMessageFacade implements JmsMapMessageFacade {
+
+ private Map<String,Object> messageBodyMap;
+
+ /**
+ * Create a new facade ready for sending.
+ *
+ * @param connection
+ * the connection instance that created this facade.
+ */
+ public AmqpJmsMapMessageFacade(AmqpConnection connection) {
+ super(connection);
+ initializeEmptyBody();
+ setAnnotation(JMS_MSG_TYPE, JMS_MAP_MESSAGE);
+ }
+
+ /**
+ * Creates a new Facade around an incoming AMQP Message for dispatch to the
+ * JMS Consumer instance.
+ *
+ * @param connection
+ * the connection that created this Facade.
+ * @param message
+ * the incoming Message instance that is being wrapped.
+ */
+ @SuppressWarnings("unchecked")
+ public AmqpJmsMapMessageFacade(AmqpConnection connection, Message message) {
+ super(connection, message);
+
+ Section body = getAmqpMessage().getBody();
+ if (body == null) {
+ initializeEmptyBody();
+ } else if (body instanceof AmqpValue) {
+ Object o = ((AmqpValue) body).getValue();
+ if (o == null) {
+ initializeEmptyBody();
+ } else if (o instanceof Map) {
+ messageBodyMap = (Map<String, Object>) o;
+ } else {
+ throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
+ }
+ } else {
+ throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
+ }
+ }
+
+ /**
+ * @return the appropriate byte value that indicates the type of message this is.
+ */
+ @Override
+ public byte getJmsMsgType() {
+ return JMS_MAP_MESSAGE;
+ }
+
+ @Override
+ public JmsMapMessageFacade copy() {
+ AmqpJmsMapMessageFacade copy = new AmqpJmsMapMessageFacade(connection);
+ copyInto(copy);
+ copy.messageBodyMap.putAll(messageBodyMap);
+ return copy;
+ }
+
+ @Override
+ public Enumeration<String> getMapNames() {
+ return Collections.enumeration(messageBodyMap.keySet());
+ }
+
+ @Override
+ public boolean itemExists(String key) {
+ return messageBodyMap.containsKey(key);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return messageBodyMap.isEmpty();
+ }
+
+ @Override
+ public Object get(String key) {
+ Object value = messageBodyMap.get(key);
+ if (value instanceof Binary) {
+ // Copy to a byte[], ensure we copy only the required portion.
+ Binary bin = ((Binary) value);
+ value = Arrays.copyOfRange(bin.getArray(), bin.getArrayOffset(), bin.getLength());
+ }
+
+ return value;
+ }
+
+ @Override
+ public void put(String key, Object value) {
+ Object entry = value;
+ if (value instanceof byte[]) {
+ entry = new Binary((byte[]) value);
+ }
+
+ messageBodyMap.put(key, entry);
+ }
+
+ @Override
+ public Object remove(String key) {
+ return messageBodyMap.remove(key);
+ }
+
+ @Override
+ public void clearBody() {
+ messageBodyMap.clear();
+ }
+
+ private void initializeEmptyBody() {
+ // Using LinkedHashMap because AMQP map equality considers order,
+ // so we should behave in as predictable a manner as possible
+ messageBodyMap = new LinkedHashMap<String, Object>();
+ getAmqpMessage().setBody(new AmqpValue(messageBodyMap));
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
new file mode 100644
index 0000000..c31c50b
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_BYTES_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MAP_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_OBJECT_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_STREAM_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_TEXT_MESSAGE;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.qpid.jms.message.JmsBytesMessage;
+import org.apache.qpid.jms.message.JmsMapMessage;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.JmsObjectMessage;
+import org.apache.qpid.jms.message.JmsStreamMessage;
+import org.apache.qpid.jms.message.JmsTextMessage;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Builder class used to construct the appropriate JmsMessage / JmsMessageFacade
+ * objects to wrap an incoming AMQP Message.
+ */
+public class AmqpJmsMessageBuilder {
+
+ private AmqpJmsMessageBuilder() {
+ }
+
+ /**
+ * Create a new JmsMessage and underlying JmsMessageFacade that represents the proper
+ * message type for the incoming AMQP message.
+ *
+ * @param connection
+ * The provider AMQP Connection instance where this message arrived at.
+ * @param message
+ * The Proton Message object that will be wrapped.
+ *
+ * @return a JmsMessage instance properly configured for dispatch to the provider listener.
+ *
+ * @throws IOException if an error occurs while creating the message objects.
+ */
+ public static JmsMessage createJmsMessage(AmqpConnection connection, Message message) throws IOException {
+
+ // First we try the easy way, if the annotation is there we don't have to work hard.
+ JmsMessage result = createFromMsgAnnotation(connection, message);
+ if (result != null) {
+ return result;
+ }
+
+ // TODO
+ throw new IOException("Could not create a JMS message from incoming message");
+ }
+
+ private static JmsMessage createFromMsgAnnotation(AmqpConnection connection, Message message) throws IOException {
+ Object annotation = getMessageAnnotation(JMS_MSG_TYPE, message);
+ if (annotation != null) {
+
+ switch ((byte) annotation) {
+ case JMS_MESSAGE:
+ return new JmsMessage(new AmqpJmsMessageFacade(connection, message));
+ case JMS_BYTES_MESSAGE:
+ return new JmsBytesMessage(new AmqpJmsBytesMessageFacade(connection, message));
+ case JMS_TEXT_MESSAGE:
+ return new JmsTextMessage(new AmqpJmsTextMessageFacade(connection, message));
+ case JMS_MAP_MESSAGE:
+ return new JmsMapMessage(new AmqpJmsMapMessageFacade(connection, message));
+ case JMS_STREAM_MESSAGE:
+ return new JmsStreamMessage(new AmqpJmsStreamMessageFacade(connection, message));
+ case JMS_OBJECT_MESSAGE:
+ return new JmsObjectMessage(new AmqpJmsObjectMessageFacade(connection, message));
+ default:
+ throw new IOException("Invalid JMS Message Type annotation found in message");
+ }
+ }
+
+ return null;
+ }
+
+ private static Object getMessageAnnotation(String key, Message message) {
+ if (message.getMessageAnnotations() != null) {
+ Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
+ return annotations.get(AmqpMessageSupport.getSymbol(key));
+ }
+
+ return null;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org