You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2015/08/28 21:33:46 UTC

[03/10] activemq-artemis git commit: removing dead code on openwire implementation

removing dead code on openwire implementation


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3fbf75b2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3fbf75b2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3fbf75b2

Branch: refs/heads/master
Commit: 3fbf75b2ff79cd6d3e4847c1bbfe85956689375d
Parents: 8d98fc3
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Aug 28 14:00:04 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Aug 28 15:05:41 2015 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |  63 +----
 .../openwire/amq/AMQConsumerBrokerExchange.java |  15 --
 .../protocol/openwire/amq/AMQDestination.java   | 241 -------------------
 .../openwire/amq/AMQProducerBrokerExchange.java |  20 --
 .../openwire/amq/AMQSlowConsumerStrategy.java   |  39 ---
 .../protocol/openwire/amq/AMQSubscription.java  |  22 +-
 6 files changed, 8 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3fbf75b2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index ffe2b38..35861a9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -97,7 +97,6 @@ import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.state.ConsumerState;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.state.SessionState;
-import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.TransmitCallback;
 import org.apache.activemq.util.ByteSequence;
@@ -134,8 +133,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    private AMQMessageAuthorizationPolicy messageAuthorizationPolicy;
 
-   private boolean networkConnection;
-
    private boolean manageable;
 
    private boolean pendingStop;
@@ -153,18 +150,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
 
    private final CountDownLatch stopped = new CountDownLatch(1);
 
-   protected TaskRunner taskRunner;
-
    private boolean active;
 
    protected final List<Command> dispatchQueue = new LinkedList<Command>();
 
-   private boolean markedCandidate;
-
-   private boolean blockedCandidate;
-
-   private long timeStamp;
-
    private boolean inServiceException;
 
    private final AtomicBoolean asyncException = new AtomicBoolean(false);
@@ -575,7 +564,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
       // it should be related to activemq's Acceptor
       context.setConnector(this.acceptorUsed);
       context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
-      context.setNetworkConnection(networkConnection);
       context.setFaultTolerant(faultTolerantConnection);
       context.setTransactions(new ConcurrentHashMap<TransactionId, AMQTransaction>());
       context.setUserName(info.getUserName());
