You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/27 13:54:39 UTC
[12/15] activemq-artemis git commit: ARTEMIS-751 Simplification of
the AMQP implementation
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
deleted file mode 100644
index d5b2ff7..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.proton.plug;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
-import org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection;
-import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
-import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL;
-import org.apache.activemq.artemis.core.remoting.CloseListener;
-import org.apache.activemq.artemis.core.remoting.FailureListener;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
-import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.utils.ReusableLatch;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.jboss.logging.Logger;
-import org.proton.plug.AMQPConnectionCallback;
-import org.proton.plug.AMQPConnectionContext;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.SASLResult;
-import org.proton.plug.ServerSASL;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.handler.ExtCapability;
-import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
-import org.proton.plug.sasl.AnonymousServerSASL;
-
-import static org.proton.plug.AmqpSupport.CONTAINER_ID;
-import static org.proton.plug.AmqpSupport.INVALID_FIELD;
-import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED;
-
-public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback, FailureListener, CloseListener {
- private static final Logger logger = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
- private static final List<String> connectedContainers = Collections.synchronizedList(new ArrayList());
-
- private ConcurrentMap<XidImpl, Transaction> transactions = new ConcurrentHashMap<>();
-
- private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
-
- private final ProtonProtocolManager manager;
-
- private final Connection connection;
-
- protected ActiveMQProtonRemotingConnection protonConnectionDelegate;
-
- protected AMQPConnectionContext amqpConnection;
-
- private final ReusableLatch latch = new ReusableLatch(0);
-
- private final Executor closeExecutor;
-
- private String remoteContainerId;
-
- private AtomicBoolean registeredConnectionId = new AtomicBoolean(false);
-
- private ActiveMQServer server;
-
- public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager,
- Connection connection,
- Executor closeExecutor,
- ActiveMQServer server) {
- this.manager = manager;
- this.connection = connection;
- this.closeExecutor = closeExecutor;
- this.server = server;
- }
-
- @Override
- public ServerSASL[] getSASLMechnisms() {
-
- ServerSASL[] result;
-
- if (isSupportsAnonymous()) {
- result = new ServerSASL[]{new ActiveMQPlainSASL(manager.getServer().getSecurityStore()), new AnonymousServerSASL()};
- }
- else {
- result = new ServerSASL[]{new ActiveMQPlainSASL(manager.getServer().getSecurityStore())};
- }
-
- return result;
- }
-
- @Override
- public boolean isSupportsAnonymous() {
- boolean supportsAnonymous = false;
- try {
- manager.getServer().getSecurityStore().authenticate(null, null, null);
- supportsAnonymous = true;
- }
- catch (Exception e) {
- // authentication failed so no anonymous support
- }
- return supportsAnonymous;
- }
-
- @Override
- public void close() {
- try {
- if (registeredConnectionId.getAndSet(false)) {
- server.removeClientConnection(remoteContainerId);
- }
- connection.close();
- amqpConnection.close();
- }
- finally {
- for (Transaction tx : transactions.values()) {
- try {
- tx.rollback();
- }
- catch (Exception e) {
- logger.warn(e.getMessage(), e);
- }
- }
- }
- }
-
- public Executor getExeuctor() {
- if (protonConnectionDelegate != null) {
- return protonConnectionDelegate.getExecutor();
- }
- else {
- return null;
- }
- }
-
- @Override
- public void setConnection(AMQPConnectionContext connection) {
- this.amqpConnection = connection;
- }
-
- @Override
- public AMQPConnectionContext getConnection() {
- return amqpConnection;
- }
-
- public ActiveMQProtonRemotingConnection getProtonConnectionDelegate() {
- return protonConnectionDelegate;
- }
-
- public void setProtonConnectionDelegate(ActiveMQProtonRemotingConnection protonConnectionDelegate) {
-
- this.protonConnectionDelegate = protonConnectionDelegate;
- }
-
- @Override
- public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) {
- final int size = byteBuf.writerIndex();
-
- latch.countUp();
- connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- latch.countDown();
- }
- });
-
- if (amqpConnection.isSyncOnFlush()) {
- try {
- latch.await(5, TimeUnit.SECONDS);
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- amqpConnection.outputDone(size);
- }
-
- @Override
- public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
- return new ProtonSessionIntegrationCallback(this, manager, connection, this.connection, closeExecutor);
- }
-
- @Override
- public void sendSASLSupported() {
- connection.write(ActiveMQBuffers.wrappedBuffer(new byte[]{'A', 'M', 'Q', 'P', 3, 1, 0, 0}));
- }
-
- @Override
- public boolean validateConnection(org.apache.qpid.proton.engine.Connection connection, SASLResult saslResult) {
- remoteContainerId = connection.getRemoteContainer();
- boolean idOK = server.addClientConnection(remoteContainerId, ExtCapability.needUniqueConnection(connection));
- if (!idOK) {
- //https://issues.apache.org/jira/browse/ARTEMIS-728
- Map<Symbol, Object> connProp = new HashMap<>();
- connProp.put(CONNECTION_OPEN_FAILED, "true");
- connection.setProperties(connProp);
- connection.getCondition().setCondition(AmqpError.INVALID_FIELD);
- Map<Symbol, Symbol> info = new HashMap<>();
- info.put(INVALID_FIELD, CONTAINER_ID);
- connection.getCondition().setInfo(info);
- return false;
- }
- registeredConnectionId.set(true);
- return true;
- }
-
- @Override
- public void connectionClosed() {
- close();
- }
-
- @Override
- public void connectionFailed(ActiveMQException exception, boolean failedOver) {
- close();
- }
-
- @Override
- public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
- close();
- }
-
- @Override
- public Binary newTransaction() {
- XidImpl xid = newXID();
- Transaction transaction = new TransactionImpl(xid, server.getStorageManager(), -1);
- transactions.put(xid, transaction);
- return new Binary(xid.getGlobalTransactionId());
- }
-
- @Override
- public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
- XidImpl xid = newXID(txid.getArray());
- Transaction tx = transactions.get(xid);
-
- if (tx == null) {
- throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString());
- }
-
- return tx;
- }
-
- @Override
- public void removeTransaction(Binary txid) {
- XidImpl xid = newXID(txid.getArray());
- transactions.remove(xid);
- }
-
-
- protected XidImpl newXID() {
- return newXID(UUIDGenerator.getInstance().generateStringUUID().getBytes());
- }
-
- protected XidImpl newXID(byte[] bytes) {
- return new XidImpl("amqp".getBytes(), 1, bytes);
- }
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
deleted file mode 100644
index da9dd9c..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.proton.plug;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
-import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
-import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
-import org.apache.activemq.artemis.core.server.BindingQueryResult;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SelectorTranslator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.message.ProtonJMessage;
-import org.proton.plug.AMQPConnectionContext;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.AMQPSessionContext;
-import org.proton.plug.SASLResult;
-import org.proton.plug.context.ProtonPlugSender;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
-import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException;
-import org.proton.plug.sasl.PlainSASLResult;
-
-public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, SessionCallback {
-
- protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
-
- private final ActiveMQProtonConnectionCallback protonSPI;
-
- private final ProtonProtocolManager manager;
-
- private final AMQPConnectionContext connection;
-
- private final Connection transportConnection;
-
- private ServerSession serverSession;
-
- private AMQPSessionContext protonSession;
-
- private final Executor closeExecutor;
-
- private final AtomicBoolean draining = new AtomicBoolean(false);
-
- public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI,
- ProtonProtocolManager manager,
- AMQPConnectionContext connection,
- Connection transportConnection,
- Executor executor) {
- this.protonSPI = protonSPI;
- this.manager = manager;
- this.connection = connection;
- this.transportConnection = transportConnection;
- this.closeExecutor = executor;
- }
-
- @Override
- public boolean isWritable(ReadyListener callback) {
- return transportConnection.isWritable(callback);
- }
-
- @Override
- public void onFlowConsumer(Object consumer, int credits, final boolean drain) {
- ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
- if (drain) {
- // If the draining is already running, then don't do anything
- if (draining.compareAndSet(false, true)) {
- final ProtonPlugSender plugSender = (ProtonPlugSender) serverConsumer.getProtocolContext();
- serverConsumer.forceDelivery(1, new Runnable() {
- @Override
- public void run() {
- try {
- plugSender.getSender().drained();
- }
- finally {
- draining.set(false);
- }
- }
- });
- }
- }
- else {
- serverConsumer.receiveCredits(-1);
- }
- }
-
- @Override
- public void browserFinished(ServerConsumer consumer) {
-
- }
-
- @Override
- public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception {
-
- this.protonSession = protonSession;
-
- String name = UUIDGenerator.getInstance().generateStringUUID();
-
- String user = null;
- String passcode = null;
- if (saslResult != null) {
- user = saslResult.getUser();
- if (saslResult instanceof PlainSASLResult) {
- passcode = ((PlainSASLResult) saslResult).getPassword();
- }
- }
-
- serverSession = manager.getServer().createSession(name, user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
- false, // boolean autoCommitSends
- false, // boolean autoCommitAcks,
- false, // boolean preAcknowledge,
- true, //boolean xa,
- (String) null, this, true);
- }
-
- @Override
- public void afterDelivery() throws Exception {
-
- }
-
- @Override
- public void start() {
-
- }
-
- @Override
- public Object createSender(ProtonPlugSender protonSender,
- String queue,
- String filter,
- boolean browserOnly) throws Exception {
- long consumerID = consumerIDGenerator.generateID();
-
- filter = SelectorTranslator.convertToActiveMQFilterString(filter);
-
- ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly);
-
- // AMQP handles its own flow control for when it's started
- consumer.setStarted(true);
-
- consumer.setProtocolContext(protonSender);
-
- return consumer;
- }
-
- @Override
- public void startSender(Object brokerConsumer) throws Exception {
- ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer;
- // flow control is done at proton
- serverConsumer.receiveCredits(-1);
- }
-
- @Override
- public void createTemporaryQueue(String queueName) throws Exception {
- serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false);
- }
-
- @Override
- public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
- serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), true, false);
- }
-
- @Override
- public void createDurableQueue(String address, String queueName, String filter) throws Exception {
- serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
- }
-
- @Override
- public QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception {
- QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
-
- if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateJmsQueues() && autoCreate) {
- try {
- serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true);
- }
- catch (ActiveMQQueueExistsException e) {
- // The queue may have been created by another thread in the mean time. Catch and do nothing.
- }
- queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(), queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(), queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateJmsQueues(), true);
- }
- return queueQueryResult;
- }
-
- @Override
- public boolean bindingQuery(String address) throws Exception {
- BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
- if (!bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateJmsQueues()) {
- try {
- serverSession.createQueue(new SimpleString(address), new SimpleString(address), null, false, true);
- }
- catch (ActiveMQQueueExistsException e) {
- // The queue may have been created by another thread in the mean time. Catch and do nothing.
- }
- bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
- }
- return bindingQueryResult.isExists();
- }
-
- @Override
- public void closeSender(final Object brokerConsumer) throws Exception {
-
- final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
- final CountDownLatch latch = new CountDownLatch(1);
-
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- try {
- consumer.close(false);
- latch.countDown();
- }
- catch (Exception e) {
- }
- }
- };
-
- // Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol)
- // to avoid deadlocks the close has to be done outside of the main thread on an executor
- // otherwise you could get a deadlock
- Executor executor = protonSPI.getExeuctor();
-
- if (executor != null) {
- executor.execute(runnable);
- }
- else {
- runnable.run();
- }
-
- try {
- latch.await(10, TimeUnit.SECONDS);
- }
- catch (InterruptedException e) {
- throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
- }
- }
-
- @Override
- public ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception {
- return (ProtonJMessage) manager.getConverter().outbound((ServerMessage) message, deliveryCount);
- }
-
- @Override
- public String tempQueueName() {
- return UUIDGenerator.getInstance().generateStringUUID();
- }
-
- @Override
- public void close() throws Exception {
- //need to check here as this can be called if init fails
- if (serverSession != null) {
- recoverContext();
- try {
- serverSession.close(false);
- }
- finally {
- resetContext();
- }
- }
- }
-
- @Override
- public void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception {
- if (transaction == null) {
- transaction = serverSession.getCurrentTransaction();
- }
- recoverContext();
- try {
- ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, ((ServerMessage) message).getMessageID());
- }
- finally {
- resetContext();
- }
- }
-
- @Override
- public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception {
- recoverContext();
- try {
- ((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts);
- }
- finally {
- resetContext();
- }
- }
-
- @Override
- public void resumeDelivery(Object consumer) {
- ((ServerConsumer) consumer).receiveCredits(-1);
- }
-
- @Override
- public void serverSend(final Transaction transaction,
- final Receiver receiver,
- final Delivery delivery,
- String address,
- int messageFormat,
- ByteBuf messageEncoded) throws Exception {
- EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex());
-
- ServerMessage message = manager.getConverter().inbound(encodedMessage);
- //use the address on the receiver if not null, if null let's hope it was set correctly on the message
- if (address != null) {
- message.setAddress(new SimpleString(address));
- }
-
- recoverContext();
-
- PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress());
- if (store.isRejectingMessages()) {
- // We drop pre-settled messages (and abort any associated Tx)
- if (delivery.remotelySettled()) {
- if (transaction != null) {
- String amqpAddress = delivery.getLink().getTarget().getAddress();
- ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
- transaction.markAsRollbackOnly(e);
- }
- }
- else {
- rejectMessage(delivery);
- }
- }
- else {
- serverSend(transaction, message, delivery, receiver);
- }
- }
-
- private void rejectMessage(Delivery delivery) {
- String address = delivery.getLink().getTarget().getAddress();
- ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
- Rejected rejected = new Rejected();
- rejected.setError(ec);
- delivery.disposition(rejected);
- connection.flush();
- }
-
- private void serverSend(final Transaction transaction, final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
- try {
-
- message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
- serverSession.send(transaction, message, false, false);
-
- // FIXME Potential race here...
- manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
- @Override
- public void done() {
- synchronized (connection.getLock()) {
- delivery.disposition(Accepted.getInstance());
- delivery.settle();
- connection.flush();
- }
- }
-
- @Override
- public void onError(int errorCode, String errorMessage) {
- synchronized (connection.getLock()) {
- receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
- connection.flush();
- }
- }
- });
- }
- finally {
- resetContext();
- }
- }
-
- @Override
- public String getPubSubPrefix() {
- return manager.getPubSubPrefix();
- }
-
- @Override
- public void offerProducerCredit(final String address, final int credits, final int threshold, final Receiver receiver) {
- try {
- final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address));
- store.checkMemory(new Runnable() {
- @Override
- public void run() {
- if (receiver.getRemoteCredit() < threshold) {
- receiver.flow(credits);
- connection.flush();
- }
- }
- });
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void deleteQueue(String queueName) throws Exception {
- manager.getServer().destroyQueue(new SimpleString(queueName));
- }
-
- private void resetContext() {
- manager.getServer().getStorageManager().setContext(null);
- }
-
- private void recoverContext() {
- manager.getServer().getStorageManager().setContext(serverSession.getSessionContext());
- }
-
- @Override
- public void sendProducerCreditsMessage(int credits, SimpleString address) {
- }
-
- @Override
- public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
- return false;
- }
-
- @Override
- public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
- }
-
- @Override
- public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
-
- message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString());
-
- ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext();
-
- try {
- return plugSender.deliverMessage(message, deliveryCount);
- }
- catch (Exception e) {
- synchronized (connection.getLock()) {
- plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
- connection.flush();
- }
- throw new IllegalStateException("Can't deliver message " + e, e);
- }
-
- }
-
- @Override
- public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
- return 0;
- }
-
- @Override
- public int sendLargeMessageContinuation(ServerConsumer consumer,
- byte[] body,
- boolean continues,
- boolean requiresResponse) {
- return 0;
- }
-
- @Override
- public void closed() {
- }
-
- @Override
- public void disconnect(ServerConsumer consumer, String queueName) {
- synchronized (connection.getLock()) {
- ((Link) consumer.getProtocolContext()).close();
- connection.flush();
- }
- }
-
- @Override
- public boolean hasCredits(ServerConsumer consumer) {
- ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext();
-
- if (plugSender != null && plugSender.getSender().getCredit() > 0) {
- return true;
- }
- else {
- return false;
- }
- }
-
- @Override
- public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
- return protonSPI.getTransaction(txid);
- }
-
- @Override
- public Binary newTransaction() {
- return protonSPI.newTransaction();
- }
-
-
- @Override
- public void commitTX(Binary txid) throws Exception {
- Transaction tx = protonSPI.getTransaction(txid);
- tx.commit(true);
- protonSPI.removeTransaction(txid);
- }
-
- @Override
- public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
- Transaction tx = protonSPI.getTransaction(txid);
- tx.rollback();
- protonSPI.removeTransaction(txid);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/sasl/ActiveMQPlainSASL.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/sasl/ActiveMQPlainSASL.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/sasl/ActiveMQPlainSASL.java
deleted file mode 100644
index bf4f043..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/sasl/ActiveMQPlainSASL.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.proton.sasl;
-
-import org.apache.activemq.artemis.core.security.SecurityStore;
-import org.proton.plug.sasl.ServerSASLPlain;
-
-public class ActiveMQPlainSASL extends ServerSASLPlain {
-
- private final SecurityStore securityStore;
-
- public ActiveMQPlainSASL(SecurityStore securityStore) {
- this.securityStore = securityStore;
- }
-
- @Override
- protected boolean authenticate(String user, String password) {
- if (securityStore.isSecurityEnabled()) {
- try {
- securityStore.authenticate(user, password, null);
- return true;
- }
- catch (Exception e) {
- return false;
- }
- }
- else {
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
new file mode 100644
index 0000000..fd79547
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
+import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.jboss.logging.Logger;
+import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
+import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+
+public class AMQPConnectionCallback implements FailureListener, CloseListener {
+ private static final Logger logger = Logger.getLogger(AMQPConnectionCallback.class);
+
+ private ConcurrentMap<XidImpl, Transaction> transactions = new ConcurrentHashMap<>();
+
+ private final ProtonProtocolManager manager;
+
+ private final Connection connection;
+
+ protected ActiveMQProtonRemotingConnection protonConnectionDelegate;
+
+ protected AMQPConnectionContext amqpConnection;
+
+ private final ReusableLatch latch = new ReusableLatch(0);
+
+ private final Executor closeExecutor;
+
+ private String remoteContainerId;
+
+ private AtomicBoolean registeredConnectionId = new AtomicBoolean(false);
+
+ private ActiveMQServer server;
+
+ public AMQPConnectionCallback(ProtonProtocolManager manager,
+ Connection connection,
+ Executor closeExecutor,
+ ActiveMQServer server) {
+ this.manager = manager;
+ this.connection = connection;
+ this.closeExecutor = closeExecutor;
+ this.server = server;
+ }
+
+ public ServerSASL[] getSASLMechnisms() {
+
+ ServerSASL[] result;
+
+ if (isSupportsAnonymous()) {
+ result = new ServerSASL[]{new PlainSASL(manager.getServer().getSecurityStore()), new AnonymousServerSASL()};
+ }
+ else {
+ result = new ServerSASL[]{new PlainSASL(manager.getServer().getSecurityStore())};
+ }
+
+ return result;
+ }
+
+ public boolean isSupportsAnonymous() {
+ boolean supportsAnonymous = false;
+ try {
+ manager.getServer().getSecurityStore().authenticate(null, null, null);
+ supportsAnonymous = true;
+ }
+ catch (Exception e) {
+ // authentication failed so no anonymous support
+ }
+ return supportsAnonymous;
+ }
+
+ public void close() {
+ try {
+ if (registeredConnectionId.getAndSet(false)) {
+ server.removeClientConnection(remoteContainerId);
+ }
+ connection.close();
+ amqpConnection.close();
+ }
+ finally {
+ for (Transaction tx : transactions.values()) {
+ try {
+ tx.rollback();
+ }
+ catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ public Executor getExeuctor() {
+ if (protonConnectionDelegate != null) {
+ return protonConnectionDelegate.getExecutor();
+ }
+ else {
+ return null;
+ }
+ }
+
+ public void setConnection(AMQPConnectionContext connection) {
+ this.amqpConnection = connection;
+ }
+
+ public AMQPConnectionContext getConnection() {
+ return amqpConnection;
+ }
+
+ public ActiveMQProtonRemotingConnection getProtonConnectionDelegate() {
+ return protonConnectionDelegate;
+ }
+
+ public void setProtonConnectionDelegate(ActiveMQProtonRemotingConnection protonConnectionDelegate) {
+
+ this.protonConnectionDelegate = protonConnectionDelegate;
+ }
+
+ public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) {
+ final int size = byteBuf.writerIndex();
+
+ latch.countUp();
+ connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ latch.countDown();
+ }
+ });
+
+ if (amqpConnection.isSyncOnFlush()) {
+ try {
+ latch.await(5, TimeUnit.SECONDS);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ amqpConnection.outputDone(size);
+ }
+
+ public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
+ return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor);
+ }
+
+ public void sendSASLSupported() {
+ connection.write(ActiveMQBuffers.wrappedBuffer(new byte[]{'A', 'M', 'Q', 'P', 3, 1, 0, 0}));
+ }
+
+ public boolean validateConnection(org.apache.qpid.proton.engine.Connection connection, SASLResult saslResult) {
+ remoteContainerId = connection.getRemoteContainer();
+ boolean idOK = server.addClientConnection(remoteContainerId, ExtCapability.needUniqueConnection(connection));
+ if (!idOK) {
+ //https://issues.apache.org/jira/browse/ARTEMIS-728
+ Map<Symbol, Object> connProp = new HashMap<>();
+ connProp.put(AmqpSupport.CONNECTION_OPEN_FAILED, "true");
+ connection.setProperties(connProp);
+ connection.getCondition().setCondition(AmqpError.INVALID_FIELD);
+ Map<Symbol, Symbol> info = new HashMap<>();
+ info.put(AmqpSupport.INVALID_FIELD, AmqpSupport.CONTAINER_ID);
+ connection.getCondition().setInfo(info);
+ return false;
+ }
+ registeredConnectionId.set(true);
+ return true;
+ }
+
+ @Override
+ public void connectionClosed() {
+ close();
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+ close();
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+ close();
+ }
+
+ public Binary newTransaction() {
+ XidImpl xid = newXID();
+ Transaction transaction = new TransactionImpl(xid, server.getStorageManager(), -1);
+ transactions.put(xid, transaction);
+ return new Binary(xid.getGlobalTransactionId());
+ }
+
+ public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
+ XidImpl xid = newXID(txid.getArray());
+ Transaction tx = transactions.get(xid);
+
+ if (tx == null) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString());
+ }
+
+ return tx;
+ }
+
+ public void removeTransaction(Binary txid) {
+ XidImpl xid = newXID(txid.getArray());
+ transactions.remove(xid);
+ }
+
+
+ protected XidImpl newXID() {
+ return newXID(UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ }
+
+ protected XidImpl newXID(byte[] bytes) {
+ return new XidImpl("amqp".getBytes(), 1, bytes);
+ }
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
new file mode 100644
index 0000000..9bdf4e1
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.artemis.utils.SelectorTranslator;
+import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
+import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
+
+public class AMQPSessionCallback implements SessionCallback {
+
+ protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
+
+ private final AMQPConnectionCallback protonSPI;
+
+ private final ProtonProtocolManager manager;
+
+ private final AMQPConnectionContext connection;
+
+ private final Connection transportConnection;
+
+ private ServerSession serverSession;
+
+ private AMQPSessionContext protonSession;
+
+ private final Executor closeExecutor;
+
+ private final AtomicBoolean draining = new AtomicBoolean(false);
+
+ public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
+ ProtonProtocolManager manager,
+ AMQPConnectionContext connection,
+ Connection transportConnection,
+ Executor executor) {
+ this.protonSPI = protonSPI;
+ this.manager = manager;
+ this.connection = connection;
+ this.transportConnection = transportConnection;
+ this.closeExecutor = executor;
+ }
+
+ @Override
+ public boolean isWritable(ReadyListener callback) {
+ return transportConnection.isWritable(callback);
+ }
+
+ public void onFlowConsumer(Object consumer, int credits, final boolean drain) {
+ ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
+ if (drain) {
+ // If the draining is already running, then don't do anything
+ if (draining.compareAndSet(false, true)) {
+ final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
+ serverConsumer.forceDelivery(1, new Runnable() {
+ @Override
+ public void run() {
+ try {
+ plugSender.getSender().drained();
+ }
+ finally {
+ draining.set(false);
+ }
+ }
+ });
+ }
+ }
+ else {
+ serverConsumer.receiveCredits(-1);
+ }
+ }
+
+ @Override
+ public void browserFinished(ServerConsumer consumer) {
+
+ }
+
+ public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception {
+
+ this.protonSession = protonSession;
+
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+
+ String user = null;
+ String passcode = null;
+ if (saslResult != null) {
+ user = saslResult.getUser();
+ if (saslResult instanceof PlainSASLResult) {
+ passcode = ((PlainSASLResult) saslResult).getPassword();
+ }
+ }
+
+ serverSession = manager.getServer().createSession(name, user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
+ false, // boolean autoCommitSends
+ false, // boolean autoCommitAcks,
+ false, // boolean preAcknowledge,
+ true, //boolean xa,
+ (String) null, this, true);
+ }
+
+ @Override
+ public void afterDelivery() throws Exception {
+
+ }
+
+ public void start() {
+
+ }
+
+ public Object createSender(ProtonServerSenderContext protonSender,
+ String queue,
+ String filter,
+ boolean browserOnly) throws Exception {
+ long consumerID = consumerIDGenerator.generateID();
+
+ filter = SelectorTranslator.convertToActiveMQFilterString(filter);
+
+ ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly);
+
+ // AMQP handles its own flow control for when it's started
+ consumer.setStarted(true);
+
+ consumer.setProtocolContext(protonSender);
+
+ return consumer;
+ }
+
+ public void startSender(Object brokerConsumer) throws Exception {
+ ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer;
+ // flow control is done at proton
+ serverConsumer.receiveCredits(-1);
+ }
+
+ public void createTemporaryQueue(String queueName) throws Exception {
+ serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false);
+ }
+
+ public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
+ serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), true, false);
+ }
+
+ public void createDurableQueue(String address, String queueName, String filter) throws Exception {
+ serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
+ }
+
+ public QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception {
+ QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
+
+ if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateJmsQueues() && autoCreate) {
+ try {
+ serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true);
+ }
+ catch (ActiveMQQueueExistsException e) {
+ // The queue may have been created by another thread in the mean time. Catch and do nothing.
+ }
+ queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(), queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(), queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateJmsQueues(), true);
+ }
+ return queueQueryResult;
+ }
+
+ public boolean bindingQuery(String address) throws Exception {
+ BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
+ if (!bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateJmsQueues()) {
+ try {
+ serverSession.createQueue(new SimpleString(address), new SimpleString(address), null, false, true);
+ }
+ catch (ActiveMQQueueExistsException e) {
+ // The queue may have been created by another thread in the mean time. Catch and do nothing.
+ }
+ bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
+ }
+ return bindingQueryResult.isExists();
+ }
+
+ public void closeSender(final Object brokerConsumer) throws Exception {
+
+ final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ consumer.close(false);
+ latch.countDown();
+ }
+ catch (Exception e) {
+ }
+ }
+ };
+
+ // Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol)
+ // to avoid deadlocks the close has to be done outside of the main thread on an executor
+ // otherwise you could get a deadlock
+ Executor executor = protonSPI.getExeuctor();
+
+ if (executor != null) {
+ executor.execute(runnable);
+ }
+ else {
+ runnable.run();
+ }
+
+ try {
+ latch.await(10, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e) {
+ throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
+ }
+ }
+
+ public ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception {
+ return (ProtonJMessage) manager.getConverter().outbound((ServerMessage) message, deliveryCount);
+ }
+
+ public String tempQueueName() {
+ return UUIDGenerator.getInstance().generateStringUUID();
+ }
+
+ public void close() throws Exception {
+ //need to check here as this can be called if init fails
+ if (serverSession != null) {
+ recoverContext();
+ try {
+ serverSession.close(false);
+ }
+ finally {
+ resetContext();
+ }
+ }
+ }
+
+ public void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception {
+ if (transaction == null) {
+ transaction = serverSession.getCurrentTransaction();
+ }
+ recoverContext();
+ try {
+ ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, ((ServerMessage) message).getMessageID());
+ }
+ finally {
+ resetContext();
+ }
+ }
+
+ public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception {
+ recoverContext();
+ try {
+ ((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts);
+ }
+ finally {
+ resetContext();
+ }
+ }
+
+ public void resumeDelivery(Object consumer) {
+ ((ServerConsumer) consumer).receiveCredits(-1);
+ }
+
+ public void serverSend(final Transaction transaction,
+ final Receiver receiver,
+ final Delivery delivery,
+ String address,
+ int messageFormat,
+ ByteBuf messageEncoded) throws Exception {
+ EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex());
+
+ ServerMessage message = manager.getConverter().inbound(encodedMessage);
+ //use the address on the receiver if not null, if null let's hope it was set correctly on the message
+ if (address != null) {
+ message.setAddress(new SimpleString(address));
+ }
+
+ recoverContext();
+
+ PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress());
+ if (store.isRejectingMessages()) {
+ // We drop pre-settled messages (and abort any associated Tx)
+ if (delivery.remotelySettled()) {
+ if (transaction != null) {
+ String amqpAddress = delivery.getLink().getTarget().getAddress();
+ ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
+ transaction.markAsRollbackOnly(e);
+ }
+ }
+ else {
+ rejectMessage(delivery);
+ }
+ }
+ else {
+ serverSend(transaction, message, delivery, receiver);
+ }
+ }
+
+ private void rejectMessage(Delivery delivery) {
+ String address = delivery.getLink().getTarget().getAddress();
+ ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
+ Rejected rejected = new Rejected();
+ rejected.setError(ec);
+ delivery.disposition(rejected);
+ connection.flush();
+ }
+
+ private void serverSend(final Transaction transaction, final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
+ try {
+
+ message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
+ serverSession.send(transaction, message, false, false);
+
+ // FIXME Potential race here...
+ manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
+ @Override
+ public void done() {
+ synchronized (connection.getLock()) {
+ delivery.disposition(Accepted.getInstance());
+ delivery.settle();
+ connection.flush();
+ }
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ synchronized (connection.getLock()) {
+ receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
+ connection.flush();
+ }
+ }
+ });
+ }
+ finally {
+ resetContext();
+ }
+ }
+
+ public String getPubSubPrefix() {
+ return manager.getPubSubPrefix();
+ }
+
+ public void offerProducerCredit(final String address, final int credits, final int threshold, final Receiver receiver) {
+ try {
+ final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address));
+ store.checkMemory(new Runnable() {
+ @Override
+ public void run() {
+ if (receiver.getRemoteCredit() < threshold) {
+ receiver.flow(credits);
+ connection.flush();
+ }
+ }
+ });
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void deleteQueue(String queueName) throws Exception {
+ manager.getServer().destroyQueue(new SimpleString(queueName));
+ }
+
+ private void resetContext() {
+ manager.getServer().getStorageManager().setContext(null);
+ }
+
+ private void recoverContext() {
+ manager.getServer().getStorageManager().setContext(serverSession.getSessionContext());
+ }
+
+ @Override
+ public void sendProducerCreditsMessage(int credits, SimpleString address) {
+ }
+
+ @Override
+ public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+ return false;
+ }
+
+ @Override
+ public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
+ }
+
+ @Override
+ public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+
+ message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString());
+
+ ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
+
+ try {
+ return plugSender.deliverMessage(message, deliveryCount);
+ }
+ catch (Exception e) {
+ synchronized (connection.getLock()) {
+ plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
+ connection.flush();
+ }
+ throw new IllegalStateException("Can't deliver message " + e, e);
+ }
+
+ }
+
+ @Override
+ public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+ return 0;
+ }
+
+ @Override
+ public int sendLargeMessageContinuation(ServerConsumer consumer,
+ byte[] body,
+ boolean continues,
+ boolean requiresResponse) {
+ return 0;
+ }
+
+ @Override
+ public void closed() {
+ }
+
+ @Override
+ public void disconnect(ServerConsumer consumer, String queueName) {
+ synchronized (connection.getLock()) {
+ ((Link) consumer.getProtocolContext()).close();
+ connection.flush();
+ }
+ }
+
+ @Override
+ public boolean hasCredits(ServerConsumer consumer) {
+ ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
+
+ if (plugSender != null && plugSender.getSender().getCredit() > 0) {
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+
+ public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
+ return protonSPI.getTransaction(txid);
+ }
+
+ public Binary newTransaction() {
+ return protonSPI.newTransaction();
+ }
+
+
+ public void commitTX(Binary txid) throws Exception {
+ Transaction tx = protonSPI.getTransaction(txid);
+ tx.commit(true);
+ protonSPI.removeTransaction(txid);
+ }
+
+ public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
+ Transaction tx = protonSPI.getTransaction(txid);
+ tx.rollback();
+ protonSPI.removeTransaction(txid);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
new file mode 100644
index 0000000..8fd3169
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.util.concurrent.Executor;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
+import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+
+/**
+ * This is a Server's Connection representation used by ActiveMQ Artemis.
+ */
+public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection {
+
+ private final AMQPConnectionContext amqpConnection;
+
+ private boolean destroyed = false;
+
+ private final ProtonProtocolManager manager;
+
+ public ActiveMQProtonRemotingConnection(ProtonProtocolManager manager,
+ AMQPConnectionContext amqpConnection,
+ Connection transportConnection,
+ Executor executor) {
+ super(transportConnection, executor);
+ this.manager = manager;
+ this.amqpConnection = amqpConnection;
+ }
+
+ public Executor getExecutor() {
+ return this.executor;
+ }
+
+ public ProtonProtocolManager getManager() {
+ return manager;
+ }
+
+ /*
+ * This can be called concurrently by more than one thread so needs to be locked
+ */
+ @Override
+ public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
+ if (destroyed) {
+ return;
+ }
+
+ destroyed = true;
+
+ ActiveMQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
+
+ // Then call the listeners
+ callFailureListeners(me, scaleDownTargetNodeID);
+
+ callClosingListeners();
+
+ internalClose();
+ }
+
+ @Override
+ public void destroy() {
+ synchronized (this) {
+ if (destroyed) {
+ return;
+ }
+
+ destroyed = true;
+ }
+
+ callClosingListeners();
+
+ internalClose();
+
+ }
+
+ @Override
+ public boolean isClient() {
+ return false;
+ }
+
+ @Override
+ public boolean isDestroyed() {
+ return destroyed;
+ }
+
+ @Override
+ public void disconnect(boolean criticalError) {
+ getTransportConnection().close();
+ }
+
+ /**
+ * Disconnect the connection, closing all channels
+ */
+ @Override
+ public void disconnect(String scaleDownNodeID, boolean criticalError) {
+ getTransportConnection().close();
+ }
+
+ @Override
+ public boolean checkDataReceived() {
+ return amqpConnection.checkDataReceived();
+ }
+
+ @Override
+ public void flush() {
+ amqpConnection.flush();
+ }
+
+ @Override
+ public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+ amqpConnection.inputBuffer(buffer.byteBuf());
+ super.bufferReceived(connectionID, buffer);
+ }
+
+ private void internalClose() {
+ // We close the underlying transport connection
+ getTransportConnection().close();
+ }
+
+ @Override
+ public void killMessage(SimpleString nodeID) {
+ //unsupported
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
new file mode 100644
index 0000000..fe7b976
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import io.netty.channel.ChannelPipeline;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
+import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.management.Notification;
+import org.apache.activemq.artemis.core.server.management.NotificationListener;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+
+/**
+ * A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources
+ */
+public class ProtonProtocolManager implements ProtocolManager<Interceptor>, NotificationListener {
+
+ private static final List<String> websocketRegistryNames = Arrays.asList("amqp");
+
+ private final ActiveMQServer server;
+
+ private MessageConverter protonConverter;
+
+ private final ProtonProtocolManagerFactory factory;
+
+ /*
+ * used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
+ * the address. This can be changed on the acceptor.
+ * */
+ private String pubSubPrefix = ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX;
+
+ private int maxFrameSize = AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
+
+ public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
+ this.factory = factory;
+ this.server = server;
+ this.protonConverter = new ProtonMessageConverter(server.getStorageManager());
+ }
+
+ public ActiveMQServer getServer() {
+ return server;
+ }
+
+ @Override
+ public MessageConverter getConverter() {
+ return protonConverter;
+ }
+
+ @Override
+ public void onNotification(Notification notification) {
+
+ }
+
+ @Override
+ public ProtocolManagerFactory<Interceptor> getFactory() {
+ return factory;
+ }
+
+ @Override
+ public void updateInterceptors(List<BaseInterceptor> incomingInterceptors,
+ List<BaseInterceptor> outgoingInterceptors) {
+ // no op
+ }
+
+ @Override
+ public boolean acceptsNoHandshake() {
+ return false;
+ }
+
+ @Override
+ public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) {
+ AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor(), server);
+ long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
+
+ if (server.getConfiguration().getConnectionTTLOverride() != -1) {
+ ttl = server.getConfiguration().getConnectionTTLOverride();
+ }
+
+ String id = server.getConfiguration().getName();
+ AMQPConnectionContext amqpConnection =
+ new AMQPConnectionContext(connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
+
+ Executor executor = server.getExecutorFactory().getExecutor();
+
+ ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(this, amqpConnection, remotingConnection, executor);
+
+ connectionCallback.setProtonConnectionDelegate(delegate);
+
+ ConnectionEntry entry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(), ttl);
+
+ return entry;
+ }
+
+ @Override
+ public void removeHandler(String name) {
+
+ }
+
+ @Override
+ public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer) {
+ ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection) connection;
+
+ protonConnection.bufferReceived(protonConnection.getID(), buffer);
+ }
+
+ @Override
+ public void addChannelHandlers(ChannelPipeline pipeline) {
+
+ }
+
+ @Override
+ public boolean isProtocol(byte[] array) {
+ return array.length >= 4 && array[0] == (byte) 'A' && array[1] == (byte) 'M' && array[2] == (byte) 'Q' && array[3] == (byte) 'P';
+ }
+
+ @Override
+ public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
+ }
+
+ @Override
+ public List<String> websocketSubprotocolIdentifiers() {
+ return websocketRegistryNames;
+ }
+
+ public String getPubSubPrefix() {
+ return pubSubPrefix;
+ }
+
+ public void setPubSubPrefix(String pubSubPrefix) {
+ this.pubSubPrefix = pubSubPrefix;
+ }
+
+
+ public int getMaxFrameSize() {
+ return maxFrameSize;
+ }
+
+ public void setMaxFrameSize(int maxFrameSize) {
+ this.maxFrameSize = maxFrameSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
new file mode 100644
index 0000000..7255ca0
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
+import org.apache.activemq.artemis.utils.uri.BeanSupport;
+import org.osgi.service.component.annotations.Component;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@Component(service = ProtocolManagerFactory.class)
+public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
+
+ private static final String AMQP_PROTOCOL_NAME = "AMQP";
+
+ private static final String MODULE_NAME = "artemis-amqp-protocol";
+
+ private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
+
+ @Override
+ public ProtocolManager createProtocolManager(ActiveMQServer server,
+ final Map<String, Object> parameters,
+ List<BaseInterceptor> incomingInterceptors,
+ List<BaseInterceptor> outgoingInterceptors) throws Exception {
+ return BeanSupport.setData(new ProtonProtocolManager(this, server), parameters);
+ }
+
+ @Override
+ public List<Interceptor> filterInterceptors(List<BaseInterceptor> interceptors) {
+ // no interceptors on Proton
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String[] getProtocols() {
+ return SUPPORTED_PROTOCOLS;
+ }
+
+ @Override
+ public String getModuleName() {
+ return MODULE_NAME;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/package-info.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/package-info.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/package-info.java
new file mode 100644
index 0000000..c8a3c6a
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package includes classes used to interact with the broker.
+ * The ProtocolManager will be here, and some classes that will interact with the PostOffice
+ * and other internal components directly.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
\ No newline at end of file