You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flex.apache.org by cd...@apache.org on 2014/05/05 22:08:39 UTC
[25/51] [partial] FLEX-34306 - [BlazeDS] Make the BlazeDS build run
on Windows machines - Added some mkdir commands to the ANT Build.java - Did
some fine-tuning to resolve some compile errors
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/MessageClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/MessageClient.java b/modules/core/src/flex/messaging/MessageClient.java
old mode 100755
new mode 100644
index e9c8ccc..ea19eda
--- a/modules/core/src/flex/messaging/MessageClient.java
+++ b/modules/core/src/flex/messaging/MessageClient.java
@@ -1,1144 +1,1144 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import flex.messaging.client.FlexClient;
-import flex.messaging.client.FlexClientOutboundQueueProcessor;
-import flex.messaging.client.OutboundQueueThrottleManager;
-import flex.messaging.config.ThrottleSettings;
-import flex.messaging.config.ThrottleSettings.Policy;
-import flex.messaging.log.LogCategories;
-import flex.messaging.log.Log;
-import flex.messaging.messages.AsyncMessage;
-import flex.messaging.messages.CommandMessage;
-import flex.messaging.messages.Message;
-import flex.messaging.services.MessageService;
-import flex.messaging.services.messaging.Subtopic;
-import flex.messaging.services.messaging.selector.JMSSelector;
-import flex.messaging.services.messaging.selector.JMSSelectorException;
-import flex.messaging.util.ExceptionUtil;
-import flex.messaging.util.TimeoutAbstractObject;
-import flex.messaging.util.StringUtils;
-
-/**
- * Represents a client-side MessageAgent instance.
- * Currently a server-side MessageClient is only created if its client-side counterpart has subscribed
- * to a destination for pushed data (e.g. Consumer). Client-side Producers do not result in the creation of
- * corresponding server-side MessageClient instances.
- *
- * Client-side MessageAgents communicate with the server over a Channel that corresponds to a FlexSession.
- * Server-side MessageClient instances are always created in the context of a FlexSession and when the FlexSession
- * is invalidated any associated MessageClients are invalidated as well.
- *
- * MessageClients may also be timed out on a per-destination basis and this is based on subscription inactivity.
- * If no messages are pushed to the MessageClient within the destination's subscription timeout period the
- * MessageClient will be shutdown even if the associated FlexSession is still active and connected.
- * Per-destination subscription timeout is an optional configuration setting, and should only be used when inactive
- * subscriptions should be shut down opportunistically to preserve server resources.
- */
-public class MessageClient extends TimeoutAbstractObject implements Serializable
-{
- //--------------------------------------------------------------------------
- //
- // Public Static Variables
- //
- //--------------------------------------------------------------------------
-
- /**
- * Log category for MessageClient related messages.
- */
- public static final String MESSAGE_CLIENT_LOG_CATEGORY = LogCategories.CLIENT_MESSAGECLIENT;
-
- //--------------------------------------------------------------------------
- //
- // Static Constants
- //
- //--------------------------------------------------------------------------
-
- /**
- * Serializable to support broadcasting subscription state across the cluster for
- * optimized message routing.
- */
- static final long serialVersionUID = 3730240451524954453L;
-
- //--------------------------------------------------------------------------
- //
- // Static Variables
- //
- //--------------------------------------------------------------------------
-
- /**
- * The list of MessageClient created listeners.
- */
- private static final CopyOnWriteArrayList<MessageClientListener> createdListeners = new CopyOnWriteArrayList<MessageClientListener>();
-
- //--------------------------------------------------------------------------
- //
- // Static Methods
- //
- //--------------------------------------------------------------------------
-
- /**
- * Adds a MessageClient created listener.
- *
- * @see flex.messaging.MessageClientListener
- *
- * @param listener The listener to add.
- */
- public static void addMessageClientCreatedListener(MessageClientListener listener)
- {
- if (listener != null)
- createdListeners.addIfAbsent(listener);
- }
-
- /**
- * Removes a MessageClient created listener.
- *
- * @see flex.messaging.MessageClientListener
- *
- * @param listener The listener to remove.
- */
- public static void removeMessageClientCreatedListener(MessageClientListener listener)
- {
- if (listener != null)
- createdListeners.remove(listener);
- }
-
- //--------------------------------------------------------------------------
- //
- // Private Static Methods
- //
- //--------------------------------------------------------------------------
-
- /**
- * Utility method.
- */
- private static boolean equalStrings(String a, String b)
- {
- return a == b || (a != null && a.equals(b));
- }
-
- /**
- * Utility method.
- */
- static int compareStrings(String a, String b)
- {
- if (a == b)
- return 0;
-
- if (a != null && b != null)
- return a.compareTo(b);
-
- if (a == null)
- return -1;
-
- return 1;
- }
-
- //--------------------------------------------------------------------------
- //
- // Constructor
- //
- //--------------------------------------------------------------------------
-
- /**
- * @exclude
- * Constructs a new MessageClient for local use.
- *
- * @param clientId The clientId for the MessageClient.
- * @param destination The destination the MessageClient is subscribed to.
- * @param endpointId The Id of the endpoint this MessageClient subscription was created over.
- */
- public MessageClient(Object clientId, Destination destination, String endpointId)
- {
- this(clientId, destination, endpointId, true);
- }
-
- /**
- * @exclude
- * Constructs a new MessageClient.
- *
- * @param clientId The clientId for the MessageClient.
- * @param destination The destination the MessageClient is subscribed to.
- * @param endpointId The Id of the endpoint this MessageClient subscription was created over.
- * @param useSession RemoteMessageClient instances should not be associated with a FlexSession (pass false).
- */
- public MessageClient(Object clientId, Destination destination, String endpointId, boolean useSession)
- {
- valid = true;
- this.clientId = clientId;
- this.destination = destination;
- this.endpointId = endpointId;
- destinationId = destination.getId();
- updateLastUse(); // Initialize last use timestamp to construct time.
-
- /* If this is for a remote server, we do not associate with the session. */
- if (useSession)
- {
- flexSession = FlexContext.getFlexSession();
- flexSession.registerMessageClient(this);
-
- flexClient = FlexContext.getFlexClient();
- flexClient.registerMessageClient(this);
-
- // SubscriptionManager will notify the created listeners, once
- // subscription state is setup completely.
- // notifyCreatedListeners();
- }
- else
- {
- flexClient = null;
- flexSession = null;
- // Use an instance level lock.
- lock = new Object();
- // On a remote server we don't notify created listeners.
- }
-
- if (Log.isDebug())
- Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient created with clientId '" + this.clientId + "' for destination '" + destinationId + "'.");
- }
-
- //--------------------------------------------------------------------------
- //
- // Variables
- //
- //--------------------------------------------------------------------------
-
- /**
- * This flag is set to true when the client channel that this subscription was
- * established over is disconnected.
- * It supports cleaning up per-endpoint outbound queues maintained by the FlexClient.
- * If the client notifies the server that its channel is disconnecting, the FlexClient
- * does not need to maintain an outbound queue containing a subscription invalidation
- * message for this MessageClient to send to the client.
- */
- private volatile boolean clientChannelDisconnected;
-
- /**
- * The clientId for the MessageClient.
- * This value is specified by the client directly or is autogenerated on the client.
- */
- protected final Object clientId;
-
- /**
- * Internal reference to the associated Destination; don't expose this in the public API.
- */
- protected final Destination destination;
-
- /**
- * The destination the MessageClient is subscribed to.
- */
- protected final String destinationId;
-
- /**
- * The set of session destroy listeners to notify when the session is destroyed.
- */
- private transient volatile CopyOnWriteArrayList destroyedListeners;
-
- /**
- * The Id for the endpoint this MessageClient subscription was created over.
- */
- private String endpointId;
-
- /**
- * The FlexClient associated with the MessageClient.
- */
- private final transient FlexClient flexClient;
-
- /**
- * The FlexSession associated with the MessageClient.
- * Not final because this needs to be reset if the subscription fails over to a new endpoint.
- */
- private transient FlexSession flexSession;
-
- /**
- * Flag used to break cycles during invalidation.
- */
- private boolean invalidating;
-
- /**
- * The lock to use to guard all state changes for the MessageClient.
- */
- protected Object lock = new Object();
-
- /**
- * Flag indicating whether the MessageClient is attempting to notify the remote client of
- * its invalidation.
- */
- private volatile boolean attemptingInvalidationClientNotification;
-
- /**
- * A counter used to control invalidation for a MessageClient that has multiple
- * subscriptions to its destination.
- * Unsubscribing from one will not invalidate the MessageClient as long as other
- * subscriptions remain active.
- */
- private transient int numReferences;
-
- /**
- * A set of all of the subscriptions managed by this message client.
- */
- protected final Set<SubscriptionInfo> subscriptions = new CopyOnWriteArraySet<SubscriptionInfo>();
-
- /**
- * Flag indicating whether this client is valid or not.
- */
- protected boolean valid;
-
- /**
- * Flag that indicates whether the MessageClient has a per-destination subscription timeout.
- * If false, the MessageClient will remain valid until its associated FlexSession is invalidated.
- */
- private volatile boolean willTimeout;
-
- /**
- * Has anyone explicitly registered this message client. This indicates that
- * there is a reference to this MessageClient which is not an explicit subscription.
- * This is a hook for FDMS and other adapters which want to use pushMessageToClients
- * with clientIds but that do not want the subscription manager to manage subscriptions
- * for them.
- */
- private volatile boolean registered = false;
-
- //--------------------------------------------------------------------------
- //
- // Public Methods
- //
- //--------------------------------------------------------------------------
-
- /**
- * Returns the clientId for the MessageClient.
- *
- * @return The clientId for the MessageClient.
- */
- public Object getClientId()
- {
- return clientId; // Field is final; no need to sync.
- }
-
- /**
- * Returns the destination the MessageClient is subscribed to.
- *
- * @return The destination the MessageClient is subscribed to.
- */
- public Destination getDestination()
- {
- return destination; // Field is final; no need to sync.
- }
-
- /**
- * Returns the id of the destination the MessageClient is subscribed to.
- *
- * @return The id of the destination the MessageClient is subscribed to.
- */
- public String getDestinationId()
- {
- return destinationId; // Field is final; no need to sync.
- }
-
- /**
- * Returns the Id for the endpoint the MessageClient subscription was created over.
- *
- * @return The Id for the endpoint the MessageClient subscription was created over.
- */
- public String getEndpointId()
- {
- return endpointId; // Field is final; no need to sync.
- }
-
- /**
- * Returns the FlexClient associated with this MessageClient.
- *
- * @return The FlexClient assocaited with this MessageClient.
- */
- public FlexClient getFlexClient()
- {
- return flexClient; // Field is final; no need to sync.
- }
-
- /**
- * Returns the FlexSession associated with this MessageClient.
- *
- * @return The FlexSession associated with this MessageClient.
- */
- public FlexSession getFlexSession()
- {
- synchronized (lock)
- {
- return flexSession;
- }
- }
-
- /**
- * Returns the number of subscriptions associated with this MessageClient.
- *
- * @return The number of subscriptions associated with this MessageClient.
- */
- public int getSubscriptionCount()
- {
- int count;
-
- synchronized (lock)
- {
- count = subscriptions != null? subscriptions.size() : 0;
- }
-
- return count;
- }
-
- /**
- * @exclude
- * This is used for FlexClient outbound queue management. When a MessageClient is invalidated
- * if it is attempting to notify the client, then we must leave the outbound queue containing
- * the notification in place. Otherwise, any messages queued for the subscription may be
- * removed from the queue and possibly shut down immediately.
- */
- public boolean isAttemptingInvalidationClientNotification()
- {
- return attemptingInvalidationClientNotification;
- }
-
- /**
- * @exclude
- * This is set to true when the MessageClient is invalidated due to the client
- * channel the subscription was established over disconnecting.
- * It allows the FlexClient class to cleanup the outbound queue for the channel's
- * corresponding server endpoint for the remote client, because we know that no
- * currently queued messages need to be retained for delivery.
- */
- public void setClientChannelDisconnected(boolean value)
- {
- clientChannelDisconnected = value;
- }
-
- /**
- * @exclude
- */
- public boolean isClientChannelDisconnected()
- {
- return clientChannelDisconnected;
- }
-
- /**
- * @exclude
- * This is true when some code other than the SubscriptionManager
- * is maintaining subscriptions for this message client. It ensures
- * that we have this MessageClient kept around until the session
- * expires.
- */
- public void setRegistered(boolean reg)
- {
- registered = reg;
- }
-
- /**
- * @exclude
- */
- public boolean isRegistered()
- {
- return registered;
- }
-
- /**
- * Adds a MessageClient destroy listener.
- *
- * @see flex.messaging.MessageClientListener
- *
- * @param listener The listener to add.
- */
- public void addMessageClientDestroyedListener(MessageClientListener listener)
- {
- if (listener != null)
- {
- checkValid();
-
- if (destroyedListeners == null)
- {
- synchronized (lock)
- {
- if (destroyedListeners == null)
- destroyedListeners = new CopyOnWriteArrayList();
- }
- }
-
- destroyedListeners.addIfAbsent(listener);
- }
- }
-
- /**
- * Removes a MessageClient destroyed listener.
- *
- * @see flex.messaging.MessageClientListener
- *
- * @param listener The listener to remove.
- */
- public void removeMessageClientDestroyedListener(MessageClientListener listener)
- {
- // No need to check validity; removing a listener is always ok.
- if (listener != null && destroyedListeners != null)
- destroyedListeners.remove(listener);
- }
-
- /**
- * @exclude
- * Adds a subscription to the subscription set for this MessageClient.
- *
- * @param selector The selector expression used for the subscription.
- * @param subtopic The subtopic used for the subscription.
- * @param maxFrequency The maximum number of messages the client wants to
- * receive per second (0 disables this limit).
- */
- public void addSubscription(String selector, String subtopic, int maxFrequency)
- {
- synchronized (lock)
- {
- checkValid();
-
- incrementReferences();
-
- // Create and add the subscription to the subscriptions set.
- SubscriptionInfo si = new SubscriptionInfo(selector, subtopic, maxFrequency);
- subscriptions.add(si);
-
- registerSubscriptionWithThrottleManager(si);
- }
- }
-
- /**
- * @exclude
- * Registers the subscription with the outbound queue processor's throttle
- * manager, if one exists.
- *
- * @param si The subscription info object.
- */
- public void registerSubscriptionWithThrottleManager(SubscriptionInfo si)
- {
- // Register the destination that will setup client level outbound throttling.
- ThrottleSettings ts = destination.getNetworkSettings().getThrottleSettings();
- if (ts.getOutboundPolicy() != Policy.NONE && (ts.isOutboundClientThrottleEnabled() || si.maxFrequency > 0))
- {
- // Setup the client level outbound throttling, and register the destination
- // only if the policy is not NONE, and a throttling limit is specified
- // either at the destination or by consumer.
- OutboundQueueThrottleManager throttleManager = getThrottleManager(true);
- if (throttleManager != null)
- throttleManager.registerDestination(destinationId, ts.getOutgoingClientFrequency(), ts.getOutboundPolicy());
- }
- else if (si.maxFrequency > 0) // Let the client know that maxFrequency will be ignored.
- {
- if (Log.isWarn())
- Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).warn("MessageClient with clientId '"
- + clientId + "' for destination '" + destinationId
- + "' specified a maxFrequency value of '" + si.maxFrequency
- + "' but the destination does not define a throttling policy. This value will be ignored.");
- }
-
- // Now, register the subscription.
- OutboundQueueThrottleManager throttleManager = getThrottleManager(false);
- if (throttleManager != null)
- throttleManager.registerSubscription(destinationId, si);
- }
-
- /**
- * @exclude
- * Removes a subscription from the subscription set for this MessageClient.
- *
- * @param selector The selector expression for the subscription.
- * @param subtopic The subtopic for the subscription.
- * @return true if no subscriptions remain for this MessageClient; otherwise false.
- */
- public boolean removeSubscription(String selector, String subtopic)
- {
- synchronized (lock)
- {
- SubscriptionInfo si = new SubscriptionInfo(selector, subtopic);
- if (subscriptions.remove(si))
- {
- unregisterSubscriptionWithThrottleManager(si);
- return decrementReferences();
- }
- else if (Log.isError())
- {
- Log.getLogger(MessageService.LOG_CATEGORY).error("Error - unable to find subscription to remove for MessageClient: "
- + clientId + " selector: " + selector + " subtopic: " + subtopic);
- }
- return numReferences == 0;
- }
- }
-
- /**
- * @exclude
- * We use the same MessageClient for more than one subscription with different
- * selection criteria. This tracks the number of subscriptions that are active
- * so that we know when we are finished.
- */
- public void incrementReferences()
- {
- synchronized (lock)
- {
- numReferences++;
- }
- }
-
- /**
- * @exclude
- * Decrements the numReferences variable and returns true if this was the last reference.
- */
- public boolean decrementReferences()
- {
- synchronized (lock)
- {
- if (--numReferences == 0)
- {
- cancelTimeout();
- if (destination instanceof MessageDestination)
- {
- MessageDestination msgDestination = (MessageDestination)destination;
- if (msgDestination.getThrottleManager() != null)
- msgDestination.getThrottleManager().removeClientThrottleMark(clientId);
- }
- return true;
- }
- return false;
- }
- }
-
- /**
- * @exclude
- * Invoked by SubscriptionManager once the subscription state is setup completely
- * for the MessageClient..
- */
- public void notifyCreatedListeners()
- {
- // Notify MessageClient created listeners.
- if (!createdListeners.isEmpty())
- {
- // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions.
- for (Iterator iter = createdListeners.iterator(); iter.hasNext();)
- ((MessageClientListener)iter.next()).messageClientCreated(this);
- }
- }
-
- /**
- * @exclude
- * Invoked by SubscriptionManager while handling a subscribe request.
- * If the request is updating an existing subscription the 'push' state in the associated FlexClient
- * may need to be updated to ensure that the correct endpoint is used for this subscription.
- *
- * @param newEndpointId The id for the new endpoint that the subscription may have failed over to.
- */
- public void resetEndpoint(String newEndpointId)
- {
- String oldEndpointId = null;
- FlexSession oldSession = null;
- FlexSession newSession = FlexContext.getFlexSession();
- synchronized (lock)
- {
- // If anything is null, or nothing has changed, no need for a reset.
- if (endpointId == null || newEndpointId == null || flexSession == null || newSession == null || (endpointId.equals(newEndpointId) && flexSession.equals(newSession)))
- return;
-
- oldEndpointId = endpointId;
- endpointId = newEndpointId;
-
- oldSession = flexSession;
- flexSession = newSession;
- }
-
- // Unregister in order to reset the proper push settings in the re-registration below once the session association has been patched.
- if (flexClient != null)
- flexClient.unregisterMessageClient(this);
-
- // Clear out any reference to this subscription that the previously associated session has.
- if (oldSession != null)
- oldSession.unregisterMessageClient(this);
-
- // Associate the current session with this subscription.
- if (flexSession != null)
- flexSession.registerMessageClient(this);
-
- // Reset proper push settings.
- if (flexClient != null)
- flexClient.registerMessageClient(this);
-
- if (Log.isDebug())
- {
- String msg = "MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been reset as a result of a resubscribe.";
- if (oldEndpointId != null && !oldEndpointId.equals(newEndpointId))
- msg += " Endpoint change [" + oldEndpointId + " -> " + newEndpointId + "]";
- if ((oldSession != null) && (newSession != null) && (oldSession != newSession)) // Test identity.
- msg += " FlexSession change [" + oldSession.getClass().getName() + ":" + oldSession.getId() + " -> " + newSession.getClass().getName() + ":" + newSession.getId() + "]";
-
- Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug(msg);
- }
- }
-
- /**
- * @exclude
- * Used to test whether this client should receive this message
- * based on the list of subscriptions we have recorded for it.
- * It must match both the subtopic and the selector expression.
- * Usually this is done by the subscription manager - this logic is
- * only here to maintain api compatibility with one of the variants
- * of the pushMessageToClients which has subscriberIds and an evalSelector
- * property.
- *
- * @param message The message to test.
- */
- public boolean testMessage(Message message, MessageDestination destination)
- {
- String subtopic = (String) message.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME);
- String subtopicSeparator = destination.getServerSettings().getSubtopicSeparator();
- synchronized (lock)
- {
- for (SubscriptionInfo si : subscriptions)
- {
- if (si.matches(message, subtopic, subtopicSeparator))
- return true;
- }
- }
- return false;
- }
-
- /**
- * Returns true if the MessageClient is valid; false if it has been invalidated.
- *
- * @return true if the MessageClient is valid; otherwise false.
- */
- public boolean isValid()
- {
- synchronized (lock)
- {
- return valid;
- }
- }
-
- /**
- * Invalidates the MessageClient.
- */
- public void invalidate()
- {
- invalidate(false /* don't attempt to notify the client */);
- }
-
- /**
- * Invalidates the MessageClient, and optionally attempts to notify the client that
- * this subscription has been invalidated.
- * This overload is used when a subscription is timed out while the client is still
- * actively connected to the server but should also be used by any custom code on the server
- * that invalidates MessageClients but wishes to notify the client cleanly.
- *
- * @param notifyClient <code>true</code> to notify the client that its subscription has been
- * invalidated.
- */
- public void invalidate(boolean notifyClient)
- {
- synchronized (lock)
- {
- if (!valid || invalidating)
- return; // Already shutting down.
-
- invalidating = true; // This thread gets to shut the MessageClient down.
- cancelTimeout();
- }
-
- // Record whether we're attempting to notify the client or not.
- attemptingInvalidationClientNotification = notifyClient;
-
- // Build a subscription invalidation message and push to the client if it is still valid.
- if (notifyClient && flexClient != null && flexClient.isValid())
- {
- CommandMessage msg = new CommandMessage();
- msg.setDestination(destination.getId());
- msg.setClientId(clientId);
- msg.setOperation(CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION);
- Set subscriberIds = new TreeSet();
- subscriberIds.add(clientId);
- try
- {
- if (destination instanceof MessageDestination)
- {
- MessageDestination msgDestination = (MessageDestination)destination;
- ((MessageService)msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, msg, false /* don't eval selector */);
- }
- }
- catch (MessageException ignore) {}
- }
-
- // Notify messageClientDestroyed listeners that we're being invalidated.
- if (destroyedListeners != null && !destroyedListeners.isEmpty())
- {
- for (Iterator iter = destroyedListeners.iterator(); iter.hasNext();)
- {
- ((MessageClientListener)iter.next()).messageClientDestroyed(this);
- }
- destroyedListeners.clear();
- }
-
- // And generate unsubscribe messages for all of the MessageClient's subscriptions and
- // route them to the destination this MessageClient is subscribed to.
- // The reason we send a message to the service rather than just going straight to the SubscriptionManager
- // is that some adapters manage their own subscription state (i.e. JMS) in addition to us keeping track of
- // things with our SubscriptionManager.
- ArrayList<CommandMessage> unsubMessages = new ArrayList<CommandMessage>();
- synchronized (lock)
- {
- for (SubscriptionInfo subInfo : subscriptions)
- {
- CommandMessage unsubMessage = new CommandMessage();
- unsubMessage.setDestination(destination.getId());
- unsubMessage.setClientId(clientId);
- unsubMessage.setOperation(CommandMessage.UNSUBSCRIBE_OPERATION);
- unsubMessage.setHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER, Boolean.TRUE);
- unsubMessage.setHeader(CommandMessage.SELECTOR_HEADER, subInfo.selector);
- unsubMessage.setHeader(AsyncMessage.SUBTOPIC_HEADER_NAME, subInfo.subtopic);
- unsubMessages.add(unsubMessage);
- }
- }
- // Release the lock and send the unsub messages.
- for (CommandMessage unsubMessage : unsubMessages)
- {
- try
- {
- destination.getService().serviceCommand(unsubMessage);
- }
- catch (MessageException me)
- {
- if (Log.isDebug())
- Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient: " + getClientId() + " issued an unsubscribe message during invalidation that was not processed but will continue with invalidation. Reason: " + ExceptionUtil.toString(me));
- }
- }
-
- synchronized (lock)
- {
- // If we didn't clean up all subscriptions log an error and continue with shutdown.
- int remainingSubscriptionCount = subscriptions.size();
- if (remainingSubscriptionCount > 0 && Log.isError())
- Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).error("MessageClient: " + getClientId() + " failed to remove " + remainingSubscriptionCount + " subscription(s) during invalidation");
- }
-
- // If someone registered this message client, invalidating it will free
- // their reference which will typically also remove this message client.
- if (registered && destination instanceof MessageDestination)
- ((MessageDestination)destination).getSubscriptionManager().releaseMessageClient(this);
-
- synchronized (lock)
- {
- valid = false;
- invalidating = false;
- }
-
- if (Log.isDebug())
- Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been invalidated.");
- }
-
- /**
- * Pushes the supplied message and then invalidates the MessageClient.
- *
- * @param message The message to push to the client before invalidating.
- * When message is null, MessageClient is invalidated silently.
- */
- public void invalidate(Message message)
- {
- if (message != null)
- {
- message.setDestination(destination.getId());
- message.setClientId(clientId);
-
- Set subscriberIds = new TreeSet();
- subscriberIds.add(clientId);
- try
- {
- if (destination instanceof MessageDestination)
- {
- MessageDestination msgDestination = (MessageDestination)destination;
- ((MessageService)msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, message, false /* don't eval selector */);
- }
- }
- catch (MessageException ignore) {}
-
- invalidate(true /* attempt to notify remote client */);
- }
- else
- {
- invalidate();
- }
- }
-
- /**
- * @exclude
- * Compares this MessageClient to the specified object. The result is true if
- * the argument is not null and is a MessageClient instance with a matching
- * clientId value.
- *
- * @param o The object to compare this MessageClient to.
- * @return true if the MessageClient is equal; otherwise false.
- */
- public boolean equals(Object o)
- {
- if (o instanceof MessageClient)
- {
- MessageClient c = (MessageClient) o;
- if (c != null && c.getClientId().equals(clientId))
- return true;
- }
- return false;
- }
-
- /**
- * @exclude
- * Returns the hash code for this MessageClient. The returned value is
- * the hash code for the MessageClient's clientId property.
- *
- * @return The hash code value for this MessageClient.
- */
- @Override
- public int hashCode()
- {
- return getClientId().hashCode();
- }
-
- /**
- * @exclude
- * The String representation of this MessageClient is returned (its clientId value).
- *
- * @return The clientId value for this MessageClient.
- */
- @Override
- public String toString()
- {
- return String.valueOf(clientId);
- }
-
- //----------------------------------
- // TimeoutAbstractObject overrides
- //----------------------------------
-
- /**
- * @exclude
- * Implements TimeoutCapable.
- * This method returns the timeout value configured for the MessageClient's destination.
- */
- @Override
- public long getTimeoutPeriod()
- {
- return (destination instanceof MessageDestination) ?
- ((MessageDestination)destination).getSubscriptionManager().getSubscriptionTimeoutMillis() : 0;
- }
-
- /**
- * @exclude
- * Implements TimeoutCapable.
- * This method is invoked when the MessageClient has timed out and it
- * invalidates the MessageClient.
- */
- public void timeout()
- {
- invalidate(true /* notify client */);
- }
-
- /**
- * @exclude
- * Returns true if a timeout task is running for this MessageClient.
- */
- public boolean isTimingOut()
- {
- return willTimeout;
- }
-
- /**
- * @exclude
- * Records whether a timeout task is running for this MessageClient.
- */
- public void setTimingOut(boolean value)
- {
- willTimeout = value;
- }
-
- //--------------------------------------------------------------------------
- //
- // Private Methods
- //
- //--------------------------------------------------------------------------
-
- /**
- * Utility method that tests validity and throws an exception if the instance
- * has been invalidated.
- */
- private void checkValid()
- {
- synchronized (lock)
- {
- if (!valid)
- {
- throw new RuntimeException("MessageClient has been invalidated."); // TODO - localize
- }
- }
- }
-
- private OutboundQueueThrottleManager getThrottleManager(boolean create)
- {
- if (flexClient != null)
- {
- FlexClientOutboundQueueProcessor processor = flexClient.getOutboundQueueProcessor(endpointId);
- if (processor != null)
- return create? processor.getOrCreateOutboundQueueThrottleManager() : processor.getOutboundQueueThrottleManager();
- }
- return null;
- }
-
- private void unregisterSubscriptionWithThrottleManager(SubscriptionInfo si)
- {
- OutboundQueueThrottleManager throttleManager = getThrottleManager(false);
- if (throttleManager != null)
- throttleManager.unregisterSubscription(destinationId, si);
- }
-
- //--------------------------------------------------------------------------
- //
- // Nested Classes
- //
- //--------------------------------------------------------------------------
-
- /**
- * Represents a MessageClient's subscription to a destination.
- * It captures the optional selector expression and subtopic for the
- * subscription.
- */
- public static class SubscriptionInfo implements Comparable
- {
- public String selector, subtopic;
- public int maxFrequency; // maxFrequency per subscription. Not used in BlazeDS.
-
- public SubscriptionInfo(String sel, String sub)
- {
- this(sel, sub, 0);
- }
-
- public SubscriptionInfo(String sel, String sub, int maxFrequency)
- {
- this.selector = sel;
- this.subtopic = sub;
- this.maxFrequency = maxFrequency;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (o instanceof SubscriptionInfo)
- {
- SubscriptionInfo other = (SubscriptionInfo) o;
- return equalStrings(other.selector, selector) &&
- equalStrings(other.subtopic, subtopic);
- }
- return false;
- }
-
- @Override
- public int hashCode()
- {
- return (selector == null ? 0 : selector.hashCode()) +
- (subtopic == null ? 1 : subtopic.hashCode());
- }
-
- /**
- * Compares the two subscription infos (being careful to
- * ensure we compare in a consistent way if the arguments
- * are switched).
- * @param o the object to compare
- * @return int the compare result
- */
- public int compareTo(Object o)
- {
- SubscriptionInfo other = (SubscriptionInfo) o;
- int result;
-
- if ((result = compareStrings(other.selector, selector)) != 0)
- return result;
- else if ((result = compareStrings(other.subtopic, subtopic)) != 0)
- return result;
-
- return 0;
- }
-
- /**
- * Check whether the message matches with selected subtopic.
- * @param message current message
- * @param subtopicToMatch subtopc string
- * @param subtopicSeparator suptopic separator
- * @return true if the message matches the subtopic
- */
- public boolean matches(Message message, String subtopicToMatch, String subtopicSeparator)
- {
- if ((subtopicToMatch == null && subtopic != null) || (subtopicToMatch != null && subtopic == null))
- return false; // If either defines a subtopic, they both must define one.
-
- // If both define a subtopic, they must match.
- if (subtopicToMatch != null && subtopic != null)
- {
- Subtopic consumerSubtopic = new Subtopic(subtopic, subtopicSeparator);
- Subtopic messageSubtopic = new Subtopic(subtopicToMatch, subtopicSeparator);
- if (!consumerSubtopic.matches(messageSubtopic))
- return false; // Not a match.
- }
-
- if (selector == null)
- return true;
-
- JMSSelector jmsSelector = new JMSSelector(selector);
- try
- {
- if (jmsSelector.match(message))
- return true;
- }
- catch (JMSSelectorException jmse)
- {
- // Log a warning for this client's selector and continue
- if (Log.isWarn())
- {
- Log.getLogger(JMSSelector.LOG_CATEGORY).warn("Error processing message selector: " +
- jmse.toString() + StringUtils.NEWLINE +
- " incomingMessage: " + message + StringUtils.NEWLINE +
- " selector: " + selector + StringUtils.NEWLINE);
- }
- }
- return false;
- }
-
- /**
- * Returns a String representation of the subscription info.
- * @return String the string representation of the subscription info
- */
- public String toString()
- {
- StringBuffer sb = new StringBuffer();
- sb.append("Subtopic: " + subtopic + StringUtils.NEWLINE);
- sb.append("Selector: " + selector + StringUtils.NEWLINE);
- if (maxFrequency > 0)
- sb.append("maxFrequency: " + maxFrequency);
- return sb.toString();
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import flex.messaging.client.FlexClient;
+import flex.messaging.client.FlexClientOutboundQueueProcessor;
+import flex.messaging.client.OutboundQueueThrottleManager;
+import flex.messaging.config.ThrottleSettings;
+import flex.messaging.config.ThrottleSettings.Policy;
+import flex.messaging.log.LogCategories;
+import flex.messaging.log.Log;
+import flex.messaging.messages.AsyncMessage;
+import flex.messaging.messages.CommandMessage;
+import flex.messaging.messages.Message;
+import flex.messaging.services.MessageService;
+import flex.messaging.services.messaging.Subtopic;
+import flex.messaging.services.messaging.selector.JMSSelector;
+import flex.messaging.services.messaging.selector.JMSSelectorException;
+import flex.messaging.util.ExceptionUtil;
+import flex.messaging.util.TimeoutAbstractObject;
+import flex.messaging.util.StringUtils;
+
+/**
+ * Represents a client-side MessageAgent instance.
+ * Currently a server-side MessageClient is only created if its client-side counterpart has subscribed
+ * to a destination for pushed data (e.g. Consumer). Client-side Producers do not result in the creation of
+ * corresponding server-side MessageClient instances.
+ *
+ * Client-side MessageAgents communicate with the server over a Channel that corresponds to a FlexSession.
+ * Server-side MessageClient instances are always created in the context of a FlexSession and when the FlexSession
+ * is invalidated any associated MessageClients are invalidated as well.
+ *
+ * MessageClients may also be timed out on a per-destination basis and this is based on subscription inactivity.
+ * If no messages are pushed to the MessageClient within the destination's subscription timeout period the
+ * MessageClient will be shutdown even if the associated FlexSession is still active and connected.
+ * Per-destination subscription timeout is an optional configuration setting, and should only be used when inactive
+ * subscriptions should be shut down opportunistically to preserve server resources.
+ */
+public class MessageClient extends TimeoutAbstractObject implements Serializable
+{
+ //--------------------------------------------------------------------------
+ //
+ // Public Static Variables
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Log category for MessageClient related messages.
+ */
+ public static final String MESSAGE_CLIENT_LOG_CATEGORY = LogCategories.CLIENT_MESSAGECLIENT;
+
+ //--------------------------------------------------------------------------
+ //
+ // Static Constants
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Serializable to support broadcasting subscription state across the cluster for
+ * optimized message routing.
+ */
+ static final long serialVersionUID = 3730240451524954453L;
+
+ //--------------------------------------------------------------------------
+ //
+ // Static Variables
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * The list of MessageClient created listeners.
+ */
+ private static final CopyOnWriteArrayList<MessageClientListener> createdListeners = new CopyOnWriteArrayList<MessageClientListener>();
+
+ //--------------------------------------------------------------------------
+ //
+ // Static Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Adds a MessageClient created listener.
+ *
+ * @see flex.messaging.MessageClientListener
+ *
+ * @param listener The listener to add.
+ */
+ public static void addMessageClientCreatedListener(MessageClientListener listener)
+ {
+ if (listener != null)
+ createdListeners.addIfAbsent(listener);
+ }
+
+ /**
+ * Removes a MessageClient created listener.
+ *
+ * @see flex.messaging.MessageClientListener
+ *
+ * @param listener The listener to remove.
+ */
+ public static void removeMessageClientCreatedListener(MessageClientListener listener)
+ {
+ if (listener != null)
+ createdListeners.remove(listener);
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Private Static Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Utility method.
+ */
+ private static boolean equalStrings(String a, String b)
+ {
+ return a == b || (a != null && a.equals(b));
+ }
+
+ /**
+ * Utility method.
+ */
+ static int compareStrings(String a, String b)
+ {
+ if (a == b)
+ return 0;
+
+ if (a != null && b != null)
+ return a.compareTo(b);
+
+ if (a == null)
+ return -1;
+
+ return 1;
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Constructor
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * @exclude
+ * Constructs a new MessageClient for local use.
+ *
+ * @param clientId The clientId for the MessageClient.
+ * @param destination The destination the MessageClient is subscribed to.
+ * @param endpointId The Id of the endpoint this MessageClient subscription was created over.
+ */
+ public MessageClient(Object clientId, Destination destination, String endpointId)
+ {
+ this(clientId, destination, endpointId, true);
+ }
+
+ /**
+ * @exclude
+ * Constructs a new MessageClient.
+ *
+ * @param clientId The clientId for the MessageClient.
+ * @param destination The destination the MessageClient is subscribed to.
+ * @param endpointId The Id of the endpoint this MessageClient subscription was created over.
+ * @param useSession RemoteMessageClient instances should not be associated with a FlexSession (pass false).
+ */
+ public MessageClient(Object clientId, Destination destination, String endpointId, boolean useSession)
+ {
+ valid = true;
+ this.clientId = clientId;
+ this.destination = destination;
+ this.endpointId = endpointId;
+ destinationId = destination.getId();
+ updateLastUse(); // Initialize last use timestamp to construct time.
+
+ /* If this is for a remote server, we do not associate with the session. */
+ if (useSession)
+ {
+ flexSession = FlexContext.getFlexSession();
+ flexSession.registerMessageClient(this);
+
+ flexClient = FlexContext.getFlexClient();
+ flexClient.registerMessageClient(this);
+
+ // SubscriptionManager will notify the created listeners, once
+ // subscription state is setup completely.
+ // notifyCreatedListeners();
+ }
+ else
+ {
+ flexClient = null;
+ flexSession = null;
+ // Use an instance level lock.
+ lock = new Object();
+ // On a remote server we don't notify created listeners.
+ }
+
+ if (Log.isDebug())
+ Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient created with clientId '" + this.clientId + "' for destination '" + destinationId + "'.");
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Variables
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * This flag is set to true when the client channel that this subscription was
+ * established over is disconnected.
+ * It supports cleaning up per-endpoint outbound queues maintained by the FlexClient.
+ * If the client notifies the server that its channel is disconnecting, the FlexClient
+ * does not need to maintain an outbound queue containing a subscription invalidation
+ * message for this MessageClient to send to the client.
+ */
+ private volatile boolean clientChannelDisconnected;
+
+ /**
+ * The clientId for the MessageClient.
+ * This value is specified by the client directly or is autogenerated on the client.
+ */
+ protected final Object clientId;
+
+ /**
+ * Internal reference to the associated Destination; don't expose this in the public API.
+ */
+ protected final Destination destination;
+
+ /**
+ * The destination the MessageClient is subscribed to.
+ */
+ protected final String destinationId;
+
+ /**
+ * The set of session destroy listeners to notify when the session is destroyed.
+ */
+ private transient volatile CopyOnWriteArrayList destroyedListeners;
+
+ /**
+ * The Id for the endpoint this MessageClient subscription was created over.
+ */
+ private String endpointId;
+
+ /**
+ * The FlexClient associated with the MessageClient.
+ */
+ private final transient FlexClient flexClient;
+
+ /**
+ * The FlexSession associated with the MessageClient.
+ * Not final because this needs to be reset if the subscription fails over to a new endpoint.
+ */
+ private transient FlexSession flexSession;
+
+ /**
+ * Flag used to break cycles during invalidation.
+ */
+ private boolean invalidating;
+
+ /**
+ * The lock to use to guard all state changes for the MessageClient.
+ */
+ protected Object lock = new Object();
+
+ /**
+ * Flag indicating whether the MessageClient is attempting to notify the remote client of
+ * its invalidation.
+ */
+ private volatile boolean attemptingInvalidationClientNotification;
+
+ /**
+ * A counter used to control invalidation for a MessageClient that has multiple
+ * subscriptions to its destination.
+ * Unsubscribing from one will not invalidate the MessageClient as long as other
+ * subscriptions remain active.
+ */
+ private transient int numReferences;
+
+ /**
+ * A set of all of the subscriptions managed by this message client.
+ */
+ protected final Set<SubscriptionInfo> subscriptions = new CopyOnWriteArraySet<SubscriptionInfo>();
+
+ /**
+ * Flag indicating whether this client is valid or not.
+ */
+ protected boolean valid;
+
+ /**
+ * Flag that indicates whether the MessageClient has a per-destination subscription timeout.
+ * If false, the MessageClient will remain valid until its associated FlexSession is invalidated.
+ */
+ private volatile boolean willTimeout;
+
+ /**
+ * Has anyone explicitly registered this message client. This indicates that
+ * there is a reference to this MessageClient which is not an explicit subscription.
+ * This is a hook for FDMS and other adapters which want to use pushMessageToClients
+ * with clientIds but that do not want the subscription manager to manage subscriptions
+ * for them.
+ */
+ private volatile boolean registered = false;
+
+ //--------------------------------------------------------------------------
+ //
+ // Public Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Returns the clientId for the MessageClient.
+ *
+ * @return The clientId for the MessageClient.
+ */
+ public Object getClientId()
+ {
+ return clientId; // Field is final; no need to sync.
+ }
+
+ /**
+ * Returns the destination the MessageClient is subscribed to.
+ *
+ * @return The destination the MessageClient is subscribed to.
+ */
+ public Destination getDestination()
+ {
+ return destination; // Field is final; no need to sync.
+ }
+
+ /**
+ * Returns the id of the destination the MessageClient is subscribed to.
+ *
+ * @return The id of the destination the MessageClient is subscribed to.
+ */
+ public String getDestinationId()
+ {
+ return destinationId; // Field is final; no need to sync.
+ }
+
+ /**
+ * Returns the Id for the endpoint the MessageClient subscription was created over.
+ *
+ * @return The Id for the endpoint the MessageClient subscription was created over.
+ */
+ public String getEndpointId()
+ {
+ return endpointId; // Field is final; no need to sync.
+ }
+
+ /**
+ * Returns the FlexClient associated with this MessageClient.
+ *
+ * @return The FlexClient assocaited with this MessageClient.
+ */
+ public FlexClient getFlexClient()
+ {
+ return flexClient; // Field is final; no need to sync.
+ }
+
+ /**
+ * Returns the FlexSession associated with this MessageClient.
+ *
+ * @return The FlexSession associated with this MessageClient.
+ */
+ public FlexSession getFlexSession()
+ {
+ synchronized (lock)
+ {
+ return flexSession;
+ }
+ }
+
+ /**
+ * Returns the number of subscriptions associated with this MessageClient.
+ *
+ * @return The number of subscriptions associated with this MessageClient.
+ */
+ public int getSubscriptionCount()
+ {
+ int count;
+
+ synchronized (lock)
+ {
+ count = subscriptions != null? subscriptions.size() : 0;
+ }
+
+ return count;
+ }
+
+ /**
+ * @exclude
+ * This is used for FlexClient outbound queue management. When a MessageClient is invalidated
+ * if it is attempting to notify the client, then we must leave the outbound queue containing
+ * the notification in place. Otherwise, any messages queued for the subscription may be
+ * removed from the queue and possibly shut down immediately.
+ */
+ public boolean isAttemptingInvalidationClientNotification()
+ {
+ return attemptingInvalidationClientNotification;
+ }
+
+ /**
+ * @exclude
+ * This is set to true when the MessageClient is invalidated due to the client
+ * channel the subscription was established over disconnecting.
+ * It allows the FlexClient class to cleanup the outbound queue for the channel's
+ * corresponding server endpoint for the remote client, because we know that no
+ * currently queued messages need to be retained for delivery.
+ */
+ public void setClientChannelDisconnected(boolean value)
+ {
+ clientChannelDisconnected = value;
+ }
+
+ /**
+ * @exclude
+ */
+ public boolean isClientChannelDisconnected()
+ {
+ return clientChannelDisconnected;
+ }
+
+ /**
+ * @exclude
+ * This is true when some code other than the SubscriptionManager
+ * is maintaining subscriptions for this message client. It ensures
+ * that we have this MessageClient kept around until the session
+ * expires.
+ */
+ public void setRegistered(boolean reg)
+ {
+ registered = reg;
+ }
+
+ /**
+ * @exclude
+ */
+ public boolean isRegistered()
+ {
+ return registered;
+ }
+
+ /**
+ * Adds a MessageClient destroy listener.
+ *
+ * @see flex.messaging.MessageClientListener
+ *
+ * @param listener The listener to add.
+ */
+ public void addMessageClientDestroyedListener(MessageClientListener listener)
+ {
+ if (listener != null)
+ {
+ checkValid();
+
+ if (destroyedListeners == null)
+ {
+ synchronized (lock)
+ {
+ if (destroyedListeners == null)
+ destroyedListeners = new CopyOnWriteArrayList();
+ }
+ }
+
+ destroyedListeners.addIfAbsent(listener);
+ }
+ }
+
+ /**
+ * Removes a MessageClient destroyed listener.
+ *
+ * @see flex.messaging.MessageClientListener
+ *
+ * @param listener The listener to remove.
+ */
+ public void removeMessageClientDestroyedListener(MessageClientListener listener)
+ {
+ // No need to check validity; removing a listener is always ok.
+ if (listener != null && destroyedListeners != null)
+ destroyedListeners.remove(listener);
+ }
+
+ /**
+ * @exclude
+ * Adds a subscription to the subscription set for this MessageClient.
+ *
+ * @param selector The selector expression used for the subscription.
+ * @param subtopic The subtopic used for the subscription.
+ * @param maxFrequency The maximum number of messages the client wants to
+ * receive per second (0 disables this limit).
+ */
+ public void addSubscription(String selector, String subtopic, int maxFrequency)
+ {
+ synchronized (lock)
+ {
+ checkValid();
+
+ incrementReferences();
+
+ // Create and add the subscription to the subscriptions set.
+ SubscriptionInfo si = new SubscriptionInfo(selector, subtopic, maxFrequency);
+ subscriptions.add(si);
+
+ registerSubscriptionWithThrottleManager(si);
+ }
+ }
+
+ /**
+ * @exclude
+ * Registers the subscription with the outbound queue processor's throttle
+ * manager, if one exists.
+ *
+ * @param si The subscription info object.
+ */
+ public void registerSubscriptionWithThrottleManager(SubscriptionInfo si)
+ {
+ // Register the destination that will setup client level outbound throttling.
+ ThrottleSettings ts = destination.getNetworkSettings().getThrottleSettings();
+ if (ts.getOutboundPolicy() != Policy.NONE && (ts.isOutboundClientThrottleEnabled() || si.maxFrequency > 0))
+ {
+ // Setup the client level outbound throttling, and register the destination
+ // only if the policy is not NONE, and a throttling limit is specified
+ // either at the destination or by consumer.
+ OutboundQueueThrottleManager throttleManager = getThrottleManager(true);
+ if (throttleManager != null)
+ throttleManager.registerDestination(destinationId, ts.getOutgoingClientFrequency(), ts.getOutboundPolicy());
+ }
+ else if (si.maxFrequency > 0) // Let the client know that maxFrequency will be ignored.
+ {
+ if (Log.isWarn())
+ Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).warn("MessageClient with clientId '"
+ + clientId + "' for destination '" + destinationId
+ + "' specified a maxFrequency value of '" + si.maxFrequency
+ + "' but the destination does not define a throttling policy. This value will be ignored.");
+ }
+
+ // Now, register the subscription.
+ OutboundQueueThrottleManager throttleManager = getThrottleManager(false);
+ if (throttleManager != null)
+ throttleManager.registerSubscription(destinationId, si);
+ }
+
+ /**
+ * @exclude
+ * Removes a subscription from the subscription set for this MessageClient.
+ *
+ * @param selector The selector expression for the subscription.
+ * @param subtopic The subtopic for the subscription.
+ * @return true if no subscriptions remain for this MessageClient; otherwise false.
+ */
+ public boolean removeSubscription(String selector, String subtopic)
+ {
+ synchronized (lock)
+ {
+ SubscriptionInfo si = new SubscriptionInfo(selector, subtopic);
+ if (subscriptions.remove(si))
+ {
+ unregisterSubscriptionWithThrottleManager(si);
+ return decrementReferences();
+ }
+ else if (Log.isError())
+ {
+ Log.getLogger(MessageService.LOG_CATEGORY).error("Error - unable to find subscription to remove for MessageClient: "
+ + clientId + " selector: " + selector + " subtopic: " + subtopic);
+ }
+ return numReferences == 0;
+ }
+ }
+
+ /**
+ * @exclude
+ * We use the same MessageClient for more than one subscription with different
+ * selection criteria. This tracks the number of subscriptions that are active
+ * so that we know when we are finished.
+ */
+ public void incrementReferences()
+ {
+ synchronized (lock)
+ {
+ numReferences++;
+ }
+ }
+
+ /**
+ * @exclude
+ * Decrements the numReferences variable and returns true if this was the last reference.
+ */
+ public boolean decrementReferences()
+ {
+ synchronized (lock)
+ {
+ if (--numReferences == 0)
+ {
+ cancelTimeout();
+ if (destination instanceof MessageDestination)
+ {
+ MessageDestination msgDestination = (MessageDestination)destination;
+ if (msgDestination.getThrottleManager() != null)
+ msgDestination.getThrottleManager().removeClientThrottleMark(clientId);
+ }
+ return true;
+ }
+ return false;
+ }
+ }
+
+ /**
+ * @exclude
+ * Invoked by SubscriptionManager once the subscription state is setup completely
+ * for the MessageClient..
+ */
+ public void notifyCreatedListeners()
+ {
+ // Notify MessageClient created listeners.
+ if (!createdListeners.isEmpty())
+ {
+ // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions.
+ for (Iterator iter = createdListeners.iterator(); iter.hasNext();)
+ ((MessageClientListener)iter.next()).messageClientCreated(this);
+ }
+ }
+
+ /**
+ * @exclude
+ * Invoked by SubscriptionManager while handling a subscribe request.
+ * If the request is updating an existing subscription the 'push' state in the associated FlexClient
+ * may need to be updated to ensure that the correct endpoint is used for this subscription.
+ *
+ * @param newEndpointId The id for the new endpoint that the subscription may have failed over to.
+ */
+ public void resetEndpoint(String newEndpointId)
+ {
+ String oldEndpointId = null;
+ FlexSession oldSession = null;
+ FlexSession newSession = FlexContext.getFlexSession();
+ synchronized (lock)
+ {
+ // If anything is null, or nothing has changed, no need for a reset.
+ if (endpointId == null || newEndpointId == null || flexSession == null || newSession == null || (endpointId.equals(newEndpointId) && flexSession.equals(newSession)))
+ return;
+
+ oldEndpointId = endpointId;
+ endpointId = newEndpointId;
+
+ oldSession = flexSession;
+ flexSession = newSession;
+ }
+
+ // Unregister in order to reset the proper push settings in the re-registration below once the session association has been patched.
+ if (flexClient != null)
+ flexClient.unregisterMessageClient(this);
+
+ // Clear out any reference to this subscription that the previously associated session has.
+ if (oldSession != null)
+ oldSession.unregisterMessageClient(this);
+
+ // Associate the current session with this subscription.
+ if (flexSession != null)
+ flexSession.registerMessageClient(this);
+
+ // Reset proper push settings.
+ if (flexClient != null)
+ flexClient.registerMessageClient(this);
+
+ if (Log.isDebug())
+ {
+ String msg = "MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been reset as a result of a resubscribe.";
+ if (oldEndpointId != null && !oldEndpointId.equals(newEndpointId))
+ msg += " Endpoint change [" + oldEndpointId + " -> " + newEndpointId + "]";
+ if ((oldSession != null) && (newSession != null) && (oldSession != newSession)) // Test identity.
+ msg += " FlexSession change [" + oldSession.getClass().getName() + ":" + oldSession.getId() + " -> " + newSession.getClass().getName() + ":" + newSession.getId() + "]";
+
+ Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug(msg);
+ }
+ }
+
+ /**
+ * @exclude
+ * Used to test whether this client should receive this message
+ * based on the list of subscriptions we have recorded for it.
+ * It must match both the subtopic and the selector expression.
+ * Usually this is done by the subscription manager - this logic is
+ * only here to maintain api compatibility with one of the variants
+ * of the pushMessageToClients which has subscriberIds and an evalSelector
+ * property.
+ *
+ * @param message The message to test.
+ */
+ public boolean testMessage(Message message, MessageDestination destination)
+ {
+ String subtopic = (String) message.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME);
+ String subtopicSeparator = destination.getServerSettings().getSubtopicSeparator();
+ synchronized (lock)
+ {
+ for (SubscriptionInfo si : subscriptions)
+ {
+ if (si.matches(message, subtopic, subtopicSeparator))
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns true if the MessageClient is valid; false if it has been invalidated.
+ *
+ * @return true if the MessageClient is valid; otherwise false.
+ */
+ public boolean isValid()
+ {
+ synchronized (lock)
+ {
+ return valid;
+ }
+ }
+
+ /**
+ * Invalidates the MessageClient.
+ */
+ public void invalidate()
+ {
+ invalidate(false /* don't attempt to notify the client */);
+ }
+
+ /**
+ * Invalidates the MessageClient, and optionally attempts to notify the client that
+ * this subscription has been invalidated.
+ * This overload is used when a subscription is timed out while the client is still
+ * actively connected to the server but should also be used by any custom code on the server
+ * that invalidates MessageClients but wishes to notify the client cleanly.
+ *
+ * @param notifyClient <code>true</code> to notify the client that its subscription has been
+ * invalidated.
+ */
+ public void invalidate(boolean notifyClient)
+ {
+ synchronized (lock)
+ {
+ if (!valid || invalidating)
+ return; // Already shutting down.
+
+ invalidating = true; // This thread gets to shut the MessageClient down.
+ cancelTimeout();
+ }
+
+ // Record whether we're attempting to notify the client or not.
+ attemptingInvalidationClientNotification = notifyClient;
+
+ // Build a subscription invalidation message and push to the client if it is still valid.
+ if (notifyClient && flexClient != null && flexClient.isValid())
+ {
+ CommandMessage msg = new CommandMessage();
+ msg.setDestination(destination.getId());
+ msg.setClientId(clientId);
+ msg.setOperation(CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION);
+ Set subscriberIds = new TreeSet();
+ subscriberIds.add(clientId);
+ try
+ {
+ if (destination instanceof MessageDestination)
+ {
+ MessageDestination msgDestination = (MessageDestination)destination;
+ ((MessageService)msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, msg, false /* don't eval selector */);
+ }
+ }
+ catch (MessageException ignore) {}
+ }
+
+ // Notify messageClientDestroyed listeners that we're being invalidated.
+ if (destroyedListeners != null && !destroyedListeners.isEmpty())
+ {
+ for (Iterator iter = destroyedListeners.iterator(); iter.hasNext();)
+ {
+ ((MessageClientListener)iter.next()).messageClientDestroyed(this);
+ }
+ destroyedListeners.clear();
+ }
+
+ // And generate unsubscribe messages for all of the MessageClient's subscriptions and
+ // route them to the destination this MessageClient is subscribed to.
+ // The reason we send a message to the service rather than just going straight to the SubscriptionManager
+ // is that some adapters manage their own subscription state (i.e. JMS) in addition to us keeping track of
+ // things with our SubscriptionManager.
+ ArrayList<CommandMessage> unsubMessages = new ArrayList<CommandMessage>();
+ synchronized (lock)
+ {
+ for (SubscriptionInfo subInfo : subscriptions)
+ {
+ CommandMessage unsubMessage = new CommandMessage();
+ unsubMessage.setDestination(destination.getId());
+ unsubMessage.setClientId(clientId);
+ unsubMessage.setOperation(CommandMessage.UNSUBSCRIBE_OPERATION);
+ unsubMessage.setHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER, Boolean.TRUE);
+ unsubMessage.setHeader(CommandMessage.SELECTOR_HEADER, subInfo.selector);
+ unsubMessage.setHeader(AsyncMessage.SUBTOPIC_HEADER_NAME, subInfo.subtopic);
+ unsubMessages.add(unsubMessage);
+ }
+ }
+ // Release the lock and send the unsub messages.
+ for (CommandMessage unsubMessage : unsubMessages)
+ {
+ try
+ {
+ destination.getService().serviceCommand(unsubMessage);
+ }
+ catch (MessageException me)
+ {
+ if (Log.isDebug())
+ Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient: " + getClientId() + " issued an unsubscribe message during invalidation that was not processed but will continue with invalidation. Reason: " + ExceptionUtil.toString(me));
+ }
+ }
+
+ synchronized (lock)
+ {
+ // If we didn't clean up all subscriptions log an error and continue with shutdown.
+ int remainingSubscriptionCount = subscriptions.size();
+ if (remainingSubscriptionCount > 0 && Log.isError())
+ Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).error("MessageClient: " + getClientId() + " failed to remove " + remainingSubscriptionCount + " subscription(s) during invalidation");
+ }
+
+ // If someone registered this message client, invalidating it will free
+ // their reference which will typically also remove this message client.
+ if (registered && destination instanceof MessageDestination)
+ ((MessageDestination)destination).getSubscriptionManager().releaseMessageClient(this);
+
+ synchronized (lock)
+ {
+ valid = false;
+ invalidating = false;
+ }
+
+ if (Log.isDebug())
+ Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been invalidated.");
+ }
+
+ /**
+ * Pushes the supplied message and then invalidates the MessageClient.
+ *
+ * @param message The message to push to the client before invalidating.
+ * When message is null, MessageClient is invalidated silently.
+ */
+ public void invalidate(Message message)
+ {
+ if (message != null)
+ {
+ message.setDestination(destination.getId());
+ message.setClientId(clientId);
+
+ Set subscriberIds = new TreeSet();
+ subscriberIds.add(clientId);
+ try
+ {
+ if (destination instanceof MessageDestination)
+ {
+ MessageDestination msgDestination = (MessageDestination)destination;
+ ((MessageService)msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, message, false /* don't eval selector */);
+ }
+ }
+ catch (MessageException ignore) {}
+
+ invalidate(true /* attempt to notify remote client */);
+ }
+ else
+ {
+ invalidate();
+ }
+ }
+
+ /**
+ * @exclude
+ * Compares this MessageClient to the specified object. The result is true if
+ * the argument is not null and is a MessageClient instance with a matching
+ * clientId value.
+ *
+ * @param o The object to compare this MessageClient to.
+ * @return true if the MessageClient is equal; otherwise false.
+ */
+ public boolean equals(Object o)
+ {
+ if (o instanceof MessageClient)
+ {
+ MessageClient c = (MessageClient) o;
+ if (c != null && c.getClientId().equals(clientId))
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * @exclude
+ * Returns the hash code for this MessageClient. The returned value is
+ * the hash code for the MessageClient's clientId property.
+ *
+ * @return The hash code value for this MessageClient.
+ */
+ @Override
+ public int hashCode()
+ {
+ return getClientId().hashCode();
+ }
+
+ /**
+ * @exclude
+ * The String representation of this MessageClient is returned (its clientId value).
+ *
+ * @return The clientId value for this MessageClient.
+ */
+ @Override
+ public String toString()
+ {
+ return String.valueOf(clientId);
+ }
+
+ //----------------------------------
+ // TimeoutAbstractObject overrides
+ //----------------------------------
+
+ /**
+ * @exclude
+ * Implements TimeoutCapable.
+ * This method returns the timeout value configured for the MessageClient's destination.
+ */
+ @Override
+ public long getTimeoutPeriod()
+ {
+ return (destination instanceof MessageDestination) ?
+ ((MessageDestination)destination).getSubscriptionManager().getSubscriptionTimeoutMillis() : 0;
+ }
+
+ /**
+ * @exclude
+ * Implements TimeoutCapable.
+ * This method is invoked when the MessageClient has timed out and it
+ * invalidates the MessageClient.
+ */
+ public void timeout()
+ {
+ invalidate(true /* notify client */);
+ }
+
+ /**
+ * @exclude
+ * Returns true if a timeout task is running for this MessageClient.
+ */
+ public boolean isTimingOut()
+ {
+ return willTimeout;
+ }
+
+ /**
+ * @exclude
+ * Records whether a timeout task is running for this MessageClient.
+ */
+ public void setTimingOut(boolean value)
+ {
+ willTimeout = value;
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Private Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Utility method that tests validity and throws an exception if the instance
+ * has been invalidated.
+ */
+ private void checkValid()
+ {
+ synchronized (lock)
+ {
+ if (!valid)
+ {
+ throw new RuntimeException("MessageClient has been invalidated."); // TODO - localize
+ }
+ }
+ }
+
+ private OutboundQueueThrottleManager getThrottleManager(boolean create)
+ {
+ if (flexClient != null)
+ {
+ FlexClientOutboundQueueProcessor processor = flexClient.getOutboundQueueProcessor(endpointId);
+ if (processor != null)
+ return create? processor.getOrCreateOutboundQueueThrottleManager() : processor.getOutboundQueueThrottleManager();
+ }
+ return null;
+ }
+
+ private void unregisterSubscriptionWithThrottleManager(SubscriptionInfo si)
+ {
+ OutboundQueueThrottleManager throttleManager = getThrottleManager(false);
+ if (throttleManager != null)
+ throttleManager.unregisterSubscription(destinationId, si);
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Nested Classes
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Represents a MessageClient's subscription to a destination.
+ * It captures the optional selector expression and subtopic for the
+ * subscription.
+ */
+ public static class SubscriptionInfo implements Comparable
+ {
+ public String selector, subtopic;
+ public int maxFrequency; // maxFrequency per subscription. Not used in BlazeDS.
+
+ public SubscriptionInfo(String sel, String sub)
+ {
+ this(sel, sub, 0);
+ }
+
+ public SubscriptionInfo(String sel, String sub, int maxFrequency)
+ {
+ this.selector = sel;
+ this.subtopic = sub;
+ this.maxFrequency = maxFrequency;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o instanceof SubscriptionInfo)
+ {
+ SubscriptionInfo other = (SubscriptionInfo) o;
+ return equalStrings(other.selector, selector) &&
+ equalStrings(other.subtopic, subtopic);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return (selector == null ? 0 : selector.hashCode()) +
+ (subtopic == null ? 1 : subtopic.hashCode());
+ }
+
+ /**
+ * Compares the two subscription infos (being careful to
+ * ensure we compare in a consistent way if the arguments
+ * are switched).
+ * @param o the object to compare
+ * @return int the compare result
+ */
+ public int compareTo(Object o)
+ {
+ SubscriptionInfo other = (SubscriptionInfo) o;
+ int result;
+
+ if ((result = compareStrings(other.selector, selector)) != 0)
+ return result;
+ else if ((result = compareStrings(other.subtopic, subtopic)) != 0)
+ return result;
+
+ return 0;
+ }
+
+ /**
+ * Check whether the message matches with selected subtopic.
+ * @param message current message
+ * @param subtopicToMatch subtopc string
+ * @param subtopicSeparator suptopic separator
+ * @return true if the message matches the subtopic
+ */
+ public boolean matches(Message message, String subtopicToMatch, String subtopicSeparator)
+ {
+ if ((subtopicToMatch == null && subtopic != null) || (subtopicToMatch != null && subtopic == null))
+ return false; // If either defines a subtopic, they both must define one.
+
+ // If both define a subtopic, they must match.
+ if (subtopicToMatch != null && subtopic != null)
+ {
+ Subtopic consumerSubtopic = new Subtopic(subtopic, subtopicSeparator);
+ Subtopic messageSubtopic = new Subtopic(subtopicToMatch, subtopicSeparator);
+ if (!consumerSubtopic.matches(messageSubtopic))
+ return false; // Not a match.
+ }
+
+ if (selector == null)
+ return true;
+
+ JMSSelector jmsSelector = new JMSSelector(selector);
+ try
+ {
+ if (jmsSelector.match(message))
+ return true;
+ }
+ catch (JMSSelectorException jmse)
+ {
+ // Log a warning for this client's selector and continue
+ if (Log.isWarn())
+ {
+ Log.getLogger(JMSSelector.LOG_CATEGORY).warn("Error processing message selector: " +
+ jmse.toString() + StringUtils.NEWLINE +
+ " incomingMessage: " + message + StringUtils.NEWLINE +
+ " selector: " + selector + StringUtils.NEWLINE);
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns a String representation of the subscription info.
+ * @return String the string representation of the subscription info
+ */
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+ sb.append("Subtopic: " + subtopic + StringUtils.NEWLINE);
+ sb.append("Selector: " + selector + StringUtils.NEWLINE);
+ if (maxFrequency > 0)
+ sb.append("maxFrequency: " + maxFrequency);
+ return sb.toString();
+ }
+ }
+}