You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/18 02:42:54 UTC
[65/65] [abbrv] activemq-artemis git commit: cleanup on consumer
cleanup on consumer
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/52b63d67
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/52b63d67
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/52b63d67
Branch: refs/heads/refactor-openwire
Commit: 52b63d67e6988b80673bba956480e90fb364589a
Parents: ca53593
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Mar 17 19:28:38 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 17 21:16:25 2016 -0400
----------------------------------------------------------------------
.../plug/ProtonSessionIntegrationCallback.java | 5 ++
.../core/protocol/mqtt/MQTTSessionCallback.java | 5 ++
.../protocol/openwire/OpenWireConnection.java | 1 -
.../core/protocol/openwire/amq/AMQConsumer.java | 12 +--
.../openwire/amq/AMQConsumerBrokerExchange.java | 9 +-
.../openwire/amq/AMQServerConsumer.java | 90 --------------------
.../protocol/openwire/amq/AMQServerSession.java | 2 +-
.../core/protocol/openwire/amq/AMQSession.java | 8 ++
.../protocol/openwire/amq/BrowserListener.java | 22 -----
.../core/protocol/stomp/StompSession.java | 5 ++
.../protocol/core/impl/CoreSessionCallback.java | 4 +
.../core/server/impl/ServerConsumerImpl.java | 1 +
.../spi/core/protocol/SessionCallback.java | 3 +
.../integration/client/HangConsumerTest.java | 5 ++
14 files changed, 41 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 12aad22..5d6af2a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -92,6 +92,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
@Override
+ public void browserFinished(ServerConsumer consumer) {
+
+ }
+
+ @Override
public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception {
this.protonSession = protonSession;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 356dc73..28d86b8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -83,6 +83,11 @@ public class MQTTSessionCallback implements SessionCallback {
@Override
+ public void browserFinished(ServerConsumer consumer) {
+
+ }
+
+ @Override
public boolean hasCredits(ServerConsumer consumerID) {
return true;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index b872e63..17f26b0 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -438,7 +438,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerMap);
}
synchronized (consumerExchanges) {
- result.setConnectionContext(context);
consumerExchanges.put(id, result);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 8e7ff49..d296213 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -46,7 +46,7 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.wireformat.WireFormat;
-public class AMQConsumer implements BrowserListener {
+public class AMQConsumer {
private AMQSession session;
private org.apache.activemq.command.ActiveMQDestination actualDest;
@@ -137,7 +137,8 @@ public class AMQConsumer implements BrowserListener {
}
else {
SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());
- coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
+ AMQServerConsumer serverConsumer = (AMQServerConsumer)coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
+ serverConsumer.setAmqConsumer(this);
AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());
if (addrSettings != null) {
//see PolicyEntry
@@ -150,12 +151,6 @@ public class AMQConsumer implements BrowserListener {
}
}
}
-
- if (info.isBrowser()) {
- AMQServerConsumer coreConsumer = coreSession.getConsumer(nativeId);
- coreConsumer.setBrowserListener(this);
- }
-
}
public long getNativeId() {
@@ -310,7 +305,6 @@ public class AMQConsumer implements BrowserListener {
acquireCredit(n);
}
- @Override
public void browseFinished() {
MessageDispatch md = new MessageDispatch();
md.setConsumerId(info.getConsumerId());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
index c481618..21a45b1 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
@@ -22,18 +22,11 @@ import org.apache.activemq.command.MessagePull;
public abstract class AMQConsumerBrokerExchange {
protected final AMQSession amqSession;
- private AMQConnectionContext connectionContext;
- private boolean wildcard;
public AMQConsumerBrokerExchange(AMQSession amqSession) {
this.amqSession = amqSession;
}
- /**
- * @param connectionContext the connectionContext to set
- */
- public void setConnectionContext(AMQConnectionContext connectionContext) {
- this.connectionContext = connectionContext;
- }
+
public abstract void acknowledge(MessageAck ack) throws Exception;
public abstract void processMessagePull(MessagePull messagePull) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
index b37e1cf..2f9d0bc 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
@@ -23,8 +23,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@@ -36,7 +34,6 @@ public class AMQServerConsumer extends ServerConsumerImpl {
// TODO-NOW: remove this once unified
AMQConsumer amqConsumer;
- boolean isClosing;
public AMQConsumer getAmqConsumer() {
return amqConsumer;
@@ -64,93 +61,6 @@ public class AMQServerConsumer extends ServerConsumerImpl {
super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
}
- public void setBrowserListener(BrowserListener listener) {
- AMQBrowserDeliverer newBrowserDeliverer = new AMQBrowserDeliverer(this.browserDeliverer);
- newBrowserDeliverer.listener = listener;
- this.browserDeliverer = newBrowserDeliverer;
- }
-
- public void closing() {
- isClosing = true;
- }
-
- @Override
- public HandleStatus handle(final MessageReference ref) throws Exception {
- if (isClosing) {
- return HandleStatus.BUSY;
- }
- return super.handle(ref);
- }
-
- private class AMQBrowserDeliverer extends BrowserDeliverer {
-
- private BrowserListener listener = null;
-
- public AMQBrowserDeliverer(final BrowserDeliverer other) {
- super(other.iterator);
- }
-
- @Override
- public synchronized void run() {
- // if the reference was busy during the previous iteration, handle it now
- if (current != null) {
- try {
- HandleStatus status = handle(current);
-
- if (status == HandleStatus.BUSY) {
- return;
- }
-
- if (status == HandleStatus.HANDLED) {
- proceedDeliver(current);
- }
-
- current = null;
- }
- catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, current);
- return;
- }
- }
-
- MessageReference ref = null;
- HandleStatus status;
-
- while (true) {
- try {
- ref = null;
- synchronized (messageQueue) {
- if (!iterator.hasNext()) {
- //here we need to send a null for amq browsers
- if (listener != null) {
- listener.browseFinished();
- }
- break;
- }
-
- ref = iterator.next();
-
- status = handle(ref);
- }
-
- if (status == HandleStatus.HANDLED) {
- proceedDeliver(ref);
- }
- else if (status == HandleStatus.BUSY) {
- // keep a reference on the current message reference
- // to handle it next time the browser deliverer is executed
- current = ref;
- break;
- }
- }
- catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, ref);
- break;
- }
- }
- }
- }
-
public void amqPutBackToDeliveringList(final List<MessageReference> refs) {
synchronized (this.deliveringRefs) {
for (MessageReference ref : refs) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
index b603257..3f0259d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
@@ -94,7 +94,7 @@ public class AMQServerSession extends ServerSessionImpl {
Set<ServerConsumer> consumersClone = new HashSet<>(consumers.values());
for (ServerConsumer consumer : consumersClone) {
AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer;
- amqConsumer.closing();//prevent redeliver
+ amqConsumer.setStarted(false);
}
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index e3d2266..86ea582 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -165,6 +165,14 @@ public class AMQSession implements SessionCallback {
}
@Override
+ public void browserFinished(ServerConsumer consumer) {
+ AMQConsumer theConsumer = ((AMQServerConsumer)consumer).getAmqConsumer();
+ if (theConsumer != null) {
+ theConsumer.browseFinished();
+ }
+ }
+
+ @Override
public boolean isWritable(ReadyListener callback) {
return connection.isWritable(callback);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java
deleted file mode 100644
index 0e192db..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-interface BrowserListener {
-
- void browseFinished();
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index e94e0bc..a6cbe71 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -113,6 +113,11 @@ public class StompSession implements SessionCallback {
}
@Override
+ public void browserFinished(ServerConsumer consumer) {
+
+ }
+
+ @Override
public int sendMessage(ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) {
LargeServerMessageImpl largeMessage = null;
ServerMessage newServerMessage = serverMessage;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 0b74fd7..c05a288 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -98,6 +98,10 @@ public final class CoreSessionCallback implements SessionCallback {
channel.send(packet);
}
+ @Override
+ public void browserFinished(ServerConsumer consumer) {
+
+ }
@Override
public void afterDelivery() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index cb2cd38..298bf5f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -1213,6 +1213,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref = null;
synchronized (messageQueue) {
if (!iterator.hasNext()) {
+ callback.browserFinished(ServerConsumerImpl.this);
break;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index 4b27bc4..a9eb0f2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -50,4 +50,7 @@ public interface SessionCallback {
void disconnect(ServerConsumer consumerId, String queueName);
boolean isWritable(ReadyListener callback);
+
+ /** Some protocols (Openwire) needs a special message with the browser is finished. */
+ void browserFinished(ServerConsumer consumer);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52b63d67/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index d8aa4ac..54ae6c8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -484,6 +484,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
@Override
+ public void browserFinished(ServerConsumer consumer) {
+
+ }
+
+ @Override
public boolean isWritable(ReadyListener callback) {
return true;
}