You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/27 13:54:36 UTC
[09/15] activemq-artemis git commit: ARTEMIS-751 Simplification of
the AMQP implementation
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
new file mode 100644
index 0000000..848f0d2
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+import java.util.AbstractMap;
+import java.util.Map;
+
+/**
+ * Set of useful methods and definitions used in the AMQP protocol handling
+ */
+public class AmqpSupport {
+
+ // Identification values used to locate JMS selector types.
+ public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
+ public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");
+ public static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[]{JMS_SELECTOR_CODE, JMS_SELECTOR_NAME};
+ public static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L);
+ public static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:no-local-filter:list");
+ public static final Object[] NO_LOCAL_FILTER_IDS = new Object[]{NO_LOCAL_CODE, NO_LOCAL_NAME};
+
+ // Capabilities used to identify destination type in some requests.
+ public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
+ public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
+
+ // Symbols used to announce connection information to remote peer.
+ public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");
+ public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id");
+
+ // Symbols used to announce connection information to remote peer.
+ public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+ public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
+ public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
+ public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
+ public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
+ public static final Symbol PRODUCT = Symbol.valueOf("product");
+ public static final Symbol VERSION = Symbol.valueOf("version");
+ public static final Symbol PLATFORM = Symbol.valueOf("platform");
+
+ // Symbols used in configuration of newly opened links.
+ public static final Symbol COPY = Symbol.getSymbol("copy");
+
+ // Lifetime policy symbols
+ public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+
+ public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
+
+ /**
+ * Search for a given Symbol in a given array of Symbol objects.
+ *
+ * @param symbols
+ * the set of Symbols to search; may be null or empty, and may contain null elements.
+ * @param key
+ * the value to try and find in the Symbol array.
+ *
+ * @return true if the key is found in the given Symbol array, false otherwise
+ * (including when the array is null or empty).
+ */
+ public static boolean contains(Symbol[] symbols, Symbol key) {
+ if (symbols == null || symbols.length == 0) {
+ return false;
+ }
+
+ for (Symbol symbol : symbols) {
+ // A null element can never match; skip it instead of failing the whole search.
+ if (symbol != null && symbol.equals(key)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Search for a particular filter using a set of known identification values
+ * in the Map of filters.
+ * <p>
+ * Only map values that are {@link DescribedType} instances are considered; a value matches
+ * when its descriptor equals any one of the given ids. The first match encountered while
+ * iterating the map is returned.
+ *
+ * @param filters
+ * The filters map that should be searched; may be null or empty.
+ * @param filterIds
+ * The aliases for the target filter to be located; must contain at least one id.
+ *
+ * @return the filter if found in the mapping or null if not found.
+ *
+ * @throws IllegalArgumentException
+ * if filterIds is null or empty.
+ */
+ public static Map.Entry<Symbol, DescribedType> findFilter(Map<Symbol, Object> filters, Object[] filterIds) {
+
+ if (filterIds == null || filterIds.length == 0) {
+ // The array holds no ids at this point, so report which invalid shape was passed.
+ throw new IllegalArgumentException("Invalid Filter Ids array passed: " + (filterIds == null ? "null" : "empty"));
+ }
+
+ if (filters == null || filters.isEmpty()) {
+ return null;
+ }
+
+ for (Map.Entry<Symbol, Object> filter : filters.entrySet()) {
+ if (filter.getValue() instanceof DescribedType) {
+ DescribedType describedType = ((DescribedType) filter.getValue());
+ Object descriptor = describedType.getDescriptor();
+
+ // A described type without a descriptor cannot match any id.
+ if (descriptor == null) {
+ continue;
+ }
+
+ for (Object filterId : filterIds) {
+ if (descriptor.equals(filterId)) {
+ return new AbstractMap.SimpleImmutableEntry<>(filter.getKey(), describedType);
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonDeliveryHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonDeliveryHandler.java
new file mode 100644
index 0000000..43f1913
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonDeliveryHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * An interface to handle deliveries, either messages, acks or transaction calls
+ */
+public interface ProtonDeliveryHandler {
+
+ /**
+ * Called when flow (credit) information for the handler's link is available.
+ *
+ * @param currentCredits the credit value supplied by the caller
+ * @param drain the drain flag supplied by the caller
+ */
+ void onFlow(int currentCredits, boolean drain);
+
+ /**
+ * Called with a delivery that the handler should process.
+ *
+ * @param delivery the delivery to handle
+ * @throws ActiveMQAMQPException if the delivery cannot be handled
+ */
+ void onMessage(Delivery delivery) throws ActiveMQAMQPException;
+
+ /**
+ * Closes the handler.
+ * <p>
+ * We have to distinguish between a remote close on the link and a close via a connection or session, as the
+ * latter means that a link reattach can happen and we need to keep the underlying resource
+ * (queue/subscription) around for pub subs.
+ *
+ * @param remoteLinkClose true when the close is a remote close of the link itself
+ * @throws ActiveMQAMQPException if closing fails
+ */
+ void close(boolean remoteLinkClose) throws ActiveMQAMQPException;
+
+ /**
+ * Closes the handler because of an error.
+ *
+ * @param condition the error condition associated with the close
+ * @throws ActiveMQAMQPException if closing fails
+ */
+ void close(ErrorCondition condition) throws ActiveMQAMQPException;
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonInitializable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonInitializable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonInitializable.java
new file mode 100644
index 0000000..3870810
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonInitializable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+public class ProtonInitializable {
+
+ // Flipped to true by the first call to initialise() and never reset.
+ private boolean initialized;
+
+ /**
+ * @return true once {@link #initialise()} has been called on this instance
+ */
+ public boolean isInitialized() {
+ return initialized;
+ }
+
+ /**
+ * Marks this instance as initialised. Calling it again has no further effect.
+ *
+ * @throws Exception never thrown here; declared so that overriding methods may throw
+ */
+ public void initialise() throws Exception {
+ initialized = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
new file mode 100644
index 0000000..4b97831
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
+import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.jboss.logging.Logger;
+
+public class ProtonServerReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler {
+
+ private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class);
+
+ protected final AMQPConnectionContext connection;
+
+ protected final AMQPSessionContext protonSession;
+
+ protected final Receiver receiver;
+
+ protected String address;
+
+ protected final AMQPSessionCallback sessionSPI;
+
+
+ /*
+ The maximum number of credits we will allocate to clients.
+ This number is also used by the broker when refresh client credits.
+ */
+ private static int maxCreditAllocation = 100;
+
+ // Used by the broker to decide when to refresh clients credit. This is not used when client requests credit.
+ private static int minCreditRefresh = 30;
+
+ public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
+ AMQPConnectionContext connection,
+ AMQPSessionContext protonSession,
+ Receiver receiver) {
+ this.connection = connection;
+ this.protonSession = protonSession;
+ this.receiver = receiver;
+ this.sessionSPI = sessionSPI;
+ }
+
+ @Override
+ public void onFlow(int credits, boolean drain) {
+ flow(Math.min(credits, maxCreditAllocation), maxCreditAllocation);
+ }
+
+ @Override
+ public void initialise() throws Exception {
+ super.initialise();
+ org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
+
+ if (target != null) {
+ if (target.getDynamic()) {
+ //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
+ // will be deleted on closing of the session
+ address = sessionSPI.tempQueueName();
+
+ try {
+ sessionSPI.createTemporaryQueue(address);
+ }
+ catch (Exception e) {
+ throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+ }
+ target.setAddress(address);
+ }
+ else {
+ //if not dynamic then we use the targets address as the address to forward the messages to, however there has to
+ //be a queue bound to it so we nee to check this.
+ address = target.getAddress();
+ if (address == null) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet();
+ }
+
+ try {
+ if (!sessionSPI.bindingQuery(address)) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
+ }
+ }
+ catch (ActiveMQAMQPNotFoundException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+ }
+ }
+ }
+ flow(maxCreditAllocation, minCreditRefresh);
+ }
+
+ /*
+ * called when Proton receives a message to be delivered via a Delivery.
+ *
+ * This may be called more than once per deliver so we have to cache the buffer until we have received it all.
+ *
+ * */
+ @Override
+ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
+ Receiver receiver;
+ try {
+ receiver = ((Receiver) delivery.getLink());
+
+ if (!delivery.isReadable()) {
+ return;
+ }
+
+ if (delivery.isPartial()) {
+ return;
+ }
+
+ ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024);
+ try {
+ synchronized (connection.getLock()) {
+ DeliveryUtil.readDelivery(receiver, buffer);
+
+ receiver.advance();
+
+ Transaction tx = null;
+ if (delivery.getRemoteState() instanceof TransactionalState) {
+
+ TransactionalState txState = (TransactionalState) delivery.getRemoteState();
+ tx = this.sessionSPI.getTransaction(txState.getTxnId());
+ }
+ sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), buffer);
+
+ flow(maxCreditAllocation, minCreditRefresh);
+ }
+ }
+ finally {
+ buffer.release();
+ }
+ }
+ catch (Exception e) {
+ log.warn(e.getMessage(), e);
+ Rejected rejected = new Rejected();
+ ErrorCondition condition = new ErrorCondition();
+ condition.setCondition(Symbol.valueOf("failed"));
+ condition.setDescription(e.getMessage());
+ rejected.setError(condition);
+ delivery.disposition(rejected);
+ }
+ }
+
+ @Override
+ public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
+ protonSession.removeReceiver(receiver);
+ }
+
+ @Override
+ public void close(ErrorCondition condition) throws ActiveMQAMQPException {
+ receiver.setCondition(condition);
+ close(false);
+ }
+
+ public void flow(int credits, int threshold) {
+ // Use the SessionSPI to allocate producer credits, or default, always allocate credit.
+ if (sessionSPI != null) {
+ sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
+ }
+ else {
+ synchronized (connection.getLock()) {
+ receiver.flow(credits);
+ connection.flush();
+ }
+ }
+
+ }
+
+ public void drain(int credits) {
+ synchronized (connection.getLock()) {
+ receiver.drain(credits);
+ }
+ connection.flush();
+ }
+
+ public int drained() {
+ return receiver.drained();
+ }
+
+ public boolean isDraining() {
+ return receiver.draining();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
new file mode 100644
index 0000000..0a071fd
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import java.util.Map;
+import java.util.Objects;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
+import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.selector.filter.FilterException;
+import org.apache.activemq.artemis.selector.impl.SelectorParser;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.jboss.logging.Logger;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+
+public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
+
+ private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
+
+ // NOTE(review): SELECTOR is not referenced in the part of this class visible here - confirm it is still needed.
+ private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
+ // Distribution mode symbol; initialise() uses it to detect a browse-only source.
+ private static final Symbol COPY = Symbol.valueOf("copy");
+ // Source capability that marks a link as a topic (pub/sub) subscription.
+ private static final Symbol TOPIC = Symbol.valueOf("topic");
+
+ // Handle returned by sessionSPI.createSender() in initialise(); stays null until then.
+ private Object brokerConsumer;
+
+ protected final AMQPSessionContext protonSession;
+ protected final Sender sender;
+ protected final AMQPConnectionContext connection;
+ // Set by close(ErrorCondition); deliverMessage() refuses to deliver once it is true.
+ protected boolean closed = false;
+ protected final AMQPSessionCallback sessionSPI;
+ // Updated with the link's current credit on every onFlow() call.
+ protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
+
+
+ /**
+ * Creates the sending side of a link. No broker consumer exists until {@link #initialise()} is called.
+ *
+ * @param connection owning connection, whose lock guards access to the sender
+ * @param sender the Proton sender link being served
+ * @param protonSession session context this sender is registered with
+ * @param server callback into the broker session
+ */
+ public ProtonServerSenderContext(AMQPConnectionContext connection,
+ Sender sender,
+ AMQPSessionContext protonSession,
+ AMQPSessionCallback server) {
+ this.sessionSPI = server;
+ this.protonSession = protonSession;
+ this.sender = sender;
+ this.connection = connection;
+ }
+
+ /**
+ * @return the broker consumer created by {@link #initialise()}, or null if none has been created
+ */
+ public Object getBrokerConsumer() {
+ return brokerConsumer;
+ }
+
+ /**
+ * Records the link's current credit in the credits semaphore and then passes the flow
+ * information on to the session callback for the broker consumer.
+ */
+ @Override
+ public void onFlow(int currentCredits, boolean drain) {
+ this.creditsSemaphore.setCredits(currentCredits);
+ sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
+ }
+
+ /**
+ * @return the Proton sender link served by this context
+ */
+ public Sender getSender() {
+ return sender;
+ }
+
+ /**
+ * Starts the session and, if a broker consumer exists, starts sending to it.
+ *
+ * @throws ActiveMQAMQPException if starting the consumer fails; only the original
+ * exception's message is carried over
+ */
+ public void start() throws ActiveMQAMQPException {
+ sessionSPI.start();
+ // protonSession.getServerSession().start();
+
+ //todo add flow control
+ try {
+ // to do whatever you need to make the broker start sending messages to the consumer
+ //this could be null if a link reattach has happened
+ if (brokerConsumer != null) {
+ sessionSPI.startSender(brokerConsumer);
+ }
+ //protonSession.getServerSession().receiveConsumerCredits(consumerID, -1);
+ }
+ catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage());
+ }
+ }
+
+ /**
+ * Creates the actual underlying ActiveMQ Artemis server consumer for this link.
+ * <p>
+ * The steps, in order:
+ * <ol>
+ * <li>Read the JMS selector filter (if any) from the remote source and validate it; an invalid
+ * selector closes the link with an invalid-field error and returns without creating a consumer.</li>
+ * <li>Decide whether the link is a pub/sub subscription (topic capability or pub/sub address prefix).
+ * For pub/sub links carrying a no-local filter, a connection-id condition is added to the selector.</li>
+ * <li>With no remote source, look for an existing queue named {@code <containerId>:<linkName>} and, if it
+ * exists, rebuild a source for it on the sender; otherwise fail. No consumer is created on this path.</li>
+ * <li>With a remote source, resolve the queue (temporary queue for dynamic sources, durable or volatile
+ * subscription queue for pub/sub, or the source address itself) and create the broker consumer.</li>
+ * </ol>
+ *
+ * @throws Exception if a queue cannot be created or found, or the consumer cannot be created
+ */
+ @Override
+ public void initialise() throws Exception {
+ super.initialise();
+
+ Source source = (Source) sender.getRemoteSource();
+
+ String queue;
+
+ String selector = null;
+
+ /*
+ * even though the filter is a map it will only return a single filter unless a no-local filter is also provided
+ * */
+ if (source != null) {
+ Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
+ if (filter != null) {
+ selector = filter.getValue().getDescribed().toString();
+ // Validate the Selector.
+ try {
+ SelectorParser.parse(selector);
+ }
+ catch (FilterException e) {
+ close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
+ return;
+ }
+ }
+ }
+
+ /*
+ * if we have a capability for a topic (qpid-jms) or we are configured on this address to act like a topic then act
+ * like a subscription.
+ * */
+ boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
+
+ if (isPubSub) {
+ // A no-local filter is translated into a selector condition on the connection id property.
+ if (AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
+ String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
+ String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
+ if (selector != null) {
+ selector += " AND " + noLocalFilter;
+ }
+ else {
+ selector = noLocalFilter;
+ }
+ }
+ }
+
+ if (source == null) {
+ // Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
+ String clientId = connection.getRemoteContainer();
+ String pubId = sender.getName();
+ queue = clientId + ":" + pubId;
+ boolean exists = sessionSPI.queueQuery(queue, false).isExists();
+
+ /*
+ * If it exists then we know it is a subscription so we set the capabilities on the source so we can delete on a
+ * link remote close.
+ * */
+ if (exists) {
+ source = new org.apache.qpid.proton.amqp.messaging.Source();
+ source.setAddress(queue);
+ source.setDurable(TerminusDurability.UNSETTLED_STATE);
+ source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ source.setDistributionMode(COPY);
+ source.setCapabilities(TOPIC);
+ sender.setSource(source);
+ }
+ else {
+ throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName());
+ }
+ }
+ else {
+ if (source.getDynamic()) {
+ //if dynamic we have to create the node (queue) and set the address on the source, the node is temporary and
+ // will be deleted on closing of the session
+ queue = java.util.UUID.randomUUID().toString();
+ try {
+ sessionSPI.createTemporaryQueue(queue);
+ //protonSession.getServerSession().createQueue(queue, queue, null, true, false);
+ }
+ catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+ }
+ source.setAddress(queue);
+ }
+ else {
+ //if not dynamic then we use the source's address as the queue to consume from, however there has to
+ //be a queue bound to it so we need to check this.
+ if (isPubSub) {
+ // if we are a subscription and durable create a durable queue using the container id and link name
+ if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
+ TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
+ String clientId = connection.getRemoteContainer();
+ String pubId = sender.getName();
+ queue = clientId + ":" + pubId;
+ QueueQueryResult result = sessionSPI.queueQuery(queue, false);
+
+ if (result.isExists()) {
+ // If a client reattaches to a durable subscription with a different no-local filter value, selector
+ // or address then we must recreate the queue (JMS semantics).
+
+ if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) ||
+ (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
+ // Recreation is only allowed while no consumer is attached to the existing queue.
+ if (result.getConsumerCount() == 0) {
+ sessionSPI.deleteQueue(queue);
+ sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
+ }
+ else {
+ throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
+ }
+ }
+ }
+ else {
+ sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
+ }
+ source.setAddress(queue);
+ }
+ //otherwise we are a volatile subscription
+ else {
+ queue = java.util.UUID.randomUUID().toString();
+ try {
+ sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
+ }
+ catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+ }
+ source.setAddress(queue);
+ }
+ }
+ else {
+ queue = source.getAddress();
+ }
+ if (queue == null) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
+ }
+
+ try {
+ if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
+ }
+ }
+ catch (ActiveMQAMQPNotFoundException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+ }
+ }
+
+ // A non-pub/sub source whose distribution mode is "copy" is consumed in browse-only mode.
+ boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
+ try {
+ // For pub/sub the selector was already applied when the subscription queue was created, so none is passed here.
+ brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly);
+ }
+ catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Tells whether the source address falls under the session's pub/sub prefix.
+ *
+ * @return true only when the source, the prefix and the source address are all non-null and the
+ * address starts with the prefix
+ */
+ private boolean isPubSub(Source source) {
+ String prefix = sessionSPI.getPubSubPrefix();
+ if (source == null || prefix == null) {
+ return false;
+ }
+ String sourceAddress = source.getAddress();
+ if (sourceAddress == null) {
+ return false;
+ }
+ return sourceAddress.startsWith(prefix);
+ }
+
+ /*
+ * close the session
+ * */
+ @Override
+ public void close(ErrorCondition condition) throws ActiveMQAMQPException {
+ closed = true;
+ protonSession.removeSender(sender);
+ synchronized (connection.getLock()) {
+ sender.close();
+ }
+ connection.flush();
+
+ try {
+ sessionSPI.closeSender(brokerConsumer);
+ }
+ catch (Exception e) {
+ log.warn(e.getMessage(), e);
+ throw new ActiveMQAMQPInternalErrorException(e.getMessage());
+ }
+ }
+
+ /**
+ * Closes the broker consumer behind this link and, on a remote link close, removes the queue that backs
+ * a topic subscription.
+ * <p>
+ * On a remote link close of a source with the topic capability: if the source address names an existing
+ * queue and the source is dynamic, that queue is deleted; otherwise the queue named
+ * {@code <containerId>:<linkName>} is deleted if it exists.
+ *
+ * @param remoteLinkClose true when the remote peer closed the link itself, in which case the subscription
+ * queue is deleted; false leaves the queue in place
+ * @throws ActiveMQAMQPException if any step fails (the original failure is kept as cause)
+ */
+ @Override
+ public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
+ try {
+ sessionSPI.closeSender(brokerConsumer);
+ //if this is a link close rather than a connection close or detach, we need to delete any durable resources for
+ // say pub subs
+ if (remoteLinkClose) {
+ Source source = (Source) sender.getSource();
+ if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) {
+ String queueName = source.getAddress();
+ QueueQueryResult result = sessionSPI.queueQuery(queueName, false);
+ if (result.isExists() && source.getDynamic()) {
+ sessionSPI.deleteQueue(queueName);
+ }
+ else {
+ String clientId = connection.getRemoteContainer();
+ String pubId = sender.getName();
+ String queue = clientId + ":" + pubId;
+ result = sessionSPI.queueQuery(queue, false);
+ if (result.isExists()) {
+ if (result.getConsumerCount() > 0) {
+ // Unexpected: our own consumer was closed above. Report it through the logger, not stdout.
+ log.warn("Deleting subscription queue " + queue + " while it still has " + result.getConsumerCount() + " consumer(s)");
+ }
+ sessionSPI.deleteQueue(queue);
+ }
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ log.warn(e.getMessage(), e);
+ // Keep the original exception as the cause so the stack trace is not lost.
+ throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Processes the remote state of a delivery on this sender link.
+ * <p>
+ * The message is taken from the delivery's context. Depending on the remote state:
+ * <ul>
+ * <li>transactional state with an accepted outcome: replies with a transactional accepted disposition
+ * (unless remotely settled) and acks the message within the transaction;</li>
+ * <li>accepted: acks the message outside any transaction;</li>
+ * <li>released: cancels the message, passing {@code false} as the last argument;</li>
+ * <li>rejected or modified: cancels the message, passing {@code true} as the last argument.</li>
+ * </ul>
+ * Whenever a remote state is present the delivery is then settled and one unit is offered on the sender.
+ * Nothing is done when there is no remote state.
+ *
+ * @param delivery the delivery whose remote state changed
+ * @throws ActiveMQAMQPException if acknowledging or cancelling the message fails
+ */
+ @Override
+ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
+ Object message = delivery.getContext();
+
+ boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+
+ DeliveryState remoteState = delivery.getRemoteState();
+
+ if (remoteState != null) {
+ // If we are transactional then we need ack if the msg has been accepted
+ if (remoteState instanceof TransactionalState) {
+
+ TransactionalState txState = (TransactionalState) remoteState;
+ Transaction tx = this.sessionSPI.getTransaction(txState.getTxnId());
+ if (txState.getOutcome() != null) {
+ Outcome outcome = txState.getOutcome();
+ if (outcome instanceof Accepted) {
+ if (!delivery.remotelySettled()) {
+ TransactionalState txAccepted = new TransactionalState();
+ txAccepted.setOutcome(Accepted.getInstance());
+ txAccepted.setTxnId(txState.getTxnId());
+
+ delivery.disposition(txAccepted);
+ }
+ //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
+ // from dealer, a perf hit but a must
+ try {
+ sessionSPI.ack(tx, brokerConsumer, message);
+ }
+ catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+ }
+ }
+ }
+ }
+ else if (remoteState instanceof Accepted) {
+ //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
+ // from dealer, a perf hit but a must
+ try {
+ sessionSPI.ack(null, brokerConsumer, message);
+ }
+ catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+ }
+ }
+ else if (remoteState instanceof Released) {
+ try {
+ sessionSPI.cancel(brokerConsumer, message, false);
+ }
+ catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
+ }
+ }
+ else if (remoteState instanceof Rejected || remoteState instanceof Modified) {
+ try {
+ sessionSPI.cancel(brokerConsumer, message, true);
+ }
+ catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
+ }
+ }
+ //todo add tag caching
+ // The delivery tag is handed back to the session unless the remote sender settle mode is SETTLED.
+ if (!preSettle) {
+ protonSession.replaceTag(delivery.getTag());
+ }
+
+ synchronized (connection.getLock()) {
+ delivery.settle();
+ sender.offer(1);
+ }
+
+ }
+ else {
+ //todo not sure if we need to do anything here
+ }
+ }
+
+ /**
+ * Asks the session SPI to resume delivery for this sender's broker consumer.
+ */
+ public synchronized void checkState() {
+ sessionSPI.resumeDelivery(brokerConsumer);
+ }
+
+ /**
+  * Handles an outgoing message from ActiveMQ Artemis: encodes it through the session SPI and sends it
+  * via the Proton Sender.
+  *
+  * @param message       the broker message to deliver
+  * @param deliveryCount the delivery count handed to the encoder
+  * @return the value returned by {@code performSend}, or 0 when this sender is already closed
+  * @throws Exception an {@code ActiveMQAMQPInternalErrorException} when encoding fails, or whatever the send raises
+  */
+ public int deliverMessage(Object message, int deliveryCount) throws Exception {
+    if (closed) {
+       // report through the class logger rather than writing to stderr
+       log.warn("Message can't be delivered as it's closed");
+       return 0;
+    }
+
+    //encode the message
+    ProtonJMessage serverMessage;
+    try {
+       // This can be done a lot better here
+       serverMessage = sessionSPI.encodeMessage(message, deliveryCount);
+    }
+    catch (Throwable e) {
+       log.warn(e.getMessage(), e);
+       throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+    }
+
+    return performSend(serverMessage, message);
+ }
+
+ private static boolean hasCapabilities(Symbol symbol, Source source) {
+    // True only when the source is present and lists the given symbol among its capabilities.
+    if (source == null || source.getCapabilities() == null) {
+       return false;
+    }
+    for (Symbol capability : source.getCapabilities()) {
+       if (symbol.equals(capability)) {
+          return true;
+       }
+    }
+    return false;
+ }
+ /**
+ * Encodes the given Proton message into a pooled heap buffer and sends it on the Proton sender.
+ * One permit is taken from {@code creditsSemaphore} first, blocking until one is available.
+ *
+ * @param serverMessage the already converted Proton message to encode and send
+ * @param context the object stored as the delivery's context
+ * @return the buffer's writer index after encoding, i.e. the number of encoded bytes
+ * @throws IllegalStateException if the thread is interrupted while waiting for a permit
+ */
+ protected int performSend(ProtonJMessage serverMessage, Object context) {
+ if (!creditsSemaphore.tryAcquire()) {
+ try {
+ creditsSemaphore.acquire();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // the interrupt flag is restored and the send is aborted with an unchecked exception
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ //presettle means we can ack the message on the dealer side before we send it, i.e. for browsers
+ boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+
+ //we only need a tag if we are going to ack later
+ byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
+
+ ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+ try {
+ serverMessage.encode(new NettyWritable(nettyBuffer));
+
+ int size = nettyBuffer.writerIndex();
+
+ // the delivery is created, filled and settled/advanced while holding the connection lock
+ synchronized (connection.getLock()) {
+ final Delivery delivery;
+ delivery = sender.delivery(tag, 0, tag.length);
+ delivery.setContext(context);
+
+ // this will avoid a copy.. patch provided by Norman using buffer.array()
+ sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
+
+ if (preSettle) {
+ delivery.settle();
+ }
+ else {
+ sender.advance();
+ }
+ }
+
+ connection.flush();
+
+ return size;
+ }
+ finally {
+ // the pooled buffer is released on every path, including failures
+ nettyBuffer.release();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
new file mode 100644
index 0000000..6d4e73a
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.jboss.logging.Logger;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+
+/**
+ * handles an amqp Coordinator to deal with transaction boundaries etc
+ */
+public class ProtonTransactionHandler implements ProtonDeliveryHandler {
+
+ private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
+
+ final AMQPSessionCallback sessionSPI;
+
+ public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) {
+ this.sessionSPI = sessionSPI;
+ }
+
+ @Override
+ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
+ ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+ final Receiver receiver;
+ try {
+ receiver = ((Receiver) delivery.getLink());
+
+ if (!delivery.isReadable()) {
+ return;
+ }
+
+ DeliveryUtil.readDelivery(receiver, buffer);
+
+ receiver.advance();
+
+ MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer);
+
+ Object action = ((AmqpValue) msg.getBody()).getValue();
+
+ if (action instanceof Declare) {
+ Binary txID = sessionSPI.newTransaction();
+ Declared declared = new Declared();
+ declared.setTxnId(txID);
+ delivery.disposition(declared);
+ delivery.settle();
+ }
+ else if (action instanceof Discharge) {
+ Discharge discharge = (Discharge) action;
+
+ Binary txID = discharge.getTxnId();
+ if (discharge.getFail()) {
+ try {
+ sessionSPI.rollbackTX(txID, true);
+ delivery.disposition(new Accepted());
+ }
+ catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
+ }
+ }
+ else {
+ try {
+ sessionSPI.commitTX(txID);
+ delivery.disposition(new Accepted());
+ }
+ catch (ActiveMQAMQPException amqpE) {
+ throw amqpE;
+ }
+ catch (Exception e) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
+ }
+ }
+ }
+ }
+ catch (ActiveMQAMQPException amqpE) {
+ delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
+ }
+ catch (Exception e) {
+ log.warn(e.getMessage(), e);
+ delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
+ }
+ finally {
+ delivery.settle();
+ buffer.release();
+ }
+ }
+
+ private Rejected createRejected(Symbol amqpError, String message) {
+ Rejected rejected = new Rejected();
+ ErrorCondition condition = new ErrorCondition();
+ condition.setCondition(amqpError);
+ condition.setDescription(message);
+ rejected.setError(condition);
+ return rejected;
+ }
+
+ @Override
+ public void onFlow(int credits, boolean drain) {
+ }
+
+ @Override
+ public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
+ // no op
+ }
+
+ @Override
+ public void close(ErrorCondition condition) throws ActiveMQAMQPException {
+ // no op
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
new file mode 100644
index 0000000..91c9a67
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.Transport;
+
+/**
+ * Callback interface for Proton engine events. {@link Events#dispatch} maps each event type to one of
+ * these methods, passing the connection, session, link, delivery or transport the event refers to.
+ */
+public interface EventHandler {
+
+ // called by ProtonHandler while handling the first received packet; 'sasl' tells whether a SASL header was seen
+ void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl);
+
+ // ---- connection events
+
+ void onInit(Connection connection) throws Exception;
+
+ void onLocalOpen(Connection connection) throws Exception;
+
+ void onRemoteOpen(Connection connection) throws Exception;
+
+ void onLocalClose(Connection connection) throws Exception;
+
+ void onRemoteClose(Connection connection) throws Exception;
+
+ void onFinal(Connection connection) throws Exception;
+
+ // ---- session events
+
+ void onInit(Session session) throws Exception;
+
+ void onLocalOpen(Session session) throws Exception;
+
+ void onRemoteOpen(Session session) throws Exception;
+
+ void onLocalClose(Session session) throws Exception;
+
+ void onRemoteClose(Session session) throws Exception;
+
+ void onFinal(Session session) throws Exception;
+
+ // ---- link events
+
+ void onInit(Link link) throws Exception;
+
+ void onLocalOpen(Link link) throws Exception;
+
+ void onRemoteOpen(Link link) throws Exception;
+
+ void onLocalClose(Link link) throws Exception;
+
+ void onRemoteClose(Link link) throws Exception;
+
+ void onFlow(Link link) throws Exception;
+
+ void onFinal(Link link) throws Exception;
+
+ void onRemoteDetach(Link link) throws Exception;
+
+ void onDetach(Link link) throws Exception;
+
+ // ---- delivery and transport events
+
+ void onDelivery(Delivery delivery) throws Exception;
+
+ void onTransport(Transport transport) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
new file mode 100644
index 0000000..6552f64
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Transport;
+
+/**
+ * Static helpers that route Proton engine events to the matching {@link EventHandler} callback.
+ */
+public final class Events {
+
+   /**
+    * Forwards the transport straight to {@link EventHandler#onTransport(Transport)}.
+    */
+   public static void dispatchTransport(Transport transport, EventHandler handler) throws Exception {
+      handler.onTransport(transport);
+   }
+
+   /**
+    * Invokes the handler callback that corresponds to the type of the given event. Event types
+    * without a case below are ignored.
+    *
+    * @param event   the event taken from the Proton collector
+    * @param handler the handler to notify
+    * @throws Exception whatever the invoked callback throws
+    */
+   public static void dispatch(Event event, EventHandler handler) throws Exception {
+      switch (event.getType()) {
+         case CONNECTION_INIT:
+            handler.onInit(event.getConnection());
+            break;
+         case CONNECTION_LOCAL_OPEN:
+            handler.onLocalOpen(event.getConnection());
+            break;
+         case CONNECTION_REMOTE_OPEN:
+            handler.onRemoteOpen(event.getConnection());
+            break;
+         case CONNECTION_LOCAL_CLOSE:
+            handler.onLocalClose(event.getConnection());
+            break;
+         case CONNECTION_REMOTE_CLOSE:
+            handler.onRemoteClose(event.getConnection());
+            break;
+         case CONNECTION_FINAL:
+            handler.onFinal(event.getConnection());
+            break;
+         case SESSION_INIT:
+            handler.onInit(event.getSession());
+            break;
+         case SESSION_LOCAL_OPEN:
+            handler.onLocalOpen(event.getSession());
+            break;
+         case SESSION_REMOTE_OPEN:
+            handler.onRemoteOpen(event.getSession());
+            break;
+         case SESSION_LOCAL_CLOSE:
+            handler.onLocalClose(event.getSession());
+            break;
+         case SESSION_REMOTE_CLOSE:
+            handler.onRemoteClose(event.getSession());
+            break;
+         case SESSION_FINAL:
+            handler.onFinal(event.getSession());
+            break;
+         case LINK_INIT:
+            handler.onInit(event.getLink());
+            break;
+         case LINK_LOCAL_OPEN:
+            handler.onLocalOpen(event.getLink());
+            break;
+         case LINK_REMOTE_OPEN:
+            handler.onRemoteOpen(event.getLink());
+            break;
+         case LINK_LOCAL_CLOSE:
+            handler.onLocalClose(event.getLink());
+            break;
+         case LINK_REMOTE_CLOSE:
+            handler.onRemoteClose(event.getLink());
+            break;
+         case LINK_FLOW:
+            handler.onFlow(event.getLink());
+            break;
+         case LINK_FINAL:
+            handler.onFinal(event.getLink());
+            break;
+         case LINK_LOCAL_DETACH:
+            handler.onDetach(event.getLink());
+            break;
+         case LINK_REMOTE_DETACH:
+            handler.onRemoteDetach(event.getLink());
+            break;
+         case TRANSPORT:
+            handler.onTransport(event.getTransport());
+            break;
+         case DELIVERY:
+            handler.onDelivery(event.getDelivery());
+            break;
+         default:
+            // event types with no EventHandler callback are deliberately ignored
+            break;
+      }
+   }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
new file mode 100644
index 0000000..b2a6230
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.engine.Connection;
+
+/**
+ * Extension capabilities known to this handler package, plus a helper to inspect the capabilities a
+ * remote peer desires.
+ */
+public class ExtCapability {
+
+   /**
+    * The capability symbols. The array itself is mutable, so prefer {@link #getCapabilities()},
+    * which hands out a copy.
+    */
+   public static final Symbol[] capabilities = new Symbol[] {
+      AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY
+   };
+
+   /**
+    * @return a fresh copy of the capability symbols, so callers cannot alter the shared array
+    */
+   public static Symbol[] getCapabilities() {
+      return capabilities.clone();
+   }
+
+   /**
+    * Tells whether the remote peer listed the sole-connection capability among its desired capabilities.
+    *
+    * @param connection the connection whose remote desired capabilities are inspected
+    * @return true when the sole-connection symbol is present, false otherwise (including when the
+    *         peer sent no desired capabilities)
+    */
+   public static boolean needUniqueConnection(Connection connection) {
+      Symbol[] extCapabilities = connection.getRemoteDesiredCapabilities();
+      if (extCapabilities != null) {
+         for (Symbol sym : extCapabilities) {
+            // constant on the left keeps the check safe for a null array element
+            if (AmqpSupport.SOLE_CONNECTION_CAPABILITY.equals(sym)) {
+               return true;
+            }
+         }
+      }
+      return false;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
new file mode 100644
index 0000000..2efaa1b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
+import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Transport;
+import org.jboss.logging.Logger;
+
+public class ProtonHandler extends ProtonInitializable {
+
+ private static final Logger log = Logger.getLogger(ProtonHandler.class);
+
+ private static final byte SASL = 0x03;
+
+ private static final byte BARE = 0x00;
+
+ private final Transport transport = Proton.transport();
+
+ private final Connection connection = Proton.connection();
+
+ private final Collector collector = Proton.collector();
+
+ private final Executor dispatchExecutor;
+
+ private final Runnable dispatchRunnable = new Runnable() {
+ @Override
+ public void run() {
+ dispatch();
+ }
+ };
+
+ private ArrayList<EventHandler> handlers = new ArrayList<>();
+
+ private Sasl serverSasl;
+
+ private Sasl clientSasl;
+
+ private final Object lock = new Object();
+
+ private final long creationTime;
+
+ private Map<String, ServerSASL> saslHandlers;
+
+ private SASLResult saslResult;
+
+ protected volatile boolean dataReceived;
+
+ protected boolean receivedFirstPacket = false;
+
+ private int offset = 0;
+
+ /**
+ * Creates the handler, records the creation time, binds the Proton transport to the connection and
+ * attaches the event collector to the connection.
+ *
+ * @param dispatchExecutor executor on which {@link #flush()} schedules the event dispatch
+ */
+ public ProtonHandler(Executor dispatchExecutor) {
+ this.dispatchExecutor = dispatchExecutor;
+ this.creationTime = System.currentTimeMillis();
+ transport.bind(connection);
+ connection.collect(collector);
+ }
+
+ /**
+  * Ticks the Proton transport with the current time in milliseconds.
+  *
+  * @param firstTick when true the transport is ticked unconditionally and its result returned as is
+  * @return the value returned by the transport tick; 0 when the connection is locally closed, or when
+  *         the tick failed or left the transport closed (the transport is then closed and an error
+  *         condition set on the connection)
+  */
+ public long tick(boolean firstTick) {
+    if (!firstTick) {
+       try {
+          if (connection.getLocalState() != EndpointState.CLOSED) {
+             long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+             if (transport.isClosed()) {
+                throw new IllegalStateException("Channel was inactive for too long");
+             }
+             return rescheduleAt;
+          }
+       }
+       catch (Exception e) {
+          // keep a trace of why the transport is being closed instead of dropping the exception
+          log.debug(e.getMessage(), e);
+          transport.close();
+          connection.setCondition(new ErrorCondition());
+       }
+       return 0;
+    }
+    return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+ }
+
+ /**
+ * @return the transport's current capacity, read while holding the handler lock
+ */
+ public int capacity() {
+ synchronized (lock) {
+ return transport.capacity();
+ }
+ }
+
+ /**
+ * @return the monitor object this handler synchronizes on when it touches the transport, connection or collector
+ */
+ public Object getLock() {
+ return lock;
+ }
+
+ /**
+ * @return the Proton transport owned by this handler
+ */
+ public Transport getTransport() {
+ return transport;
+ }
+
+ /**
+ * @return the Proton connection bound to the transport
+ */
+ public Connection getConnection() {
+ return connection;
+ }
+
+ /**
+ * Registers a handler that will receive the dispatched events.
+ *
+ * @param handler the handler to append to the list
+ * @return this instance, so calls can be chained
+ */
+ public ProtonHandler addEventHandler(EventHandler handler) {
+ handlers.add(handler);
+ return this;
+ }
+
+ public void createServerSASL(ServerSASL[] handlers) {
+    // Switches the transport's SASL layer to server mode and advertises one mechanism per
+    // supplied handler, remembering each handler under its mechanism name.
+    this.serverSasl = transport.sasl();
+    this.saslHandlers = new HashMap<>();
+
+    final String[] mechanismNames = new String[handlers.length];
+    for (int i = 0; i < handlers.length; i++) {
+       final ServerSASL saslHandler = handlers[i];
+       saslHandlers.put(saslHandler.getName(), saslHandler);
+       mechanismNames[i] = saslHandler.getName();
+    }
+
+    serverSasl.server();
+    serverSasl.setMechanisms(mechanismNames);
+ }
+
+ /**
+ * @return the result stored by the last SASL processing, or null if none has been stored yet
+ */
+ public SASLResult getSASLResult() {
+ return saslResult;
+ }
+
+ /**
+ * Feeds received bytes into the Proton transport, in chunks no larger than the transport's capacity,
+ * flushing after each chunk. On the first packet the byte at index 4 is inspected to decide whether
+ * the auth-init callback has to be fired. When the transport reports no capacity the remaining bytes
+ * are not consumed and the loop ends.
+ *
+ * @param buffer the bytes received from the wire
+ */
+ public void inputBuffer(ByteBuf buffer) {
+ dataReceived = true;
+ synchronized (lock) {
+ while (buffer.readableBytes() > 0) {
+ int capacity = transport.capacity();
+
+ if (!receivedFirstPacket) {
+ try {
+ // index 4 is compared with the SASL (0x03) and BARE (0x00) markers declared on this class
+ byte auth = buffer.getByte(4);
+ if (auth == SASL || auth == BARE) {
+ dispatchAuth(auth == SASL);
+ /*
+ * there is a chance that if SASL Handshake has been carried out that the capacity may change.
+ * */
+ capacity = transport.capacity();
+ }
+ }
+ catch (Throwable e) {
+ // NOTE(review): a first packet shorter than 5 bytes ends up here and auth init is never dispatched - confirm
+ log.debug(e.getMessage(), e);
+ }
+
+ receivedFirstPacket = true;
+ }
+
+ if (capacity > 0) {
+ // copy at most 'capacity' bytes into the transport's input buffer, then let the transport process them
+ ByteBuffer tail = transport.tail();
+ int min = Math.min(capacity, buffer.readableBytes());
+ tail.limit(min);
+ buffer.readBytes(tail);
+
+ flush();
+ }
+ else {
+ if (capacity == 0) {
+ log.debugf("abandoning: readableBytes=%d", buffer.readableBytes());
+ }
+ else {
+ log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), transport.capacity());
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Reports whether {@link #inputBuffer(ByteBuf)} flagged incoming data since the previous call, and
+ * clears the flag. The read and the reset are two separate accesses to the volatile field.
+ *
+ * @return the value of the flag before it was cleared
+ */
+ public boolean checkDataReceived() {
+ boolean res = dataReceived;
+
+ dataReceived = false;
+
+ return res;
+ }
+
+ /**
+ * @return the {@code System.currentTimeMillis()} value captured when this handler was constructed
+ */
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ /**
+ * Tells the transport that the given number of output bytes has been consumed: pops them from the
+ * transport, moves the internal offset back by the same amount, then flushes.
+ *
+ * @param bytes number of bytes to pop from the transport output
+ * @throws IllegalStateException if the offset becomes negative; the pop has already been applied at that point
+ */
+ public void outputDone(int bytes) {
+ synchronized (lock) {
+ transport.pop(bytes);
+ offset -= bytes;
+
+ if (offset < 0) {
+ throw new IllegalStateException("You called outputDone for more bytes than you actually received. numberOfBytes=" + bytes +
+ ", outcome result=" + offset);
+ }
+ }
+
+ flush();
+ }
+
+ /**
+  * Copies the transport output that has not been handed out yet into a newly allocated pooled buffer
+  * and advances the internal offset past it.
+  *
+  * @return a pooled buffer holding the new output bytes, or null when the transport reports a negative
+  *         pending count or there is nothing new to hand out
+  * @throws IllegalStateException if the offset is ahead of the transport's pending byte count
+  */
+ public ByteBuf outputBuffer() {
+
+    synchronized (lock) {
+       int pending = transport.pending();
+
+       if (pending < 0) {
+          return null;//throw new IllegalStateException("xxx need to close the connection");
+       }
+
+       int size = pending - offset;
+
+       if (size < 0) {
+          // report the value that is actually negative, plus the two inputs it was computed from
+          throw new IllegalStateException("negative size: " + size + " (pending=" + pending + ", offset=" + offset + ")");
+       }
+
+       if (size == 0) {
+          return null;
+       }
+
+       // For returning PooledBytes
+       ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(size);
+       ByteBuffer head = transport.head();
+       head.position(offset);
+       head.limit(offset + size);
+       buffer.writeBytes(head);
+       offset += size; // incrementing offset for future calls
+       return buffer;
+    }
+ }
+
+ /**
+ * Lets the transport process its input and runs the server SASL check while holding the lock, then
+ * schedules the event dispatch on the dispatch executor (outside of the lock).
+ */
+ public void flush() {
+ synchronized (lock) {
+ transport.process();
+
+ checkServerSASL();
+
+ }
+
+ dispatchExecutor.execute(dispatchRunnable);
+ }
+
+ /**
+ * Closes the Proton connection while holding the lock, then flushes.
+ */
+ public void close() {
+ synchronized (lock) {
+ connection.close();
+ }
+ flush();
+ }
+
+ protected void checkServerSASL() {
+ if (serverSasl != null && serverSasl.getRemoteMechanisms().length > 0) {
+ // TODO: should we look at the first only?
+ ServerSASL mechanism = saslHandlers.get(serverSasl.getRemoteMechanisms()[0]);
+ if (mechanism != null) {
+
+ byte[] dataSASL = new byte[serverSasl.pending()];
+ serverSasl.recv(dataSASL, 0, dataSASL.length);
+
+ if (log.isTraceEnabled()) {
+ log.trace("Working on sasl::" + ByteUtil.bytesToHex(dataSASL, 2));
+ }
+
+ saslResult = mechanism.processSASL(dataSASL);
+
+ if (saslResult != null && saslResult.isSuccess()) {
+ serverSasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+ serverSasl = null;
+ saslHandlers.clear();
+ saslHandlers = null;
+ }
+ else {
+ serverSasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
+ }
+ serverSasl = null;
+ }
+ else {
+ // no auth available, system error
+ serverSasl.done(Sasl.SaslOutcome.PN_SASL_SYS);
+ }
+ }
+ }
+
+ private Event popEvent() {
+ synchronized (lock) {
+ Event ev = collector.peek();
+ if (ev != null) {
+ // pop will invalidate the event
+ // for that reason we make a new one
+ // Events are reused inside the collector, so we need to make a new one here
+ ev = ev.copy();
+ collector.pop();
+ }
+ return ev;
+ }
+ }
+
+ /**
+ * Notifies every registered handler that authentication is starting.
+ *
+ * @param sasl true when the first packet carried the SASL marker, false for the bare marker
+ */
+ private void dispatchAuth(boolean sasl) {
+ for (EventHandler h : handlers) {
+ h.onAuthInit(this, getConnection(), sasl);
+ }
+ }
+
+ private void dispatch() {
+    // Drains the collector and hands every event to every handler, then gives each handler a
+    // final onTransport call.
+    //
+    // The lock is only held inside popEvent(), never across the handler callbacks: holding it for
+    // the entire event processing could produce a distributed deadlock (for instance in
+    // onTransport) while a client is also trying to write here.
+    for (Event event = popEvent(); event != null; event = popEvent()) {
+       for (EventHandler handler : handlers) {
+          if (log.isTraceEnabled()) {
+             log.trace("Handling " + event + " towards " + handler);
+          }
+          try {
+             Events.dispatch(event, handler);
+          }
+          catch (Exception e) {
+             // a failing handler is logged and flagged on the connection; the remaining handlers still run
+             log.warn(e.getMessage(), e);
+             connection.setCondition(new ErrorCondition());
+          }
+       }
+    }
+
+    for (EventHandler handler : handlers) {
+       try {
+          handler.onTransport(transport);
+       }
+       catch (Exception e) {
+          log.warn(e.getMessage(), e);
+          connection.setCondition(new ErrorCondition());
+       }
+    }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/package-info.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/package-info.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/package-info.java
new file mode 100644
index 0000000..8476f5b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package includes classes used on the interaction with Proton, including Context classes that will be translated
+ * through the model event.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/AnonymousServerSASL.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/AnonymousServerSASL.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/AnonymousServerSASL.java
new file mode 100644
index 0000000..013b73b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/AnonymousServerSASL.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+/**
+ * Server side of the ANONYMOUS mechanism: every exchange succeeds, with no user and no password.
+ */
+public class AnonymousServerSASL implements ServerSASL {
+
+   @Override
+   public String getName() {
+      return "ANONYMOUS";
+   }
+
+   @Override
+   public SASLResult processSASL(byte[] bytes) {
+      // the received bytes are not inspected
+      return new PlainSASLResult(true, null, null);
+   }
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASL.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASL.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASL.java
new file mode 100644
index 0000000..cb82eba
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASL.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+import org.apache.activemq.artemis.core.security.SecurityStore;
+
+public class PlainSASL extends ServerSASLPlain {
+
+ private final SecurityStore securityStore;
+
+ public PlainSASL(SecurityStore securityStore) {
+ this.securityStore = securityStore;
+ }
+
+ @Override
+ protected boolean authenticate(String user, String password) {
+ if (securityStore.isSecurityEnabled()) {
+ try {
+ securityStore.authenticate(user, password, null);
+ return true;
+ }
+ catch (Exception e) {
+ return false;
+ }
+ }
+ else {
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASLResult.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASLResult.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASLResult.java
new file mode 100644
index 0000000..f138ae3
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASLResult.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+/**
+ * Immutable {@link SASLResult} that records whether an exchange succeeded together with
+ * the user name and password it was evaluated for.
+ */
+public class PlainSASLResult implements SASLResult {
+
+   // Written once by the constructor; final so an instance can be handed to
+   // another thread without further synchronization.
+   private final boolean success;
+   private final String user;
+   private final String password;
+
+   /**
+    * @param success  whether the exchange succeeded
+    * @param user     the user name; may be {@code null}
+    * @param password the password; may be {@code null}
+    */
+   public PlainSASLResult(boolean success, String user, String password) {
+      this.success = success;
+      this.user = user;
+      this.password = password;
+   }
+
+   /**
+    * @return the user name given at construction, possibly {@code null}
+    */
+   @Override
+   public String getUser() {
+      return user;
+   }
+
+   /**
+    * @return the password given at construction, possibly {@code null}
+    */
+   public String getPassword() {
+      return password;
+   }
+
+   /**
+    * @return {@code true} if the exchange was recorded as successful
+    */
+   @Override
+   public boolean isSuccess() {
+      return success;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/SASLResult.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/SASLResult.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/SASLResult.java
new file mode 100644
index 0000000..f8c4297
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/SASLResult.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+/**
+ * Outcome of a SASL exchange, as returned by {@link ServerSASL#processSASL(byte[])}.
+ */
+public interface SASLResult {
+
+ /**
+  * @return the user name carried by this result; may be {@code null}
+  *         (the ANONYMOUS mechanism in this package produces a result without a user)
+  */
+ String getUser();
+
+ /**
+  * @return {@code true} if the exchange succeeded
+  */
+ boolean isSuccess();
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASL.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASL.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASL.java
new file mode 100644
index 0000000..43d57d0
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASL.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+/**
+ * Server side of a single SASL mechanism.
+ */
+public interface ServerSASL {
+
+ /**
+  * @return the mechanism name; the implementations in this package return
+  *         {@code "ANONYMOUS"} and {@code "PLAIN"}
+  */
+ String getName();
+
+ /**
+  * Evaluates the data of a SASL exchange.
+  *
+  * @param bytes the raw data to evaluate (presumably the client's response; confirm against the caller)
+  * @return the outcome of the exchange
+  */
+ SASLResult processSASL(byte[] bytes);
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASLPlain.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASLPlain.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASLPlain.java
new file mode 100644
index 0000000..da26d2e
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASLPlain.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+/**
+ * Server side of the SASL {@code PLAIN} mechanism (RFC 4616).
+ * <p>
+ * The client response has the form {@code [authzid] NUL authcid NUL passwd}. This class extracts
+ * the user name (authcid) and password and hands them to {@link #authenticate(String, String)},
+ * which subclasses override to perform the real credential check.
+ */
+public class ServerSASLPlain implements ServerSASL {
+
+   public static final String NAME = "PLAIN";
+
+   /** Separator between the fields of a PLAIN response. */
+   private static final String NUL = "\0";
+
+   /**
+    * @return the mechanism name, {@code "PLAIN"}
+    */
+   @Override
+   public String getName() {
+      return NAME;
+   }
+
+   /**
+    * Parses a {@code PLAIN} response and authenticates the credentials it carries.
+    * <p>
+    * A field that is missing from the response is passed on as {@code null}, so a truncated
+    * response is rejected (or accepted) by {@link #authenticate(String, String)} instead of
+    * failing with an {@link ArrayIndexOutOfBoundsException} during parsing.
+    *
+    * @param data the raw client response, decoded as UTF-8
+    * @return the result holding the authentication outcome and the parsed user name and password
+    */
+   @Override
+   public SASLResult processSASL(byte[] data) {
+      String username = null;
+      String password = null;
+
+      // RFC 4616 mandates UTF-8; do not depend on the platform default charset.
+      // Fully qualified because this file has no import section.
+      String response = new String(data, java.nio.charset.StandardCharsets.UTF_8);
+      String[] fields = response.split(NUL);
+
+      // The user name is the first field unless an authorization identity precedes it:
+      // either an empty one (leading NUL) or a non-empty one (three or more fields).
+      int userIndex = 0;
+      if (fields.length > 0 && (fields[0].isEmpty() || fields.length >= 3)) {
+         userIndex = 1;
+      }
+
+      // Strict bounds: index i is only valid when length > i.
+      if (fields.length > userIndex) {
+         username = fields[userIndex];
+      }
+      if (fields.length > userIndex + 1) {
+         password = fields[userIndex + 1];
+      }
+
+      boolean success = authenticate(username, password);
+
+      return new PlainSASLResult(success, username, password);
+   }
+
+   /**
+    * Hook for subclasses to perform the authentication; this base implementation accepts
+    * every pair of credentials.
+    *
+    * @param user     the parsed user name, or {@code null} if the response carried none
+    * @param password the parsed password, or {@code null} if the response carried none
+    * @return {@code true} if the credentials are accepted
+    */
+   protected boolean authenticate(String user, String password) {
+      return true;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CodecCache.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CodecCache.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CodecCache.java
new file mode 100644
index 0000000..53d0bc1
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CodecCache.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.util;
+
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+
+/**
+ * Hands out a per-thread proton {@link DecoderImpl} / {@link EncoderImpl} pair.
+ * <p>
+ * Each thread gets its own pair through a {@link ThreadLocal}; the pair is created the first
+ * time the thread asks for it and has all AMQP defined types registered on it.
+ */
+public class CodecCache {
+
+   /** A decoder together with the encoder built on top of it. */
+   private static class EncoderDecoderPair {
+
+      final DecoderImpl decoder;
+      final EncoderImpl encoder;
+
+      EncoderDecoderPair() {
+         decoder = new DecoderImpl();
+         encoder = new EncoderImpl(decoder);
+         AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+      }
+   }
+
+   /** One codec pair per thread, created on first access. */
+   private static final ThreadLocal<EncoderDecoderPair> PER_THREAD_CODEC = new ThreadLocal<EncoderDecoderPair>() {
+      @Override
+      protected EncoderDecoderPair initialValue() {
+         return new EncoderDecoderPair();
+      }
+   };
+
+   /**
+    * @return the calling thread's decoder
+    */
+   public static DecoderImpl getDecoder() {
+      final EncoderDecoderPair pair = PER_THREAD_CODEC.get();
+      return pair.decoder;
+   }
+
+   /**
+    * @return the calling thread's encoder
+    */
+   public static EncoderImpl getEncoder() {
+      final EncoderDecoderPair pair = PER_THREAD_CODEC.get();
+      return pair.encoder;
+   }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphore.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphore.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphore.java
new file mode 100644
index 0000000..3eda199
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphore.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.util;
+
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * A counting semaphore over an integer number of credits, built on
+ * {@link AbstractQueuedSynchronizer} in shared mode.
+ * <p>
+ * Besides acquiring and releasing credits, the credit count can be overwritten outright with
+ * {@link #setCredits(int)}.
+ */
+public class CreditsSemaphore {
+
+ // The synchronizer state holds the current number of credits.
+ @SuppressWarnings("serial")
+ private static class Sync extends AbstractQueuedSynchronizer {
+
+ private Sync(int initial) {
+ setState(initial);
+ }
+
+ public int getCredits() {
+ return getState();
+ }
+
+ /**
+  * Tries to take the requested number of credits.
+  *
+  * @return the number of credits left after the acquisition, or {@code -1} when there are
+  *         not enough credits
+  */
+ @Override
+ public int tryAcquireShared(final int numberOfAqcquires) {
+ for (;;) {
+ int actualSize = getState();
+ int newValue = actualSize - numberOfAqcquires;
+
+ if (newValue < 0) {
+ // Not enough credits: give up only if the state is still the value read above;
+ // if it changed in the meantime, loop and evaluate again.
+ if (actualSize == getState()) {
+ return -1;
+ }
+ }
+ else if (compareAndSetState(actualSize, newValue)) {
+ return newValue;
+ }
+ }
+ }
+
+ /**
+  * Adds the given number of credits, retrying the compare-and-set until it succeeds.
+  *
+  * @return always {@code true}
+  */
+ @Override
+ public boolean tryReleaseShared(final int numberOfReleases) {
+ for (;;) {
+ int actualSize = getState();
+ int newValue = actualSize + numberOfReleases;
+
+ if (compareAndSetState(actualSize, newValue)) {
+ return true;
+ }
+
+ }
+ }
+
+ /**
+  * Replaces the credit count with {@code credits}, whatever the current value is.
+  */
+ public void setCredits(final int credits) {
+ for (;;) {
+ int actualState = getState();
+ if (compareAndSetState(actualState, credits)) {
+ // Releasing zero credits leaves the count untouched; it is done only to wake up
+ // threads that may be queued waiting for credits so they can try again.
+ releaseShared(0);
+ return;
+ }
+ }
+ }
+ }
+
+ private final Sync sync;
+
+ /**
+  * @param initialCredits the number of credits available at creation
+  */
+ public CreditsSemaphore(int initialCredits) {
+ sync = new Sync(initialCredits);
+ }
+
+ /**
+  * Takes one credit, waiting until one is available.
+  *
+  * @throws InterruptedException if the thread is interrupted while waiting
+  */
+ public void acquire() throws InterruptedException {
+ sync.acquireSharedInterruptibly(1);
+ }
+
+ /**
+  * Takes one credit only if one is available right now.
+  *
+  * @return {@code true} if a credit was taken
+  */
+ public boolean tryAcquire() {
+ return sync.tryAcquireShared(1) >= 0;
+ }
+
+ /**
+  * Gives back one credit.
+  *
+  * @throws InterruptedException declared in the signature; NOTE(review): the call made here is
+  *         not an interruptible one, confirm whether callers rely on this clause
+  */
+ public void release() throws InterruptedException {
+ sync.releaseShared(1);
+ }
+
+ /**
+  * Gives back {@code credits} credits.
+  *
+  * @throws InterruptedException declared in the signature; NOTE(review): the call made here is
+  *         not an interruptible one, confirm whether callers rely on this clause
+  */
+ public void release(int credits) throws InterruptedException {
+ sync.releaseShared(credits);
+ }
+
+ /**
+  * Overwrites the current credit count with {@code credits}.
+  */
+ public void setCredits(int credits) {
+ sync.setCredits(credits);
+ }
+
+ /**
+  * @return the current number of credits
+  */
+ public int getCredits() {
+ return sync.getCredits();
+ }
+
+ /**
+  * @return {@code true} if the underlying synchronizer reports threads queued waiting to acquire
+  */
+ public boolean hasQueuedThreads() {
+ return sync.hasQueuedThreads();
+ }
+
+}
\ No newline at end of file