You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2015/08/28 21:33:46 UTC
[03/10] activemq-artemis git commit: removing dead code on openwire
implementation
removing dead code on openwire implementation
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3fbf75b2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3fbf75b2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3fbf75b2
Branch: refs/heads/master
Commit: 3fbf75b2ff79cd6d3e4847c1bbfe85956689375d
Parents: 8d98fc3
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Aug 28 14:00:04 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Aug 28 15:05:41 2015 -0400
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 63 +----
.../openwire/amq/AMQConsumerBrokerExchange.java | 15 --
.../protocol/openwire/amq/AMQDestination.java | 241 -------------------
.../openwire/amq/AMQProducerBrokerExchange.java | 20 --
.../openwire/amq/AMQSlowConsumerStrategy.java | 39 ---
.../protocol/openwire/amq/AMQSubscription.java | 22 +-
6 files changed, 8 insertions(+), 392 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3fbf75b2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index ffe2b38..35861a9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -97,7 +97,6 @@ import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
-import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.util.ByteSequence;
@@ -134,8 +133,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
private AMQMessageAuthorizationPolicy messageAuthorizationPolicy;
- private boolean networkConnection;
-
private boolean manageable;
private boolean pendingStop;
@@ -153,18 +150,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
private final CountDownLatch stopped = new CountDownLatch(1);
- protected TaskRunner taskRunner;
-
private boolean active;
protected final List<Command> dispatchQueue = new LinkedList<Command>();
- private boolean markedCandidate;
-
- private boolean blockedCandidate;
-
- private long timeStamp;
-
private boolean inServiceException;
private final AtomicBoolean asyncException = new AtomicBoolean(false);
@@ -575,7 +564,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
// it should be related to activemq's Acceptor
context.setConnector(this.acceptorUsed);
context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
- context.setNetworkConnection(networkConnection);
context.setFaultTolerant(faultTolerantConnection);
context.setTransactions(new ConcurrentHashMap<TransactionId, AMQTransaction>());
context.setUserName(info.getUserName());
@@ -612,30 +600,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
}
public void dispatchAsync(Command message) {
- if (!stopping.get()) {
- if (taskRunner == null) {
- dispatchSync(message);
- }
- else {
- synchronized (dispatchQueue) {
- dispatchQueue.add(message);
- }
- try {
- taskRunner.wakeup();
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
- else {
- if (message.isMessageDispatch()) {
- MessageDispatch md = (MessageDispatch) message;
- TransmitCallback sub = md.getTransmitCallback();
- protocolManager.postProcessDispatch(md);
- if (sub != null) {
- sub.onFailure();
- }
+ if (message.isMessageDispatch()) {
+ MessageDispatch md = (MessageDispatch) message;
+ TransmitCallback sub = md.getTransmitCallback();
+ protocolManager.postProcessDispatch(md);
+ if (sub != null) {
+ sub.onFailure();
}
}
}
@@ -722,22 +692,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
*/
}
- public void setMarkedCandidate(boolean markedCandidate) {
- this.markedCandidate = markedCandidate;
- if (!markedCandidate) {
- timeStamp = 0;
- blockedCandidate = false;
- }
- }
-
protected void dispatch(Command command) throws IOException {
- try {
- setMarkedCandidate(true);
- this.physicalSend(command);
- }
- finally {
- setMarkedCandidate(false);
- }
+ this.physicalSend(command);
}
protected void processDispatch(Command command) throws IOException {
@@ -854,11 +810,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
// log
}
- if (taskRunner != null) {
- taskRunner.shutdown(1);
- taskRunner = null;
- }
-
active = false;
// Run the MessageDispatch callbacks so that message references get
// cleaned up.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3fbf75b2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
index 1e87db3..d92cd15 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
@@ -23,7 +23,6 @@ public abstract class AMQConsumerBrokerExchange {
protected final AMQSession amqSession;
private AMQConnectionContext connectionContext;
- private AMQDestination regionDestination;
private AMQSubscription subscription;
private boolean wildcard;
@@ -46,20 +45,6 @@ public abstract class AMQConsumerBrokerExchange {
}
/**
- * @return the regionDestination
- */
- public AMQDestination getRegionDestination() {
- return this.regionDestination;
- }
-
- /**
- * @param regionDestination the regionDestination to set
- */
- public void setRegionDestination(AMQDestination regionDestination) {
- this.regionDestination = regionDestination;
- }
-
- /**
* @return the subscription
*/
public AMQSubscription getSubscription() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3fbf75b2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDestination.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDestination.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDestination.java
deleted file mode 100644
index ca06103..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDestination.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.Usage;
-
-public interface AMQDestination {
-
- AMQDeadLetterStrategy DEFAULT_DEAD_LETTER_STRATEGY = new AMQSharedDeadLetterStrategy();
- long DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL = 30000;
-
- void addSubscription(AMQConnectionContext context, AMQSubscription sub) throws Exception;
-
- void removeSubscription(AMQConnectionContext context,
- AMQSubscription sub,
- long lastDeliveredSequenceId) throws Exception;
-
- void addProducer(AMQConnectionContext context, ProducerInfo info) throws Exception;
-
- void removeProducer(AMQConnectionContext context, ProducerInfo info) throws Exception;
-
- void send(AMQProducerBrokerExchange producerExchange, Message messageSend) throws Exception;
-
- void acknowledge(AMQConnectionContext context,
- AMQSubscription sub,
- final MessageAck ack,
- final MessageReference node) throws IOException;
-
- long getInactiveTimoutBeforeGC();
-
- void markForGC(long timeStamp);
-
- boolean canGC();
-
- void gc();
-
- ActiveMQDestination getActiveMQDestination();
-
- MemoryUsage getMemoryUsage();
-
- void setMemoryUsage(MemoryUsage memoryUsage);
-
- void dispose(AMQConnectionContext context) throws IOException;
-
- boolean isDisposed();
-
- AMQDestinationStatistics getDestinationStatistics();
-
- AMQDeadLetterStrategy getDeadLetterStrategy();
-
- Message[] browse();
-
- String getName();
-
- AMQMessageStore getMessageStore();
-
- boolean isProducerFlowControl();
-
- void setProducerFlowControl(boolean value);
-
- boolean isAlwaysRetroactive();
-
- void setAlwaysRetroactive(boolean value);
-
- /**
- * Set's the interval at which warnings about producers being blocked by
- * resource usage will be triggered. Values of 0 or less will disable
- * warnings
- *
- * @param blockedProducerWarningInterval the interval at which warning about blocked producers will be
- * triggered.
- */
- void setBlockedProducerWarningInterval(long blockedProducerWarningInterval);
-
- /**
- * @return the interval at which warning about blocked producers will be
- * triggered.
- */
- long getBlockedProducerWarningInterval();
-
- int getMaxProducersToAudit();
-
- void setMaxProducersToAudit(int maxProducersToAudit);
-
- int getMaxAuditDepth();
-
- void setMaxAuditDepth(int maxAuditDepth);
-
- boolean isEnableAudit();
-
- void setEnableAudit(boolean enableAudit);
-
- boolean isActive();
-
- int getMaxPageSize();
-
- void setMaxPageSize(int maxPageSize);
-
- int getMaxBrowsePageSize();
-
- void setMaxBrowsePageSize(int maxPageSize);
-
- boolean isUseCache();
-
- void setUseCache(boolean useCache);
-
- int getMinimumMessageSize();
-
- void setMinimumMessageSize(int minimumMessageSize);
-
- int getCursorMemoryHighWaterMark();
-
- void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
-
- /**
- * optionally called by a Subscriber - to inform the Destination its ready
- * for more messages
- */
- void wakeup();
-
- /**
- * @return true if lazyDispatch is enabled
- */
- boolean isLazyDispatch();
-
- /**
- * set the lazy dispatch - default is false
- *
- * @param value
- */
- void setLazyDispatch(boolean value);
-
- /**
- * Inform the Destination a message has expired
- *
- * @param context
- * @param subs
- * @param node
- */
- void messageExpired(AMQConnectionContext context, AMQSubscription subs, MessageReference node);
-
- /**
- * called when message is consumed
- *
- * @param context
- * @param messageReference
- */
- void messageConsumed(AMQConnectionContext context, MessageReference messageReference);
-
- /**
- * Called when message is delivered to the broker
- *
- * @param context
- * @param messageReference
- */
- void messageDelivered(AMQConnectionContext context, MessageReference messageReference);
-
- /**
- * Called when a message is discarded - e.g. running low on memory This will
- * happen only if the policy is enabled - e.g. non durable topics
- *
- * @param context
- * @param messageReference
- * @param sub
- */
- void messageDiscarded(AMQConnectionContext context, AMQSubscription sub, MessageReference messageReference);
-
- /**
- * Called when there is a slow consumer
- *
- * @param context
- * @param subs
- */
- void slowConsumer(AMQConnectionContext context, AMQSubscription subs);
-
- /**
- * Called to notify a producer is too fast
- *
- * @param context
- * @param producerInfo
- */
- void fastProducer(AMQConnectionContext context, ProducerInfo producerInfo);
-
- /**
- * Called when a Usage reaches a limit
- *
- * @param context
- * @param usage
- */
- void isFull(AMQConnectionContext context, Usage<?> usage);
-
- List<AMQSubscription> getConsumers();
-
- /**
- * called on Queues in slave mode to allow dispatch to follow subscription
- * choice of master
- *
- * @param messageDispatchNotification
- * @throws Exception
- */
- void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
-
- boolean isPrioritizedMessages();
-
- AMQSlowConsumerStrategy getSlowConsumerStrategy();
-
- boolean isDoOptimzeMessageStorage();
-
- void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage);
-
- void clearPendingMessages();
-
- boolean isDLQ();
-
- void duplicateFromStore(Message message, AMQSubscription subscription);
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3fbf75b2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
index d7648cc..f94c119 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
@@ -26,7 +26,6 @@ import org.apache.activemq.state.ProducerState;
public class AMQProducerBrokerExchange {
private AMQConnectionContext connectionContext;
- private AMQDestination regionDestination;
private ProducerState producerState;
private boolean mutable = true;
private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
@@ -40,7 +39,6 @@ public class AMQProducerBrokerExchange {
public AMQProducerBrokerExchange copy() {
AMQProducerBrokerExchange rc = new AMQProducerBrokerExchange();
rc.connectionContext = connectionContext.copy();
- rc.regionDestination = regionDestination;
rc.producerState = producerState;
rc.mutable = mutable;
return rc;
@@ -75,20 +73,6 @@ public class AMQProducerBrokerExchange {
}
/**
- * @return the regionDestination
- */
- public AMQDestination getRegionDestination() {
- return this.regionDestination;
- }
-
- /**
- * @param regionDestination the regionDestination to set
- */
- public void setRegionDestination(AMQDestination regionDestination) {
- this.regionDestination = regionDestination;
- }
-
- /**
* @return the producerState
*/
public ProducerState getProducerState() {
@@ -149,10 +133,6 @@ public class AMQProducerBrokerExchange {
flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
}
- public void incrementTimeBlocked(AMQDestination destination, long timeBlocked) {
- flowControlInfo.incrementTimeBlocked(timeBlocked);
- }
-
public boolean isBlockedForFlowControl() {
return flowControlInfo.isBlockingOnFlowControl();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3fbf75b2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java
deleted file mode 100644
index 333dd04..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-public interface AMQSlowConsumerStrategy {
-
- /**
- * Slow consumer event.
- *
- * @param context Connection context of the subscription.
- * @param subs The subscription object for the slow consumer.
- */
- void slowConsumer(AMQConnectionContext context, AMQSubscription subs);
-
- /**
- * For Strategies that need to examine assigned destination for slow consumers
- * periodically the destination is assigned here.
- *
- * If the strategy doesn't is event driven it can just ignore assigned destination.
- *
- * @param destination A destination to add to a watch list.
- */
- void addDestination(AMQDestination destination);
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3fbf75b2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java
index a23d675..abf492d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java
@@ -16,11 +16,9 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
-import java.io.IOException;
-import java.util.List;
-
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
+import java.io.IOException;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.ActiveMQDestination;
@@ -82,24 +80,6 @@ public interface AMQSubscription extends AMQSubscriptionRecovery {
boolean matches(ActiveMQDestination destination);
/**
- * The subscription will be receiving messages from the destination.
- *
- * @param context
- * @param destination
- * @throws Exception
- */
- void add(AMQConnectionContext context, AMQDestination destination) throws Exception;
-
- /**
- * The subscription will be no longer be receiving messages from the destination.
- *
- * @param context
- * @param destination
- * @return a list of un-acked messages that were added to the subscription.
- */
- List<MessageReference> remove(AMQConnectionContext context, AMQDestination destination) throws Exception;
-
- /**
* The ConsumerInfo object that created the subscription.
*/
ConsumerInfo getConsumerInfo();