@@ -612,30 +600,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
    }
 
    public void dispatchAsync(Command message) {
-      if (!stopping.get()) {
-         if (taskRunner == null) {
-            dispatchSync(message);
-         }
-         else {
-            synchronized (dispatchQueue) {
-               dispatchQueue.add(message);
-            }
-            try {
-               taskRunner.wakeup();
-            }
-            catch (InterruptedException e) {
-               Thread.currentThread().interrupt();
-            }
-         }
-      }
-      else {
-         if (message.isMessageDispatch()) {
-            MessageDispatch md = (MessageDispatch) message;
-            TransmitCallback sub = md.getTransmitCallback();
-            protocolManager.postProcessDispatch(md);
-            if (sub != null) {
-               sub.onFailure();
-            }
+      if (message.isMessageDispatch()) {
+         MessageDispatch md = (MessageDispatch) message;
+         TransmitCallback sub = md.getTransmitCallback();
+         protocolManager.postProcessDispatch(md);
+         if (sub != null) {
+            sub.onFailure();
          }
       }
    }
@@ -722,22 +692,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
        */
    }
 
-   public void setMarkedCandidate(boolean markedCandidate) {
-      this.markedCandidate = markedCandidate;
-      if (!markedCandidate) {
-         timeStamp = 0;
-         blockedCandidate = false;
-      }
-   }
-
    protected void dispatch(Command command) throws IOException {
-      try {
-         setMarkedCandidate(true);
-         this.physicalSend(command);
-      }
-      finally {
-         setMarkedCandidate(false);
-      }
+      this.physicalSend(command);
    }
 
    protected void processDispatch(Command command) throws IOException {
@@ -854,11 +810,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
          // log
       }
 
-      if (taskRunner != null) {
-         taskRunner.shutdown(1);
-         taskRunner = null;
-      }
-
       active = false;
       // Run the MessageDispatch callbacks so that message references get
       // cleaned up.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3fbf75b2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
index 1e87db3..d92cd15 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
@@ -23,7 +23,6 @@ public abstract class AMQConsumerBrokerExchange {
 
    protected final AMQSession amqSession;
    private AMQConnectionContext connectionContext;
-   private AMQDestination regionDestination;
    private AMQSubscription subscription;
    private boolean wildcard;
 
@@ -46,20 +45,6 @@ public abstract class AMQConsumerBrokerExchange {
    }
 
    /**
-    * @return the regionDestination
-    */
-   public AMQDestination getRegionDestination() {
-      return this.regionDestination;
-   }
-
-   /**
-    * @param regionDestination the regionDestination to set
-    */
-   public void setRegionDestination(AMQDestination regionDestination) {
-      this.regionDestination = regionDestination;
-   }
-
-   /**
     * @return the subscription
     */
    public AMQSubscription getSubscription() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3fbf75b2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDestination.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDestination.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDestination.java
deleted file mode 100644
index ca06103..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQDestination.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.Usage;
-
-public interface AMQDestination {
-
-   AMQDeadLetterStrategy DEFAULT_DEAD_LETTER_STRATEGY = new AMQSharedDeadLetterStrategy();
-   long DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL = 30000;
-
-   void addSubscription(AMQConnectionContext context, AMQSubscription sub) throws Exception;
-
-   void removeSubscription(AMQConnectionContext context,
-                           AMQSubscription sub,
-                           long lastDeliveredSequenceId) throws Exception;
-
-   void addProducer(AMQConnectionContext context, ProducerInfo info) throws Exception;
-
-   void removeProducer(AMQConnectionContext context, ProducerInfo info) throws Exception;
-
-   void send(AMQProducerBrokerExchange producerExchange, Message messageSend) throws Exception;
-
-   void acknowledge(AMQConnectionContext context,
-                    AMQSubscription sub,
-                    final MessageAck ack,
-                    final MessageReference node) throws IOException;
-
-   long getInactiveTimoutBeforeGC();
-
-   void markForGC(long timeStamp);
-
-   boolean canGC();
-
-   void gc();
-
-   ActiveMQDestination getActiveMQDestination();
-
-   MemoryUsage getMemoryUsage();
-
-   void setMemoryUsage(MemoryUsage memoryUsage);
-
-   void dispose(AMQConnectionContext context) throws IOException;
-
-   boolean isDisposed();
-
-   AMQDestinationStatistics getDestinationStatistics();
-
-   AMQDeadLetterStrategy getDeadLetterStrategy();
-
-   Message[] browse();
-
-   String getName();
-
-   AMQMessageStore getMessageStore();
-
-   boolean isProducerFlowControl();
-
-   void setProducerFlowControl(boolean value);
-
-   boolean isAlwaysRetroactive();
-
-   void setAlwaysRetroactive(boolean value);
-
-   /**
-    * Set's the interval at which warnings about producers being blocked by
-    * resource usage will be triggered. Values of 0 or less will disable
-    * warnings
-    *
-    * @param blockedProducerWarningInterval the interval at which warning about blocked producers will be
-    *                                       triggered.
-    */
-   void setBlockedProducerWarningInterval(long blockedProducerWarningInterval);
-
-   /**
-    * @return the interval at which warning about blocked producers will be
-    * triggered.
-    */
-   long getBlockedProducerWarningInterval();
-
-   int getMaxProducersToAudit();
-
-   void setMaxProducersToAudit(int maxProducersToAudit);
-
-   int getMaxAuditDepth();
-
-   void setMaxAuditDepth(int maxAuditDepth);
-
-   boolean isEnableAudit();
-
-   void setEnableAudit(boolean enableAudit);
-
-   boolean isActive();
-
-   int getMaxPageSize();
-
-   void setMaxPageSize(int maxPageSize);
-
-   int getMaxBrowsePageSize();
-
-   void setMaxBrowsePageSize(int maxPageSize);
-
-   boolean isUseCache();
-
-   void setUseCache(boolean useCache);
-
-   int getMinimumMessageSize();
-
-   void setMinimumMessageSize(int minimumMessageSize);
-
-   int getCursorMemoryHighWaterMark();
-
-   void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
-
-   /**
-    * optionally called by a Subscriber - to inform the Destination its ready
-    * for more messages
-    */
-   void wakeup();
-
-   /**
-    * @return true if lazyDispatch is enabled
-    */
-   boolean isLazyDispatch();
-
-   /**
-    * set the lazy dispatch - default is false
-    *
-    * @param value
-    */
-   void setLazyDispatch(boolean value);
-
-   /**
-    * Inform the Destination a message has expired
-    *
-    * @param context
-    * @param subs
-    * @param node
-    */
-   void messageExpired(AMQConnectionContext context, AMQSubscription subs, MessageReference node);
-
-   /**
-    * called when message is consumed
-    *
-    * @param context
-    * @param messageReference
-    */
-   void messageConsumed(AMQConnectionContext context, MessageReference messageReference);
-
-   /**
-    * Called when message is delivered to the broker
-    *
-    * @param context
-    * @param messageReference
-    */
-   void messageDelivered(AMQConnectionContext context, MessageReference messageReference);
-
-   /**
-    * Called when a message is discarded - e.g. running low on memory This will
-    * happen only if the policy is enabled - e.g. non durable topics
-    *
-    * @param context
-    * @param messageReference
-    * @param sub
-    */
-   void messageDiscarded(AMQConnectionContext context, AMQSubscription sub, MessageReference messageReference);
-
-   /**
-    * Called when there is a slow consumer
-    *
-    * @param context
-    * @param subs
-    */
-   void slowConsumer(AMQConnectionContext context, AMQSubscription subs);
-
-   /**
-    * Called to notify a producer is too fast
-    *
-    * @param context
-    * @param producerInfo
-    */
-   void fastProducer(AMQConnectionContext context, ProducerInfo producerInfo);
-
-   /**
-    * Called when a Usage reaches a limit
-    *
-    * @param context
-    * @param usage
-    */
-   void isFull(AMQConnectionContext context, Usage<?> usage);
-
-   List<AMQSubscription> getConsumers();
-
-   /**
-    * called on Queues in slave mode to allow dispatch to follow subscription
-    * choice of master
-    *
-    * @param messageDispatchNotification
-    * @throws Exception
-    */
-   void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
-
-   boolean isPrioritizedMessages();
-
-   AMQSlowConsumerStrategy getSlowConsumerStrategy();
-
-   boolean isDoOptimzeMessageStorage();
-
-   void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage);
-
-   void clearPendingMessages();
-
-   boolean isDLQ();
-
-   void duplicateFromStore(Message message, AMQSubscription subscription);
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3fbf75b2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
index d7648cc..f94c119 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
@@ -26,7 +26,6 @@ import org.apache.activemq.state.ProducerState;
 public class AMQProducerBrokerExchange {
 
    private AMQConnectionContext connectionContext;
-   private AMQDestination regionDestination;
    private ProducerState producerState;
    private boolean mutable = true;
    private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
@@ -40,7 +39,6 @@ public class AMQProducerBrokerExchange {
    public AMQProducerBrokerExchange copy() {
       AMQProducerBrokerExchange rc = new AMQProducerBrokerExchange();
       rc.connectionContext = connectionContext.copy();
-      rc.regionDestination = regionDestination;
       rc.producerState = producerState;
       rc.mutable = mutable;
       return rc;
@@ -75,20 +73,6 @@ public class AMQProducerBrokerExchange {
    }
 
    /**
-    * @return the regionDestination
-    */
-   public AMQDestination getRegionDestination() {
-      return this.regionDestination;
-   }
-
-   /**
-    * @param regionDestination the regionDestination to set
-    */
-   public void setRegionDestination(AMQDestination regionDestination) {
-      this.regionDestination = regionDestination;
-   }
-
-   /**
     * @return the producerState
     */
    public ProducerState getProducerState() {
@@ -149,10 +133,6 @@ public class AMQProducerBrokerExchange {
       flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
    }
 
-   public void incrementTimeBlocked(AMQDestination destination, long timeBlocked) {
-      flowControlInfo.incrementTimeBlocked(timeBlocked);
-   }
-
    public boolean isBlockedForFlowControl() {
       return flowControlInfo.isBlockingOnFlowControl();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3fbf75b2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java
deleted file mode 100644
index 333dd04..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-public interface AMQSlowConsumerStrategy {
-
-   /**
-    * Slow consumer event.
-    *
-    * @param context Connection context of the subscription.
-    * @param subs    The subscription object for the slow consumer.
-    */
-   void slowConsumer(AMQConnectionContext context, AMQSubscription subs);
-
-   /**
-    * For Strategies that need to examine assigned destination for slow consumers
-    * periodically the destination is assigned here.
-    *
-    * If the strategy doesn't is event driven it can just ignore assigned destination.
-    *
-    * @param destination A destination to add to a watch list.
-    */
-   void addDestination(AMQDestination destination);
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3fbf75b2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java
index a23d675..abf492d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSubscription.java
@@ -16,11 +16,9 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
-import java.io.IOException;
-import java.util.List;
-
 import javax.jms.InvalidSelectorException;
 import javax.management.ObjectName;
+import java.io.IOException;
 
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -82,24 +80,6 @@ public interface AMQSubscription extends AMQSubscriptionRecovery {
    boolean matches(ActiveMQDestination destination);
 
    /**
-    * The subscription will be receiving messages from the destination.
-    *
-    * @param context
-    * @param destination
-    * @throws Exception
-    */
-   void add(AMQConnectionContext context, AMQDestination destination) throws Exception;
-
-   /**
-    * The subscription will be no longer be receiving messages from the destination.
-    *
-    * @param context
-    * @param destination
-    * @return a list of un-acked messages that were added to the subscription.
-    */
-   List<MessageReference> remove(AMQConnectionContext context, AMQDestination destination) throws Exception;
-
-   /**
     * The ConsumerInfo object that created the subscription.
     */
    ConsumerInfo getConsumerInfo();