You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flex.apache.org by cd...@apache.org on 2015/12/20 14:13:48 UTC
[08/51] [partial] flex-blazeds git commit: Removed legacy directories
and made the content of the modules directory the new root - Please use the
maven build for now as the Ant build will no longer work until it is
adjusted to the new directory structure
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClient.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/FlexClient.java b/core/src/flex/messaging/client/FlexClient.java
new file mode 100644
index 0000000..e212986
--- /dev/null
+++ b/core/src/flex/messaging/client/FlexClient.java
@@ -0,0 +1,2240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+import flex.messaging.ConnectionAwareSession;
+import flex.messaging.FlexContext;
+import flex.messaging.FlexSession;
+import flex.messaging.FlexSessionListener;
+import flex.messaging.HttpFlexSession;
+import flex.messaging.MessageClient;
+import flex.messaging.MessageClientListener;
+import flex.messaging.MessageException;
+import flex.messaging.endpoints.Endpoint;
+import flex.messaging.log.Log;
+import flex.messaging.log.LogCategories;
+import flex.messaging.messages.CommandMessage;
+import flex.messaging.messages.Message;
+import flex.messaging.services.MessageService;
+import flex.messaging.util.StringUtils;
+import flex.messaging.util.TimeoutAbstractObject;
+
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Represents a Flex client application instance on the server.
+ */
+public class FlexClient extends TimeoutAbstractObject implements FlexSessionListener, MessageClientListener
+{
+ //--------------------------------------------------------------------------
+ //
+ // Public Static Constants
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Log category for FlexClient related messages.
+ */
+ public static final String FLEX_CLIENT_LOG_CATEGORY = LogCategories.CLIENT_FLEXCLIENT;
+
+ /**
+ * This value is passed to the server in an initial client connect to
+ * indicate that the client needs a server-assigned FlexClient Id.
+ *
+ */
+ public static final String NULL_FLEXCLIENT_ID = "nil";
+
+ //--------------------------------------------------------------------------
+ //
+ // Private Static Constants
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Error string constants.
+ */
+ private static final int FLEX_CLIENT_INVALIDATED = 10027;
+ private static final int ENDPOINT_PUSH_HANDLER_ALREADY_REGISTERED = 10033;
+
+ private static final String POLL_WAIT_THREAD_NAME_EXTENSION = "-in-poll-wait";
+
+ //--------------------------------------------------------------------------
+ //
+ // Private Static Variables
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * List of registered FlexClient created listeners.
+ */
+ private static final CopyOnWriteArrayList<FlexClientListener> createdListeners = new CopyOnWriteArrayList<FlexClientListener>();
+
+ //--------------------------------------------------------------------------
+ //
+ // Public Static Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Adds a create listener that will be notified when new FlexClients
+ * are created.
+ *
+ * @see flex.messaging.client.FlexClientListener
+ *
+ * @param listener The listener to add.
+ */
+ public static void addClientCreatedListener(FlexClientListener listener)
+ {
+     // A null listener is silently ignored.
+     if (listener == null)
+         return;
+
+     // addIfAbsent keeps the shared listener list free of duplicate registrations.
+     createdListeners.addIfAbsent(listener);
+ }
+
+ /**
+ * Removes a FlexClient created listener.
+ *
+ * @see flex.messaging.client.FlexClientListener
+ *
+ * @param listener The listener to remove.
+ */
+ public static void removeClientCreatedListener(FlexClientListener listener)
+ {
+     // A null listener is silently ignored.
+     if (listener == null)
+         return;
+
+     createdListeners.remove(listener);
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Constructor
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ *
+ * Constructs a new FlexClient instance.
+ *
+ * @param manager The FlexClientManager managing this instance.
+ */
+ public FlexClient(FlexClientManager manager)
+ {
+ // Delegates to the two-argument constructor, using an id created by the current MessageBroker.
+ this(manager, FlexContext.getMessageBroker().createUUID());
+ }
+
+ /**
+ *
+ * Constructs a new FlexClient instance having the specified Id.
+ *
+ * @param manager The FlexClientManager managing this instance.
+ * @param id The Id for this instance.
+ */
+ public FlexClient(FlexClientManager manager, String id)
+ {
+     this.id = id;
+     this.flexClientManager = manager;
+
+     // Refresh the last-use timestamp, then flag the new instance as valid.
+     updateLastUse();
+     this.valid = true;
+
+     if (Log.isDebug())
+     {
+         Log.getLogger(FLEX_CLIENT_LOG_CATEGORY)
+             .debug("FlexClient created with id '" + this.id + "'.");
+     }
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Variables
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Storage for custom attributes.
+ */
+ private volatile Map<String, Object> attributes;
+
+ /**
+ * List of registered FlexClient attribute listeners.
+ */
+ private volatile CopyOnWriteArrayList<FlexClientAttributeListener> attributeListeners;
+
+ /**
+ * List of registered FlexClient destroyed listeners.
+ */
+ private volatile CopyOnWriteArrayList<FlexClientListener> destroyedListeners;
+
+ /**
+ * The manager for the FlexClient.
+ */
+ final FlexClientManager flexClientManager;
+
+ /**
+ * The unique Id for the instance.
+ */
+ private final String id;
+
+ /**
+ * Flag used to break cycles during invalidation.
+ */
+ /* package visibility for FlexClientManager */ volatile boolean invalidating;
+
+ /**
+ * Instance level lock to sync for state changes.
+ */
+ final Object lock = new Object();
+
+ /**
+ * MessageClient subscriptions for this FlexClient.
+ */
+ private volatile CopyOnWriteArrayList<MessageClient> messageClients;
+
+ /**
+ * Queues of outbound messages to push to the client keyed by endpoint id.
+ * Map(String endpointId, EndpointQueue queue).
+ */
+ private final Map<String, EndpointQueue> outboundQueues = new ConcurrentHashMap<String, EndpointQueue>(1);
+
+ /**
+ * EndpointPushHandlers keyed by endpointId that the FlexClient
+ * can use to push messages to remote clients.
+ * NOTE: these can't be added to the EndpointQueue data type because the existence of queues depends
+ * upon client subscription state whereas endpoints that support push will generally set up their push
+ * handling before any subscriptions have been created.
+ */
+ private Map<String, EndpointPushHandler> endpointPushHandlers;
+
+ /**
+ * Associated FlexSessions that represent the connections the FlexClient makes to the server.
+ */
+ private final CopyOnWriteArrayList<FlexSession> sessions = new CopyOnWriteArrayList<FlexSession>(); // We always have at least one session.
+
+ /**
+ * Flag indicating whether the instance is valid; once invalidated this flag is
+ * set to false.
+ */
+ boolean valid;
+
+ /**
+ * The principal associated with this client. Only used when perClientAuthentication
+ * is being used.
+ */
+ private Principal userPrincipal;
+
+ //--------------------------------------------------------------------------
+ //
+ // Public Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Adds a FlexClient attribute listener that will be notified when an
+ * attribute is added, removed or changed. If the attribute implements
+ * FlexClientBindingListener, it will be notified before any
+ * FlexClientAttributeListeners are notified.
+ *
+ * @param listener The listener to add.
+ */
+ public void addClientAttributeListener(FlexClientAttributeListener listener)
+ {
+     if (listener == null)
+         return; // Nothing to register.
+
+     checkValid();
+
+     // Create the listener list on first use; the instance lock guards the null check and assignment.
+     synchronized (lock)
+     {
+         if (attributeListeners == null)
+             attributeListeners = new CopyOnWriteArrayList<FlexClientAttributeListener>();
+     }
+
+     // The registration itself happens outside the lock and skips duplicates.
+     attributeListeners.addIfAbsent(listener);
+ }
+
+ /**
+ * Adds a destroy listener that will be notified when the FlexClient
+ * is destroyed. Listeners are notified after all attributes
+ * have been unbound from the FlexClient and any FlexClientBindingListeners
+ * and FlexClientAttributeListeners have been notified.
+ *
+ * @see flex.messaging.client.FlexClientListener
+ *
+ * @param listener The listener to add.
+ */
+ public void addClientDestroyedListener(FlexClientListener listener)
+ {
+     if (listener == null)
+         return; // Nothing to register.
+
+     checkValid();
+
+     // Create the listener list on first use; the instance lock guards the null check and assignment.
+     synchronized (lock)
+     {
+         if (destroyedListeners == null)
+             destroyedListeners = new CopyOnWriteArrayList<FlexClientListener>();
+     }
+
+     // The registration itself happens outside the lock and skips duplicates.
+     destroyedListeners.addIfAbsent(listener);
+ }
+
+ /**
+ * Returns the attribute bound to the specified name for the FlexClient, or null
+ * if no attribute is bound under the name.
+ *
+ * @param name The name the attribute is bound to.
+ * @return The attribute bound to the specified name.
+ */
+ public Object getAttribute(String name)
+ {
+     synchronized (lock)
+     {
+         checkValid();
+         updateLastUse();
+
+         // The attribute map may not have been created yet.
+         if (attributes == null)
+             return null;
+
+         return attributes.get(name);
+     }
+ }
+
+ /**
+ * Returns a snapshot of the names of all attributes bound to the FlexClient.
+ *
+ * @return A snapshot of the names of all attributes bound to the FlexClient.
+ */
+ public Enumeration<String> getAttributeNames()
+ {
+     synchronized (lock)
+     {
+         checkValid();
+         updateLastUse();
+
+         List<String> nameSnapshot;
+         if (attributes == null)
+         {
+             nameSnapshot = Collections.<String>emptyList();
+         }
+         else
+         {
+             // Copy the key set so that later changes to the attributes cannot cause
+             // concurrent modification problems while the caller is enumerating.
+             nameSnapshot = new ArrayList<String>(attributes.keySet());
+         }
+         return Collections.enumeration(nameSnapshot);
+     }
+ }
+
+ /**
+ *
+ * Returns the push handler registered with the FlexClient with the supplied
+ * endpoint id, or null if no push handler was registered with the FlexClient
+ * for that endpoint.
+ *
+ * @return The push handler registered with the FlexClient with the supplied
+ * endpoint id, or null if no push handler was registered with the FlexClient
+ * for that endpoint.
+ */
+ public EndpointPushHandler getEndpointPushHandler(String endpointId)
+ {
+     // endpointId is the id of the endpoint whose push handler is being looked up.
+     synchronized (lock)
+     {
+         // No handler map means no handler has been registered for any endpoint.
+         if (endpointPushHandlers == null)
+             return null;
+
+         // A missing key yields null, which is also the "not registered" result.
+         return endpointPushHandlers.get(endpointId);
+     }
+ }
+
+ /**
+ *
+ * Returns the queue processor registered with the FlexClient with the supplied
+ * endpoint id, or null if no queue processor was registered with the FlexClient
+ * for that endpoint.
+ *
+ * @param endpointId The endpoint id.
+ * @return The queue processor registered with the FlexClient.
+ */
+ public FlexClientOutboundQueueProcessor getOutboundQueueProcessor(String endpointId)
+ {
+     EndpointQueue endpointQueue = outboundQueues.get(endpointId);
+
+     // Without a queue for the endpoint there is no processor to return.
+     if (endpointQueue == null)
+         return null;
+
+     return endpointQueue.processor;
+ }
+
+ /**
+ *
+ * Returns the endpoint queue registered with the FlexClient with the supplied
+ * endpoint id, or null if no endpoint queue was registered with the FlexClient
+ * for that endpoint.
+ *
+ * @param endpointId The endpoint id.
+ * @return The endpoint queue registered with the FlexClient.
+ */
+ public EndpointQueue getEndpointQueue(String endpointId)
+ {
+ // Direct lookup in the per-endpoint outbound queue map; yields null when no queue exists for the id.
+ return outboundQueues.get(endpointId);
+ }
+
+ /**
+ * Override {@link flex.messaging.util.TimeoutAbstractObject#getLastUse()} to make timeout
+ * dependent upon FlexClient inactivity but also upon the presence of an active push-enabled session,
+ * async or waited poll, or registered endpoint push handler (all of which indicate that a client has
+ * an active, open connection to the server).
+ *
+ * @return The 'last use' timestamp for the FlexClient, which may be the current system time if the FlexClient
+ * has been idle but an open connection from the client to the server exists.
+ */
+ @Override
+ public long getLastUse()
+ {
+     synchronized (lock)
+     {
+         final long recordedLastUse = super.getLastUse();
+
+         // Still inside the timeout window: report the real timestamp so the timeout is rescheduled.
+         if (System.currentTimeMillis() - recordedLastUse < flexClientManager.getFlexClientTimeoutMillis())
+             return recordedLastUse;
+
+         // Idle past the timeout. An outbound queue with a parked async poll, or with a push
+         // handler registered for its endpoint, keeps the FlexClient alive.
+         for (EndpointQueue outboundQueue : outboundQueues.values())
+         {
+             if (outboundQueue.asyncPoll != null
+                     || (endpointPushHandlers != null && endpointPushHandlers.containsKey(outboundQueue.endpointId)))
+                 return System.currentTimeMillis();
+         }
+
+         // A connected session, or a session holding a (non-async) waited poll for this
+         // FlexClient, also keeps it alive.
+         for (FlexSession associatedSession : sessions)
+         {
+             if (associatedSession instanceof ConnectionAwareSession
+                     && ((ConnectionAwareSession)associatedSession).isConnected())
+                 return System.currentTimeMillis();
+
+             if (associatedSession.waitMonitor == null)
+                 continue; // No waited polls on this session.
+
+             for (EndpointQueue waitedQueue : associatedSession.waitMonitor.values())
+             {
+                 if (waitedQueue.flexClient.equals(this))
+                     return System.currentTimeMillis();
+             }
+         }
+
+         // Nothing is holding a connection open; allow the FlexClient to time out.
+         return recordedLastUse;
+     }
+ }
+
+ /**
+ * Returns the attribute bound to the specified name for the current FlexSession
+ * associated with the FlexClient. If the attribute does not exist in the current
+ * FlexSession, this method iterates through all the other FlexSessions associated with
+ * the FlexClient and either returns the attribute bound, or null if no attribute is bound
+ * under the name.
+ *
+ * @param name The name the attribute is bound to.
+ * @return The attribute bound to the specified name.
+ */
+ public Object getSessionAttribute(String name)
+ {
+     // Prefer the current session; consult the other sessions only when it has no value for the name.
+     Object currentSessionValue = getSessionAttributeInCurrentSession(name);
+     return (currentSessionValue != null)
+             ? currentSessionValue
+             : getSessionAttributeInOtherSessions(name);
+ }
+
+ /**
+ * Returns a snapshot of the names of all attributes bound to all the FlexSessions
+ * associated with the FlexClient.
+ *
+ * @return A snapshot of the names of all attributes bound to all the FlexSessions
+ * associated with the FlexClient.
+ */
+ public Enumeration<String> getSessionAttributeNames()
+ {
+     // A set collapses names that are bound in more than one session.
+     Set<String> collectedNames = new HashSet<String>();
+
+     Iterator<FlexSession> sessionIterator = sessions.iterator();
+     while (sessionIterator.hasNext())
+     {
+         collectedNames.addAll(getSessionAttributeNames(sessionIterator.next()));
+     }
+
+     return Collections.enumeration(collectedNames);
+ }
+
+ /**
+ *
+ *
+ * Returns the principal associated with this client. If the client has not
+ * authenticated the principal will be null. Should only be called from FlexContext
+ * and only if perClientAuthentication is used. Not available to users.
+ *
+ * @return The principal associated with this client.
+ */
+ public Principal getUserPrincipal()
+ {
+ // Validity check and read are done together under the instance lock.
+ synchronized (lock)
+ {
+ checkValid();
+ return userPrincipal;
+ }
+ }
+
+ /**
+ *
+ *
+ * Should only be called from FlexContext and only if perClientAuthentication is used.
+ * Not available to users.
+ *
+ * @param userPrincipal The principal to associate with this client.
+ */
+ public void setUserPrincipal(Principal userPrincipal)
+ {
+ // Validity check and write are done together under the instance lock.
+ synchronized (lock)
+ {
+ checkValid();
+ this.userPrincipal = userPrincipal;
+ }
+ }
+
+ /**
+ * Invalidates the FlexClient.
+ */
+ public void invalidate()
+ {
+ // Step 1 (under the lock): claim the shutdown. A second caller sees !valid or
+ // invalidating and returns immediately, which also breaks re-entrant cycles.
+ synchronized (lock)
+ {
+ if (!valid || invalidating)
+ return; // Already shutting down.
+
+ invalidating = true; // This thread gets to shut the FlexClient down.
+ flexClientManager.removeFlexClient(this);
+ cancelTimeout();
+ }
+
+ // The teardown steps below run without holding the instance lock.
+ // NOTE(review): there is no try/finally around them, so if any callback below throws,
+ // the final block is skipped and 'invalidating' stays true while 'valid' stays true.
+
+ // Unregister from all FlexSessions.
+ if (!sessions.isEmpty())
+ {
+ for (FlexSession session : sessions)
+ unregisterFlexSession(session);
+ }
+
+ // Invalidate associated MessageClient subscriptions.
+ if (messageClients != null && !messageClients.isEmpty())
+ {
+ for (MessageClient messageClient : messageClients)
+ {
+ // Stop listening first so the MessageClient's invalidation does not call back into this FlexClient.
+ messageClient.removeMessageClientDestroyedListener(this);
+ messageClient.invalidate();
+ }
+ messageClients.clear();
+ }
+
+ // Notify destroy listeners that we're shutting the FlexClient down.
+ // NOTE(review): the addClientDestroyedListener Javadoc says listeners are notified after
+ // all attributes have been unbound, but here they are notified before the unbind step below.
+ if (destroyedListeners != null && !destroyedListeners.isEmpty())
+ {
+ for (FlexClientListener destroyListener : destroyedListeners)
+ {
+ destroyListener.clientDestroyed(this);
+ }
+ destroyedListeners.clear();
+ }
+
+ // Unbind all attributes.
+ if (attributes != null && !attributes.isEmpty())
+ {
+ // Iterate over a copy of the keys because removeAttribute is called for each of them.
+ Set<String> keySet = attributes.keySet();
+ String[] keys = keySet.toArray(new String[keySet.size()]);
+ for (String key : keys)
+ removeAttribute(key);
+ }
+
+ // Close any registered push handlers.
+ // NOTE(review): endpointPushHandlers is read and nulled here without the lock, whereas
+ // getEndpointPushHandler and getLastUse read it under the lock - confirm this is intended.
+ if (endpointPushHandlers != null && !endpointPushHandlers.isEmpty())
+ {
+ for (EndpointPushHandler handler : endpointPushHandlers.values())
+ {
+ handler.close(true /* notify Channel of disconnect */);
+ }
+ endpointPushHandlers = null;
+ }
+
+ // Final step (under the lock): publish the invalidated state.
+ synchronized (lock)
+ {
+ valid = false;
+ invalidating = false;
+ }
+
+ if (Log.isDebug())
+ Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).debug("FlexClient with id '" + this.id + "' has been invalidated.");
+ }
+
+ /**
+ * Returns true if the FlexClient is valid; false if it has been invalidated.
+ *
+ * @return true if the FlexClient is valid; otherwise false.
+ */
+ public boolean isValid()
+ {
+ // 'valid' is not volatile; it is read under the instance lock, which invalidate() also holds when clearing it.
+ synchronized (lock)
+ {
+ return valid;
+ }
+ }
+
+ /**
+ * Returns a snapshot of the FlexSessions associated with the FlexClient
+ * when this method is invoked.
+ * This list is not guaranteed to remain consistent with the actual list
+ * of active FlexSessions associated with the FlexClient over time.
+ *
+ * @return A snapshot of the current list of FlexSessions associated with the FlexClient.
+ */
+ public List<FlexSession> getFlexSessions()
+ {
+     synchronized (lock)
+     {
+         checkValid();
+         updateLastUse();
+
+         // Hand back an independent copy so callers never see later changes to the live list.
+         return new ArrayList<FlexSession>(sessions);
+     }
+ }
+
+ /**
+ * Return the session count.
+ *
+ * @return The number of sessions associated with this FlexClient.
+ */
+ public int getSessionCount()
+ {
+     // 'sessions' is a final field initialised at declaration, so it can never be null and
+     // no null check is needed. The count is read under the instance lock, as before.
+     synchronized (lock)
+     {
+         return sessions.size();
+     }
+ }
+
+ /**
+ * Return the subscription count.
+ *
+ * @return The number of subscriptions associated with this FlexClient.
+ */
+ public int getSubscriptionCount()
+ {
+     synchronized (lock)
+     {
+         // No MessageClient list means there are no subscriptions at all.
+         if (messageClients == null)
+             return 0;
+
+         // Sum the subscription counts reported by each associated MessageClient.
+         int total = 0;
+         for (MessageClient subscription : messageClients)
+         {
+             total += subscription.getSubscriptionCount();
+         }
+         return total;
+     }
+ }
+
+ /**
+ * Returns the message client registered with the FlexClient with the supplied
+ * client id, or null if no message client was registered with the FlexClient
+ * with that client id.
+ *
+ * @param clientId The client id.
+ * @return The message client registered with the FlexClient.
+ */
+ public MessageClient getMessageClient(String clientId)
+ {
+     synchronized (lock)
+     {
+         if (messageClients == null)
+             return null; // Nothing has been registered.
+
+         // Linear scan for the first MessageClient whose client id equals the requested one.
+         for (MessageClient candidate : messageClients)
+         {
+             if (candidate.getClientId().equals(clientId))
+                 return candidate;
+         }
+         return null;
+     }
+ }
+
+ /**
+ * Returns a snapshot of the MessageClients (subscriptions) associated with the FlexClient
+ * when this method is invoked.
+ * This list is not guaranteed to remain consistent with the actual list
+ * of active MessageClients associated with the FlexClient over time.
+ *
+ * @return A snapshot of the current list of MessageClients associated with the FlexClient.
+ */
+ public List<MessageClient> getMessageClients()
+ {
+     synchronized (lock)
+     {
+         checkValid();
+         updateLastUse();
+
+         // With no list yet, report an empty result.
+         if (messageClients == null)
+             return Collections.<MessageClient>emptyList();
+
+         // Otherwise return a copy so the caller holds a stable snapshot.
+         return new ArrayList<MessageClient>(messageClients);
+     }
+ }
+
+ /**
+ * Returns the unique Id for the FlexClient.
+ *
+ * @return The unique Id for the FlexClient.
+ */
+ public String getId()
+ {
+ // 'id' is final and assigned in the constructor, so no locking is needed to read it.
+ return id;
+ }
+
+ /**
+ *
+ * Implements TimeoutCapable.
+ * Determine the time, in milliseconds, that this object is allowed to idle
+ * before having its timeout method invoked.
+ */
+ @Override
+ public long getTimeoutPeriod()
+ {
+ // The timeout is not stored per instance; it is read from the owning FlexClientManager on every call.
+ return flexClientManager.getFlexClientTimeoutMillis();
+ }
+
+ /**
+ *
+ * Implements MessageClientListener.
+ * Handling created events is a no-op.
+ *
+ * @param messageClient The new MessageClient.
+ */
+ public void messageClientCreated(MessageClient messageClient) {} // Intentionally empty: creation events are ignored.
+
+ /**
+ *
+ * Implements MessageClientListener.
+ * Notification that an associated MessageClient was destroyed.
+ *
+ * @param messageClient The MessageClient that was destroyed.
+ */
+ public void messageClientDestroyed(MessageClient messageClient)
+ {
+ // Drop the destroyed MessageClient from this FlexClient's bookkeeping.
+ unregisterMessageClient(messageClient);
+ }
+
+ /**
+ *
+ * Poll for outbound messages for the FlexClient.
+ * This method is only invoked by internal code while processing a client poll request; it
+ * is not intended for general public use.
+ * Poll requests that trigger this method come from client-side polling channels and the request
+ * is not specific to a single Consumer/MessageClient instance so process any queued messages for
+ * the specified endpoint across all subscriptions.
+ *
+ * @param endpointId The Id of the endpoint that received the poll request.
+ * @return The flush result including messages to return in the poll response and
+ * an optional wait time for the next poll/flush.
+ */
+ public FlushResult poll(String endpointId)
+ {
+     synchronized (lock)
+     {
+         checkValid();
+
+         // When a queue exists for the endpoint, poll it while still holding the lock.
+         EndpointQueue endpointQueue = outboundQueues.get(endpointId);
+         if (endpointQueue != null)
+             return internalPoll(endpointQueue);
+     }
+
+     // Reaching this point means no queue exists, i.e. the client is not subscribed.
+     throwNotSubscribedException(endpointId);
+     return null;
+ }
+
+ /**
+ *
+ * Poll for outbound messages for the FlexClient and if no messages are available
+ * immediately, store a reference to the passed async handler and call back when messages arrive.
+ *
+ * @param endpointId The Id of the endpoint that received the poll request.
+ * @param handler The handler to callback when messages arrive.
+ * @param waitIntervalMillis The wait interval in milliseconds for the poll to wait for data to arrive
+ * before returning an empty poll response.
+ *
+ * @return A <tt>TimeoutAbstractObject</tt> representing the asynchronous poll, or <code>null</code>
+ * if the poll request was handled immediately because data was available to return.
+ */
+ public TimeoutAbstractObject pollAsync(String endpointId, AsyncPollHandler handler, long waitIntervalMillis)
+ {
+ EndpointQueue queue;
+ TimeoutAbstractObject asyncPollTask = null; // Stays null when the poll completes immediately.
+
+ // Lock order in this method: FlexClient lock first, then the session monitor.
+ synchronized (lock)
+ {
+ checkValid();
+
+ queue = outboundQueues.get(endpointId);
+
+ // If the queue exists and is not empty, flush immediately.
+ if (queue != null)
+ {
+ if (!queue.messages.isEmpty())
+ {
+ handler.asyncPollComplete(internalFlush(queue));
+ }
+ else // Set up an async long-poll.
+ {
+ // Avoid monopolizing user agent connections.
+ FlexSession session = FlexContext.getFlexSession();
+ synchronized (session)
+ {
+ // Look for a poll already parked on this session for the same endpoint.
+ if (session.asyncPollMap != null)
+ {
+ AsyncPollWithTimeout parkedPoll = session.asyncPollMap.get(endpointId);
+ if (parkedPoll != null)
+ {
+ // If the poll is from the same client for this endpoint, treat it as a no-op.
+ // NOTE(review): the handler is completed here, yet the code below still parks a
+ // new async poll with the same handler - confirm this is intended.
+ if (parkedPoll.getFlexClient().equals(this))
+ {
+ PollFlushResult result = new PollFlushResult();
+ result.setClientProcessingSuppressed(true);
+ handler.asyncPollComplete(result);
+ }
+ else // If the poll is for a different client on the same session, swap their waits.
+ {
+ PollFlushResult result = new PollFlushResult();
+ result.setAvoidBusyPolling(true);
+ completeAsyncPoll(parkedPoll, result);
+ }
+ }
+ }
+ // Park the new poll for this endpoint; it replaces any entry found above.
+ AsyncPollWithTimeout asyncPoll = new AsyncPollWithTimeout(this, session, queue, handler, waitIntervalMillis, endpointId);
+ // NOTE(review): this inner block re-acquires the session monitor that the enclosing
+ // synchronized (session) block already holds, so it adds no extra protection.
+ synchronized (session)
+ {
+ if (session.asyncPollMap == null)
+ session.asyncPollMap = new HashMap<String, AsyncPollWithTimeout>();
+ session.asyncPollMap.put(endpointId, asyncPoll);
+ }
+ queue.asyncPoll = asyncPoll;
+ asyncPollTask = asyncPoll;
+ }
+ }
+ }
+ }
+ if (queue == null)
+ {
+ // The queue was null; let the client know that there are no active subscriptions.
+ throwNotSubscribedException(endpointId);
+ }
+ return asyncPollTask;
+ }
+
+ /**
+ *
+ * Poll for outbound messages for the FlexClient and if no messages are available
+ * immediately, put processing into a wait state until messages arrive.
+ * This method is only invoked by internal code while processing a client poll request; it
+ * is not intended for general public use.
+ * Poll requests that trigger this method come from client-side polling channels and the request
+ * is not specific to a single Consumer/MessageClient instance so process any queued messages for
+ * the specified endpoint across all subscriptions.
+ *
+ * @param endpointId The Id of the endpoint that received the poll request.
+ * @param session The FlexSession associated with this waitable poll request.
+ * @param listener The listener to notify before a wait begins and as soon as one completes.
+ * @param waitIntervalMillis The maximum amount of time to wait for messages in milliseconds.
+ * @return The flush result including messages to return in the poll response and
+ * an optional wait time for the next poll/flush.
+ */
+ public FlushResult pollWithWait(String endpointId, FlexSession session, PollWaitListener listener, long waitIntervalMillis)
+ {
+     EndpointQueue queue;
+     synchronized (lock)
+     {
+         checkValid();
+
+         queue = outboundQueues.get(endpointId);
+
+         // If the queue exists and is not empty there's no reason to wait; flush immediately.
+         if (queue != null)
+         {
+             FlushResult flushResult = internalPoll(queue);
+             if (flushResult != null)
+                 return flushResult;
+         }
+     }
+
+     if (queue == null)
+     {
+         // The queue was null; let the client know that there are no active subscriptions.
+         throwNotSubscribedException(endpointId);
+         return null;
+     }
+
+     // The queue exists but it was empty; we can try to wait for messages.
+     synchronized (session)
+     {
+         // Set up the waitMonitor on the session; this is a reference to the queue that the
+         // current poll request targets and we use it as a wait/notify monitor.
+         // This also lets us prevent busy polling cycles from a single client. If we already have a waited
+         // poll request a subsequent poll request is treated as a no-op.
+         if (session.waitMonitor != null)
+         {
+             final EndpointQueue waitingQueue = session.waitMonitor.get(endpointId);
+             // If the poll is from the same client swf, and the same endpoint, treat it as a no-op poll.
+             if (waitingQueue != null && waitingQueue.flexClient.equals(this))
+             {
+                 PollFlushResult result = new PollFlushResult();
+                 result.setClientProcessingSuppressed(true);
+                 return result;
+             }
+         }
+         else
+         {
+             session.waitMonitor = new HashMap<String, EndpointQueue>();
+         }
+
+         // Set the waitMonitor for the session to the queue
+         // for this poll request before releasing the lock.
+         session.waitMonitor.put(endpointId, queue);
+     }
+
+     // Now that the session references the wait monitor this thread will use to wait we can enter
+     // the wait state.
+     // -1 wait-interval actually means wait until notified.
+     waitIntervalMillis = (waitIntervalMillis == -1) ? 0 : waitIntervalMillis;
+     String threadName = Thread.currentThread().getName();
+     try
+     {
+         boolean didWait = false;
+         boolean avoidBusyPolling = false;
+         try
+         {
+             synchronized (queue)
+             {
+                 // If the message queue is still empty, wait for a message to be added before invoking flush.
+                 if (queue.messages.isEmpty())
+                 {
+                     reportStatusIfDebug("waiting for new messages to arrive");
+
+                     didWait = true;
+
+                     // Tag thread name during the wait.
+                     Thread currentThread = Thread.currentThread();
+                     currentThread.setName(threadName + POLL_WAIT_THREAD_NAME_EXTENSION);
+                     try
+                     {
+                         if (listener != null)
+                             listener.waitStart(queue);
+
+                         queue.waitPoll = true; // Mark the queue as waiting.
+
+                         queue.wait(waitIntervalMillis);
+                     }
+                     finally
+                     {
+                         // Always undo the wait bookkeeping, even if the wait was interrupted or the
+                         // listener threw; otherwise the queue stays flagged as waiting and the
+                         // thread keeps its "-in-poll-wait" name after it leaves this method.
+                         queue.waitPoll = false; // Unmark the queue as waiting.
+
+                         // Reset thread name now that the wait is over.
+                         currentThread.setName(threadName);
+                     }
+
+                     // NOTE(review): waitEnd is still only invoked when the wait completes normally,
+                     // as before - confirm whether listeners also expect it after an interrupted wait.
+                     if (listener != null)
+                         listener.waitEnd(queue);
+
+                     if (queue.avoidBusyPolling)
+                     {
+                         avoidBusyPolling = true;
+                         queue.avoidBusyPolling = false;
+                     }
+                 }
+             }
+         }
+         finally
+         {
+             // Always clear this poll's entry from the session, on failure as well as success.
+             // A stale entry would make every later poll from this client on this session and
+             // endpoint look like a duplicate waited poll and be answered as a no-op.
+             synchronized (session)
+             {
+                 if (session.waitMonitor != null)
+                 {
+                     session.waitMonitor.remove(endpointId);
+                 }
+             }
+         }
+
+         if (Log.isDebug())
+         {
+             if (didWait)
+                 reportStatusIfDebug("done waiting for new messages to arrive and is flushing the outbound queue");
+             else
+                 reportStatusIfDebug("didn't need to wait and is flushing the outbound queue");
+         }
+
+         // We need to hold the FlexClient lock to invoke flush.
+         FlushResult result;
+         synchronized (lock)
+         {
+             result = internalFlush(queue);
+         }
+         if (avoidBusyPolling)
+         {
+             // Another poll displaced this one: wrap the flushed data in a result that tells
+             // the client to avoid busy polling.
+             PollFlushResult swappedPollResult = new PollFlushResult();
+             if (result != null)
+             {
+                 swappedPollResult.setMessages(result.getMessages());
+                 swappedPollResult.setNextFlushWaitTimeMillis(result.getNextFlushWaitTimeMillis());
+             }
+             swappedPollResult.setAvoidBusyPolling(true);
+             result = swappedPollResult;
+         }
+         return result;
+     }
+     catch (InterruptedException e)
+     {
+         // NOTE(review): the thread's interrupt status is not restored here, matching the
+         // previous behavior - confirm whether callers rely on that.
+         if (Log.isWarn())
+             Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).warn("Poll wait thread '" + threadName + "' for FlexClient with id '" + this.id +
+                     "' could not finish waiting for new messages to arrive " +
+                     "because it was interrupted: " + e.toString());
+     }
+     return null;
+ }
+
+ private void reportStatusIfDebug(String message)
+ {
+     // Skip all work unless debug logging is enabled.
+     if (!Log.isDebug())
+         return;
+
+     String pollThreadName = Thread.currentThread().getName();
+     Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).debug("Poll wait thread '" + pollThreadName + "' for FlexClient with id '" + this.id + "' is " + message);
+ }
+
+ /**
+ *
+ * Poll for outbound messages for a specific MessageClient/Consumer.
+ * This overload of poll() is only invoked when handling a Consumer.receive() request.
+ *
+ * @param client The specific MessageClient instance to poll for messages for.
+ * @return The flush result including messages to return in the poll response.
+ * The nextFlushWaitTimeMillis value is always forced to a value of 0 because
+ * Consumer.receive() calls are driven by client code and this setting has no meaning.
+ */
+ public FlushResult poll(MessageClient client)
+ {
+     final String endpointId = client.getEndpointId();
+
+     synchronized (lock)
+     {
+         checkValid();
+
+         EndpointQueue endpointQueue = outboundQueues.get(endpointId);
+         if (endpointQueue != null)
+         {
+             FlushResult flushed;
+             try
+             {
+                 // Flush only the messages queued for this specific MessageClient.
+                 flushed = internalFlush(endpointQueue, client);
+             }
+             catch (RuntimeException e)
+             {
+                 // Log the failure with context, then let it propagate unchanged.
+                 if (Log.isError())
+                     Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).error("Failed to flush an outbound queue for MessageClient '" + client.getClientId() + "' for FlexClient '" + getId() + "'.", e);
+                 throw e;
+             }
+
+             // The next-flush wait time has no meaning for Consumer.receive(); force it to 0.
+             if (flushed != null)
+                 flushed.setNextFlushWaitTimeMillis(0);
+
+             return flushed;
+         }
+     }
+
+     // No queue exists for the endpoint, so the client is not subscribed.
+     throwNotSubscribedException(endpointId);
+     return null;
+ }
+
+ /**
+ *
+ * Push a message to the FlexClient.
+ * The message is handed to the outbound queue processor of the endpoint queue that
+ * matches the MessageClient's endpoint. Afterwards, if the queue is non-empty, the
+ * message is delivered through the first applicable route: waking a waiting poll
+ * thread, completing a parked async poll, or a direct flush over a push-enabled
+ * session or endpoint push handler. Otherwise it stays queued.
+ * <p>
+ * The call returns silently, without queuing, if this FlexClient is no longer valid
+ * or if no outbound queue exists for the endpoint. A RuntimeException thrown by the
+ * queue processor while adding the message is logged and rethrown.
+ *
+ * @param message The Message to push.
+ * @param messageClient The MessageClient subscription that this message targets.
+ */
+ public void push(Message message, MessageClient messageClient)
+ {
+ // Drop the message if this FlexClient has already been invalidated.
+ // Note that this check and the queue lookup below run before the lock is taken.
+ if (!isValid())
+ return;
+
+ // Route this message to the proper per-endpoint outbound queue.
+ EndpointQueue queue = outboundQueues.get(messageClient.getEndpointId());
+
+ // This queue may be null if all corresponding subscriptions have been invalidated.
+ if (queue == null)
+ return;
+
+ // Set inside the queue monitor below; true if the queue holds no messages after the add.
+ boolean empty;
+
+ // We need to obtain the lock here
+ // Maintain the pattern of using the FlexClient.lock and ensure that order of locks should always start with the FlexClient.lock
+ // This is critical to prevent deadlock cases, see Watson bug 2724938
+ synchronized (lock)
+ {
+ synchronized (queue) // To protect the list during the add and allow for notification.
+ {
+ // Let the processor add the message to the queue.
+ try
+ {
+ queue.processor.add(queue.messages, message);
+ empty = queue.messages.isEmpty();
+
+ if (Log.isDebug())
+ Log.getLogger(LogCategories.MESSAGE_GENERAL).debug(
+ "Queuing message: " + message.getMessageId() +
+ StringUtils.NEWLINE +
+ " to send to MessageClient: " + messageClient.getClientId() +
+ StringUtils.NEWLINE +
+ " for FlexClient: " + messageClient.getFlexClient().getId() +
+ StringUtils.NEWLINE +
+ " via endpoint: " + queue.endpointId +
+ StringUtils.NEWLINE +
+ " client outbound queue size: " + queue.messages.size());
+ }
+ catch (RuntimeException e)
+ {
+ // Log the failure and propagate it to the caller.
+ if (Log.isError())
+ Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).error("Failed to add a message to an outbound queue for FlexClient '" + getId() + "'.", e);
+ throw e;
+ }
+ // And notify any threads that may be in a poll wait state.
+ if (!empty && queue.waitPoll)
+ {
+ // TODO This updateLastUse call is added here because there used to be a call
+ // at the beginning of the push method but not convinced that it is needed.
+ updateLastUse();
+ queue.notifyAll();
+ }
+ }
+
+ // The queue monitor has been released here; the FlexClient lock is still held.
+ if (!empty)
+ {
+ if (queue.asyncPoll != null)
+ {
+ // An async poll is parked on this queue: flush and complete it right away.
+ completeAsyncPoll(queue.asyncPoll, internalFlush(queue));
+ }
+ // NOTE(review): the leading '!empty' test below is redundant inside this 'if (!empty)' block.
+ else if (!empty && queue.flushTask == null &&
+ (queue.pushSession != null || (endpointPushHandlers != null && endpointPushHandlers.containsKey(queue.endpointId))))
+ {
+ // If a delayed flush is not scheduled and we have a push-enabled session associated with the queue
+ // or a push-enabled endpoint, try a direct push to the client.
+ // Once again we should acquire the lock for queue, otherwise a potential dead lock could happen, see Watson bug 2724936
+ // By acquiring the queue lock again, we break the cycle by acquiring the queue before holding FlexClient.lock object
+ synchronized (queue)
+ {
+ directFlush(queue);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * Associates an <tt>EndpointPushHandler</tt> with an endpoint so that it can push
+  * messages to remote clients. Only one handler may be registered per endpoint.
+  *
+  * @param handler The <tt>EndpointPushHandler</tt> to register.
+  * @param endpointId The endpoint to register for.
+  */
+ public void registerEndpointPushHandler(EndpointPushHandler handler, String endpointId)
+ {
+     synchronized (lock)
+     {
+         if (endpointPushHandlers == null)
+         {
+             // Lazily create the registry; a brand new map cannot hold a duplicate.
+             endpointPushHandlers = new HashMap<String, EndpointPushHandler>(1);
+         }
+         else if (endpointPushHandlers.containsKey(endpointId))
+         {
+             MessageException duplicate = new MessageException();
+             duplicate.setMessage(ENDPOINT_PUSH_HANDLER_ALREADY_REGISTERED, new Object[] {getId(), endpointId});
+             throw duplicate;
+         }
+
+         endpointPushHandlers.put(endpointId, handler);
+     }
+ }
+
+ /**
+ *
+ * Used internally to associate a FlexSession with this FlexClient.
+ *
+ * @param session The FlexSession to associate with this FlexClient.
+ */
+ public void registerFlexSession(FlexSession session)
+ {
+ if (sessions.addIfAbsent(session))
+ {
+ session.addSessionDestroyedListener(this);
+ session.registerFlexClient(this);
+ }
+ }
+
+ /**
+  * Used internally to link a MessageClient (subscription) to this FlexClient.
+  * Registering a MessageClient that is already associated is a no-op.
+  *
+  * @param messageClient The MessageClient to associate with this FlexClient.
+  */
+ public void registerMessageClient(MessageClient messageClient)
+ {
+     // Lazily create the subscription list under the FlexClient lock.
+     synchronized (lock)
+     {
+         if (messageClients == null)
+             messageClients = new CopyOnWriteArrayList<MessageClient>();
+     }
+
+     if (!messageClients.addIfAbsent(messageClient))
+         return;
+
+     messageClient.addMessageClientDestroyedListener(this);
+     String endpointId = messageClient.getEndpointId();
+
+     // Set up the outbound queue this subscription will use, and tie the MessageClient
+     // to the endpoint's push handler (if any) so that shutting down a push connection
+     // can invalidate the subscriptions that depend on it.
+     synchronized (lock)
+     {
+         getOrCreateEndpointQueueAndRegisterSubscription(messageClient, endpointId);
+
+         EndpointPushHandler pushHandler =
+             (endpointPushHandlers != null) ? endpointPushHandlers.get(endpointId) : null;
+         if (pushHandler != null)
+             pushHandler.registerMessageClient(messageClient);
+     }
+ }
+
+ /**
+  * Unbinds the FlexClient attribute stored under the given name and, if a value
+  * was bound, dispatches the unbound and removed notifications.
+  *
+  * @param name The name of the attribute to remove.
+  */
+ public void removeAttribute(String name)
+ {
+     // Holds the removed value so events can be dispatched outside the lock.
+     Object removed = null;
+
+     synchronized (lock)
+     {
+         checkValid();
+
+         updateLastUse();
+
+         if (attributes != null)
+             removed = attributes.remove(name);
+     }
+
+     // Nothing was bound under this name, so there is nothing to announce.
+     if (removed != null)
+     {
+         notifyAttributeUnbound(name, removed);
+         notifyAttributeRemoved(name, removed);
+     }
+ }
+
+ /**
+  * Detaches a FlexClient attribute listener. A null listener is ignored.
+  *
+  * @param listener The listener to remove.
+  */
+ public void removeClientAttributeListener(FlexClientAttributeListener listener)
+ {
+     // Validity is deliberately not checked; removing a listener is always allowed.
+     if (listener == null || attributeListeners == null)
+         return;
+
+     attributeListeners.remove(listener);
+ }
+
+ /**
+  * Detaches a FlexClient destroyed listener. A null listener is ignored.
+  *
+  * @see flex.messaging.client.FlexClientListener
+  *
+  * @param listener The listener to remove.
+  */
+ public void removeClientDestroyedListener(FlexClientListener listener)
+ {
+     // Validity is deliberately not checked; removing a listener is always allowed.
+     if (listener == null || destroyedListeners == null)
+         return;
+
+     destroyedListeners.remove(listener);
+ }
+
+ /**
+  * Removes the attribute bound under the given name from every FlexSession
+  * associated with this FlexClient.
+  *
+  * @param name The name of the attribute to remove.
+  */
+ public void removeSessionAttribute(String name)
+ {
+     Iterator<FlexSession> associated = sessions.iterator();
+     while (associated.hasNext())
+         associated.next().removeAttribute(name);
+ }
+
+ /**
+ *
+ * Implements FlexSessionListener interface.
+ * Notification that a FlexSession was created.
+ * This is a no-op because the FlexClient is never added as a static FlexSession created listener
+ * but this method is required by the interface. We only listen for the destroyed event from
+ * associated FlexSessions.
+ *
+ * @param session The FlexSession that was created.
+ */
+ public void sessionCreated(FlexSession session) {}
+
+ /**
+  * FlexSessionListener callback for session destruction.
+  * Disassociates the destroyed FlexSession from this FlexClient.
+  *
+  * @param session The FlexSession that was destroyed.
+  */
+ public void sessionDestroyed(FlexSession session)
+ {
+     // All the bookkeeping lives in the unregister routine.
+     this.unregisterFlexSession(session);
+ }
+
+ /**
+  * Stores an attribute value on the FlexClient under the given name and dispatches
+  * the matching binding/attribute notifications. Passing a null value behaves
+  * exactly like {@code removeAttribute(name)}.
+  *
+  * @param name The name to bind the attribute under.
+  * @param value The value of the attribute.
+  */
+ public void setAttribute(String name, Object value)
+ {
+     if (value == null)
+     {
+         // Setting null is defined as removal.
+         removeAttribute(name);
+         return;
+     }
+
+     // The prior binding decides which events are fired once the lock is released.
+     final Object previous;
+
+     // Hold the lock for the mutation only; listeners are called without it.
+     synchronized (lock)
+     {
+         checkValid();
+
+         updateLastUse();
+
+         if (attributes == null)
+             attributes = new HashMap<String, Object>();
+
+         previous = attributes.put(name, value);
+     }
+
+     if (previous != null)
+     {
+         // An existing binding was replaced.
+         notifyAttributeUnbound(name, previous);
+         notifyAttributeReplaced(name, previous);
+         notifyAttributeBound(name, value);
+     }
+     else
+     {
+         // First binding under this name.
+         notifyAttributeBound(name, value);
+         notifyAttributeAdded(name, value);
+     }
+ }
+
+ /**
+  * Binds an attribute on the current FlexSession associated with the FlexClient.
+  * When the current FlexSession is NIO-based (NIOHTTPFlexSession or RTMPFlexSession)
+  * and the FlexClient also has a Servlet-based session (HttpFlexSession), the
+  * attribute is bound there as well so the underlying J2EE HttpSession can see it.
+  *
+  * @param name The name to bind the attribute under.
+  * @param value The value of the attribute.
+  */
+ public void setSessionAttribute(String name, Object value)
+ {
+     setSessionAttributeInCurrentSession(name, value);
+
+     // Already Servlet-based: nothing further to mirror.
+     if (isCurrentSessionServletBased())
+         return;
+
+     setSessionAttributeInServletBasedSession(name, value);
+ }
+
+ /**
+  * TimeoutCapable callback, invoked when this FlexClient has timed out.
+  * A timed-out FlexClient is invalidated.
+  */
+ public void timeout()
+ {
+     this.invalidate();
+ }
+
+ /**
+  * Unregisters an <tt>EndpointPushHandler</tt> from the specified endpoint.
+  * The registration is removed only if the handler currently registered for the
+  * endpoint equals the one passed in. The call is a no-op when no handlers have been
+  * registered at all or when nothing is registered for this endpoint.
+  *
+  * @param handler The <tt>EndpointPushHandler</tt> to unregister.
+  * @param endpointId The endpoint to unregister from.
+  */
+ public void unregisterEndpointPushHandler(EndpointPushHandler handler, String endpointId)
+ {
+     synchronized (lock)
+     {
+         if (endpointPushHandlers == null)
+             return; // No-op.
+
+         // The map may exist without an entry for this endpoint; guard against
+         // dereferencing a missing registration.
+         EndpointPushHandler registered = endpointPushHandlers.get(endpointId);
+         if (registered != null && registered.equals(handler))
+             endpointPushHandlers.remove(endpointId);
+     }
+ }
+
+ /**
+ *
+ * Used internally to disassociate a FlexSession from this FlexClient.
+ *
+ * @param session The FlexSession to disassociate from this FlexClient.
+ */
+ public void unregisterFlexSession(FlexSession session)
+ {
+ if (sessions.remove(session))
+ {
+ session.removeSessionDestroyedListener(this);
+ session.unregisterFlexClient(this);
+ // Once all client sessions/connections terminate; shut down.
+ if (sessions.isEmpty())
+ invalidate();
+ }
+ }
+
+ /**
+  * Used internally to disassociate a MessageClient (subscription) from a FlexClient.
+  * <p>
+  * Besides dropping the listener registration, this releases the subscription's
+  * reference on its endpoint queue, unregisters it from the queue's outbound throttle
+  * manager (if one exists), discards its queued messages unless the MessageClient is
+  * attempting to notify the remote client of its invalidation, closes out a pending
+  * async poll when the queue is no longer needed, wakes any threads waiting on the
+  * queue, and unregisters the MessageClient from the endpoint push handler (if any).
+  * Passing a MessageClient that is not associated is a no-op.
+  *
+  * @param messageClient The MessageClient to disassociate from the FlexClient.
+  */
+ public void unregisterMessageClient(MessageClient messageClient)
+ {
+     if (messageClients == null || !messageClients.remove(messageClient))
+         return;
+
+     messageClient.removeMessageClientDestroyedListener(this);
+     String endpointId = messageClient.getEndpointId();
+     // Manage the outbound queue that this subscription uses.
+     synchronized (lock)
+     {
+         EndpointQueue queue = outboundQueues.get(endpointId);
+         if (queue != null)
+         {
+             // Decrement the ref count of MessageClients using this queue.
+             queue.messageClientRefCount--;
+
+             // Unregister the message client from the outbound throttle
+             // manager (if one exists).
+             OutboundQueueThrottleManager tm = queue.processor.getOutboundQueueThrottleManager();
+             if (tm != null)
+                 tm.unregisterAllSubscriptions(messageClient.getDestinationId());
+
+             // If we're not attempting to notify the remote client that this MessageClient has
+             // been invalidated, remove any associated messages from the queue.
+             if (!messageClient.isAttemptingInvalidationClientNotification())
+             {
+                 Object messageClientId = messageClient.getClientId();
+                 for (Iterator<Message> iter = queue.messages.iterator(); iter.hasNext(); )
+                 {
+                     // Messages without a client id cannot belong to this subscription;
+                     // skip them instead of failing on a null dereference.
+                     Object queuedClientId = iter.next().getClientId();
+                     if (queuedClientId != null && queuedClientId.equals(messageClientId))
+                         iter.remove();
+                 }
+             }
+
+             // If no active subscriptions require the queue, clean it up if possible.
+             // Otherwise the queue is being used by a polling client or contains messages
+             // that will be written by a delayed flush; it is left in place and cleaned up
+             // by the next poll request or delayed flush. See internalFlush() and shutdownQueue().
+             if (queue.messageClientRefCount == 0
+                 && (queue.messages.isEmpty() || messageClient.isClientChannelDisconnected()))
+             {
+                 if (queue.asyncPoll != null) // Close out async long-poll if one is registered.
+                 {
+                     FlushResult flushResult = internalFlush(queue);
+                     // If the MessageClient isn't attempting client notification, override
+                     // and do so in this case to suppress the next poll request from the remote client
+                     // which will fail triggering an unnecessary channel disconnect on the client.
+                     if (!messageClient.isAttemptingInvalidationClientNotification())
+                     {
+                         CommandMessage msg = new CommandMessage();
+                         msg.setClientId(messageClient.getClientId());
+                         msg.setOperation(CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION);
+
+                         // The flush may yield no result object or no message list; make
+                         // sure the invalidate command actually travels with the result
+                         // rather than being added to a list nobody references.
+                         if (flushResult == null)
+                             flushResult = new FlushResult();
+                         List<Message> messages = flushResult.getMessages();
+                         if (messages == null)
+                             messages = new ArrayList<Message>(1);
+                         messages.add(msg);
+                         flushResult.setMessages(messages);
+                     }
+                     completeAsyncPoll(queue.asyncPoll, flushResult);
+                 }
+
+                 // Remove the empty, unused queue.
+                 outboundQueues.remove(endpointId);
+             }
+
+             // Make sure to notify any threads waiting on this queue that may be associated
+             // with the subscription that's gone away.
+             synchronized (queue)
+             {
+                 queue.notifyAll();
+             }
+         }
+
+         // And if this subscription was associated with an endpoint push handler, unregister it.
+         if (endpointPushHandlers != null)
+         {
+             EndpointPushHandler handler = endpointPushHandlers.get(endpointId);
+             if (handler != null)
+                 handler.unregisterMessageClient(messageClient);
+         }
+     }
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Protected Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+  * Guard used by operations that require a live FlexClient: throws a
+  * MessageException if this instance has been invalidated, otherwise returns normally.
+  */
+ protected void checkValid()
+ {
+     synchronized (lock)
+     {
+         if (valid)
+             return;
+
+         MessageException invalidated = new MessageException();
+         invalidated.setMessage(FLEX_CLIENT_INVALIDATED);
+         throw invalidated;
+     }
+ }
+
+ /**
+ * Invoked to clean up a timed out or closed async poll.
+ * Under the FlexClient lock this cancels the poll's timeout, clears the poll from its
+ * endpoint queue (if it is still the registered one), removes the poll's entry from the
+ * session's async poll map, and finally hands the result to the poll's handler.
+ *
+ * @param asyncPoll The async poll to complete.
+ * @param result The FlushResult for the poll response; null when the poll timed out
+ * with nothing to return.
+ */
+ protected void completeAsyncPoll(AsyncPollWithTimeout asyncPoll, FlushResult result)
+ {
+ synchronized (lock)
+ {
+ // Stop the timeout from firing now that the poll is being completed.
+ asyncPoll.cancelTimeout();
+ EndpointQueue queue = asyncPoll.getEndpointQueue();
+ // Only clear the queue's reference if it still points at this poll.
+ // NOTE(review): queue.asyncPoll is dereferenced without a null check; confirm it
+ // cannot already be null when a timeout races with a normal completion.
+ if (queue.asyncPoll.equals(asyncPoll))
+ queue.asyncPoll = null;
+ FlexSession session = asyncPoll.getFlexSession();
+ // The session's poll map is guarded by the session monitor.
+ synchronized (session)
+ {
+ if (session.asyncPollMap != null)
+ session.asyncPollMap.remove(asyncPoll.getEndpointId());
+ }
+ // Deliver the response while still holding the FlexClient lock.
+ asyncPoll.getHandler().asyncPollComplete(result);
+ }
+ }
+
+ /**
+  * Invoked to flush queued outbound messages to a client directly using a session
+  * that supports real-time push, or an endpoint push handler.
+  * Called by push() or delayed flush tasks for push-enabled sessions/connections.
+  * <p>
+  * Does nothing when the FlexClient has been invalidated or when the flush produces
+  * no result. If the flush result asks for a follow-up flush, a delayed flush task
+  * is scheduled on the queue.
+  *
+  * @param queue The endpoint queue to flush.
+  */
+ protected void directFlush(EndpointQueue queue)
+ {
+     synchronized (lock)
+     {
+         // No need to invoke flush if the FlexClient has been invalidated.
+         if (!valid)
+             return;
+
+         // If this invocation is a callback from a flush task, null out the task ref on
+         // the queue to allow a subsequent delayed flush to be scheduled.
+         if (queue.flushTask != null)
+             queue.flushTask = null;
+
+         FlushResult flushResult = internalFlush(queue, null, false /* updateLastUse */);
+         if (flushResult == null) // If there's no flush result, return.
+             return;
+
+         // Pass any messages that are ready to flush off to the network layer.
+         List<Message> messages = flushResult.getMessages();
+         if (messages != null && !messages.isEmpty())
+         {
+             if (queue.pushSession != null)
+             {
+                 if (queue.pushSession instanceof ConnectionAwareSession)
+                 {
+                     // Update last use only if we're actually writing back to the client.
+                     if (((ConnectionAwareSession)queue.pushSession).isConnected())
+                         updateLastUse();
+                 }
+                 for (Message msg : messages)
+                     queue.pushSession.push(msg);
+             }
+             else if (endpointPushHandlers != null)
+             {
+                 // The handler map can exist without an entry for this endpoint (for
+                 // example when the handler was unregistered before a delayed flush
+                 // fired), so the lookup result must be checked before use.
+                 EndpointPushHandler handler = endpointPushHandlers.get(queue.endpointId);
+                 if (handler != null)
+                 {
+                     updateLastUse();
+                     handler.pushMessages(messages);
+                 }
+                 else if (Log.isWarn())
+                 {
+                     Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).warn("No endpoint push handler is registered for endpoint '" + queue.endpointId +
+                         "' for FlexClient with id '" + getId() + "'; flushed messages could not be pushed.");
+                 }
+             }
+         }
+
+         // Schedule a delayed flush if necessary.
+         int flushWaitTime = flushResult.getNextFlushWaitTimeMillis();
+         if (flushWaitTime > 0) // Set up and schedule the delayed flush task.
+             queue.flushTask = new FlexClientScheduledFlushForPush(queue, flushWaitTime);
+     }
+ }
+
+ /**
+ * Utility method to initialize an EndpointQueue (if necessary) and associate a subscription (MessageClient) with it.
+ */
+ protected EndpointQueue getOrCreateEndpointQueueAndRegisterSubscription(MessageClient messageClient, String endpointId)
+ {
+ EndpointQueue newQueue;
+ if (!outboundQueues.containsKey(endpointId))
+ {
+ newQueue = new EndpointQueue();
+ newQueue.flexClient = this;
+ newQueue.endpointId = endpointId;
+ newQueue.endpoint = flexClientManager.getMessageBroker().getEndpoint(endpointId);
+ newQueue.messages = new ArrayList<Message>(); /* Default size of 10 is fine */
+ FlexSession session = messageClient.getFlexSession();
+ if (session.isPushSupported())
+ newQueue.pushSession = session;
+ newQueue.processor = flexClientManager.createOutboundQueueProcessor(this, endpointId);
+ newQueue.messageClientRefCount = 1;
+
+ outboundQueues.put(endpointId, newQueue);
+ }
+ else
+ {
+ newQueue = outboundQueues.get(endpointId);
+ newQueue.messageClientRefCount++;
+ // Resubscribes as a result of network connectivity issues may arrive over the same
+ // endpoint but use a new session.
+ FlexSession session = messageClient.getFlexSession();
+ if (session.isPushSupported())
+ newQueue.pushSession = session;
+ }
+ return newQueue;
+ }
+
+ /**
+ * Utility method to flush the outbound queue and log any problems.
+ * Any exceptions are logged and then rethrown.
+ *
+ * @param queue The outbound queue to flush.
+ */
+ protected FlushResult internalFlush(EndpointQueue queue)
+ {
+ return internalFlush(queue, null);
+ }
+
+ /**
+  * Utility method to flush the outbound queue and log any problems.
+  * If a specific client is passed, a client-specific flush is invoked.
+  * If the passed client is null, a general flush of the queue is done.
+  * The FlexClient's last-use timestamp is updated when messages are flushed.
+  * Any exceptions are logged and then rethrown.
+  *
+  * @param queue The outbound queue to flush.
+  * @param client The client to flush for, or null for a general flush.
+  * @return The result of the flush.
+  */
+ protected FlushResult internalFlush(EndpointQueue queue, MessageClient client)
+ {
+     // Forward the caller's client; passing null here would silently turn every
+     // client-specific flush into a general flush of the whole queue.
+     return internalFlush(queue, client, true);
+ }
+
+ /**
+ * Utility method to flush the outbound queue and log any problems.
+ * If a specific client is passed, we need to invoke a client-specific flush.
+ * If the passed client is null, we do a general flush of the queue.
+ * Any exceptions are logged and then rethrown.
+ *
+ * @param queue The outbound queue to flush.
+ * @param client The client to flush for.
+ * @param updateLastUse Whether the last-use timestamp of the FlexClient should
+ * be updated.
+ */
+ protected FlushResult internalFlush(EndpointQueue queue, MessageClient client,
+ boolean updateLastUse)
+ {
+ FlushResult flushResult;
+ try
+ {
+ synchronized (queue)
+ {
+ flushResult = queue.processor.flush(client, queue.messages);
+ shutdownQueue(queue);
+ }
+
+ if (updateLastUse)
+ updateLastUseIfNecessary(flushResult);
+ }
+ catch (RuntimeException e)
+ {
+ if (Log.isError())
+ Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).error("Failed to flush an outbound queue for FlexClient '" + getId() + "'.", e);
+ throw e;
+ }
+ return flushResult;
+ }
+
+ /**
+  * Utility method to flush messages in response to a poll request with
+  * regular and wait poll.
+  * <p>
+  * Messages flushed between polls are returned first, followed by whatever the
+  * regular flush yields. If the regular flush asks for a follow-up, a delayed
+  * flush task is scheduled on the queue.
+  *
+  * @param queue The endpoint queue to flush messages for.
+  * @return The flush result with messages, or null if there are no messages.
+  */
+ protected FlushResult internalPoll(EndpointQueue queue)
+ {
+     List<Message> allMessages = new ArrayList<Message>();
+
+     // First, add the previously flushed messages.
+     if (queue.flushedMessagesBetweenPolls != null && queue.flushedMessagesBetweenPolls.size() > 0)
+     {
+         allMessages.addAll(queue.flushedMessagesBetweenPolls);
+         queue.flushedMessagesBetweenPolls.clear();
+     }
+
+     // Then, check for regularly queued messages. We call internalFlush
+     // even if the queue is empty so the queue processor could know
+     // about the incoming poll request regardless.
+     FlushResult internalFlushResult = internalFlush(queue);
+
+     // Other callers in this class treat a null flush result as possible, so tolerate
+     // it here as well instead of failing the poll.
+     if (internalFlushResult != null)
+     {
+         List<Message> flushedMessages = internalFlushResult.getMessages();
+         if (flushedMessages != null && !flushedMessages.isEmpty())
+             allMessages.addAll(flushedMessages);
+
+         // Schedule a delayed flush, if necessary.
+         int flushWaitTime = internalFlushResult.getNextFlushWaitTimeMillis();
+         if (flushWaitTime > 0)
+             queue.flushTask = new FlexClientScheduledFlushForPoll(queue, flushWaitTime);
+     }
+
+     if (allMessages.isEmpty())
+         return null;
+
+     // Flush, if there are messages.
+     FlushResult flushResult = new FlushResult();
+     flushResult.setMessages(allMessages);
+     return flushResult;
+ }
+
+ /**
+  * Tells every registered attribute listener that an attribute has been added.
+  *
+  * @param name The name of the attribute.
+  *
+  * @param value The new value of the attribute.
+  */
+ protected void notifyAttributeAdded(String name, Object value)
+ {
+     if (attributeListeners == null || attributeListeners.isEmpty())
+         return;
+
+     FlexClientBindingEvent addedEvent = new FlexClientBindingEvent(this, name, value);
+     // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions.
+     for (FlexClientAttributeListener listener : attributeListeners)
+         listener.attributeAdded(addedEvent);
+ }
+
+ /**
+  * Tells an attribute value that it has been bound to the FlexClient, provided the
+  * value is a FlexClientBindingListener. Other values (including null) are ignored.
+  *
+  * @param name The attribute name.
+  *
+  * @param value The attribute that has been bound.
+  */
+ protected void notifyAttributeBound(String name, Object value)
+ {
+     // instanceof is false for null, so no separate null test is needed.
+     if (!(value instanceof FlexClientBindingListener))
+         return;
+
+     FlexClientBindingEvent boundEvent = new FlexClientBindingEvent(this, name);
+     FlexClientBindingListener bindingListener = (FlexClientBindingListener)value;
+     bindingListener.valueBound(boundEvent);
+ }
+
+ /**
+  * Tells every registered attribute listener that an attribute has been removed.
+  *
+  * @param name The name of the attribute.
+  *
+  * @param value The previous value of the attribute.
+  */
+ protected void notifyAttributeRemoved(String name, Object value)
+ {
+     if (attributeListeners == null || attributeListeners.isEmpty())
+         return;
+
+     FlexClientBindingEvent removedEvent = new FlexClientBindingEvent(this, name, value);
+     // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions.
+     for (FlexClientAttributeListener listener : attributeListeners)
+         listener.attributeRemoved(removedEvent);
+ }
+
+ /**
+  * Tells every registered attribute listener that an attribute has been replaced.
+  *
+  * @param name The name of the attribute.
+  *
+  * @param value The previous value of the attribute.
+  */
+ protected void notifyAttributeReplaced(String name, Object value)
+ {
+     if (attributeListeners == null || attributeListeners.isEmpty())
+         return;
+
+     FlexClientBindingEvent replacedEvent = new FlexClientBindingEvent(this, name, value);
+     // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions.
+     for (FlexClientAttributeListener listener : attributeListeners)
+         listener.attributeReplaced(replacedEvent);
+ }
+
+ /**
+  * Tells an attribute value that it has been unbound from the FlexClient, provided
+  * the value is a FlexClientBindingListener. Other values (including null) are ignored.
+  *
+  * @param name The attribute name.
+  *
+  * @param value The attribute that has been unbound.
+  */
+ protected void notifyAttributeUnbound(String name, Object value)
+ {
+     // instanceof is false for null, so no separate null test is needed.
+     if (!(value instanceof FlexClientBindingListener))
+         return;
+
+     FlexClientBindingEvent unboundEvent = new FlexClientBindingEvent(this, name);
+     FlexClientBindingListener bindingListener = (FlexClientBindingListener)value;
+     bindingListener.valueUnbound(unboundEvent);
+ }
+
+ /**
+  * Called by FlexClientManager once this new FlexClient has been constructed and
+  * fully configured; announces the creation to every registered created listener.
+  */
+ protected void notifyCreated()
+ {
+     if (createdListeners.isEmpty())
+         return;
+
+     // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions.
+     for (FlexClientListener listener : createdListeners)
+         listener.clientCreated(this);
+ }
+
+ /**
+  * Removes an endpoint queue accessed via polling channels once it has no more
+  * active subscriptions and no more pending outbound messages, and wakes any
+  * threads waiting on it.
+  *
+  * @param queue The queue to potentially shutdown.
+  * @return true if the queue was cleaned up/removed; otherwise false.
+  */
+ protected boolean shutdownQueue(EndpointQueue queue)
+ {
+     // A queue that still has subscribers or pending messages must stay in place.
+     boolean stillInUse = queue.messageClientRefCount != 0 || !queue.messages.isEmpty();
+     if (stillInUse)
+         return false;
+
+     outboundQueues.remove(queue.endpointId);
+
+     // Notify any threads waiting on this queue.
+     synchronized (queue)
+     {
+         queue.notifyAll();
+     }
+     return true;
+ }
+
+ /**
+  * Always throws a FlexClientNotSubscribedException for a client that issued a poll
+  * request to an endpoint it has not subscribed over.
+  *
+  * This method should not be called when you hold an internal thread lock. It iterates
+  * over all the FlexClients in the current session and will not work if two or more
+  * FlexClients in the same session call it simultaneously.
+  *
+  * @param endpointId The endpoint Id.
+  */
+ protected void throwNotSubscribedException(String endpointId)
+ {
+     // Message 10028 is the general not-subscribed error.
+     int errorNumber = 10028;
+
+     // Pre-3.1 versions of the client library did not handle URL session tokens properly
+     // and may incorrectly issue a poll, after subscribing, that does not contain the proper
+     // FlexClient id. That looks like a poll from an unsubscribed client but deserves a more
+     // useful error message (10036). It is detected by finding another (orphaned) FlexClient
+     // in the current session that holds a subscription through the target endpoint.
+     orphanSearch:
+     for (FlexClient candidate : FlexContext.getFlexSession().getFlexClients())
+     {
+         if (candidate.equals(this))
+             continue;
+
+         for (MessageClient subscription : candidate.getMessageClients())
+         {
+             if (subscription.getEndpointId().equals(endpointId))
+             {
+                 // Switch to the not-subscribed message with extra guidance.
+                 errorNumber = 10036;
+                 break orphanSearch;
+             }
+         }
+     }
+
+     FlexClientNotSubscribedException notSubscribed = new FlexClientNotSubscribedException();
+     notSubscribed.setMessage(errorNumber, new Object[]{endpointId});
+     notSubscribed.setCode(MessageService.NOT_SUBSCRIBED_CODE);
+     throw notSubscribed;
+ }
+
+ /**
+  * Refreshes the last-use timestamp, but only when the flush result actually
+  * carries messages. A null result or a null/empty message list leaves it untouched.
+  *
+  * @param flushResult The flush result.
+  */
+ protected void updateLastUseIfNecessary(FlushResult flushResult)
+ {
+     if (flushResult == null)
+         return;
+
+     List<Message> flushed = flushResult.getMessages();
+     if (flushed == null || flushed.isEmpty())
+         return;
+
+     updateLastUse();
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Private Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+  * Copies the attribute names of a FlexSession into a new set.
+  *
+  * @param session The session whose attribute names are collected.
+  * @return A new set holding the session's attribute names.
+  */
+ private Set<String> getSessionAttributeNames(FlexSession session)
+ {
+     Set<String> collected = new HashSet<String>();
+     for (Enumeration<String> names = session.getAttributeNames(); names.hasMoreElements(); )
+         collected.add(names.nextElement());
+     return collected;
+ }
+
+ /**
+  * Reads an attribute from the FlexSession supplied by FlexContext.
+  *
+  * @param name The attribute name.
+  * @return The value returned by that session for the name.
+  */
+ private Object getSessionAttributeInCurrentSession(String name)
+ {
+     FlexSession current = FlexContext.getFlexSession();
+     return current.getAttribute(name);
+ }
+
+ private Object getSessionAttributeInOtherSessions(String name)
+ {
+ FlexSession currentSession = FlexContext.getFlexSession();
+ for (FlexSession session : sessions)
+ {
+ if (session == currentSession)
+ continue;
+
+ Object attributeValue = session.getAttribute(name);
+ if (attributeValue != null)
+ return attributeValue;
+ }
+ return null;
+ }
+
+ /**
+  * Binds an attribute on the FlexSession supplied by FlexContext.
+  *
+  * @param name The attribute name.
+  * @param value The attribute value.
+  */
+ private void setSessionAttributeInCurrentSession(String name, Object value)
+ {
+     FlexSession current = FlexContext.getFlexSession();
+     current.setAttribute(name, value);
+ }
+
+ /**
+  * Binds an attribute on the first associated session that is Servlet-based.
+  * Does nothing if no associated session is Servlet-based.
+  *
+  * @param name The attribute name.
+  * @param value The attribute value.
+  */
+ private void setSessionAttributeInServletBasedSession(String name, Object value)
+ {
+     for (FlexSession candidate : sessions)
+     {
+         if (!isServletBasedSession(candidate))
+             continue;
+
+         // Only the first Servlet-based session receives the attribute.
+         candidate.setAttribute(name, value);
+         break;
+     }
+ }
+
+ /**
+  * Tests whether the FlexSession supplied by FlexContext is Servlet-based.
+  *
+  * @return true if that session is an HttpFlexSession.
+  */
+ private boolean isCurrentSessionServletBased()
+ {
+     FlexSession current = FlexContext.getFlexSession();
+     return isServletBasedSession(current);
+ }
+
+ private boolean isServletBasedSession(FlexSession session)
+ {
+ return session instanceof HttpFlexSession;
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Inner Classes
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+  * Helper class for handling async poll requests. This class allows the response for an async poll
+  * to be delayed until data arrives to return to the client or the specified wait interval elapses.
+  * Wait timeouts are monitored by the <tt>FlexClientManager</tt> which contains a <tt>TimeoutManager</tt>
+  * instance that is started and stopped during application bootstrap and shutdown. Managing timeouts
+  * locally or statically isn't a good option because they lack a useful shutdown hook that's necessary
+  * in order to close down the timeout manager cleanly.
+  */
+ public class AsyncPollWithTimeout extends TimeoutAbstractObject
+ {
+     private final FlexClient flexClient;
+     private final FlexSession session;
+     private final EndpointQueue queue;
+     private final AsyncPollHandler handler;
+     private final String endpointId;
+
+     /**
+      * Constructor. Registers the new poll with the FlexClientManager for timeout
+      * monitoring.
+      *
+      * @param flexClient flex client
+      * @param session flex session
+      * @param queue endpoint queue
+      * @param handler poll handler
+      * @param waitIntervalMillis wait interval
+      * @param endpointId endpoint
+      */
+     public AsyncPollWithTimeout(FlexClient flexClient, FlexSession session, EndpointQueue queue, AsyncPollHandler handler, long waitIntervalMillis, String endpointId)
+     {
+         // Assign every field before this instance is handed to the timeout monitor,
+         // so a timeout callback can never observe a partially constructed poll.
+         this.flexClient = flexClient;
+         this.session = session;
+         this.queue = queue;
+         this.handler = handler;
+         this.endpointId = endpointId;
+         setTimeoutPeriod(waitIntervalMillis);
+         flexClientManager.monitorAsyncPollTimeout(this);
+     }
+
+     /**
+      * Return client.
+      *
+      * @return flex client
+      */
+     public FlexClient getFlexClient()
+     {
+         return flexClient;
+     }
+
+     /**
+      * Return session.
+      *
+      * @return flex session
+      */
+     public FlexSession getFlexSession()
+     {
+         return session;
+     }
+
+     /**
+      * Return endpoint queue.
+      *
+      * @return the queue
+      */
+     public EndpointQueue getEndpointQueue()
+     {
+         return queue;
+     }
+
+     /**
+      * Return handler.
+      *
+      * @return the handler
+      */
+     public AsyncPollHandler getHandler()
+     {
+         return handler;
+     }
+
+     /**
+      * Return endpoint ID.
+      *
+      * @return the id
+      */
+     public String getEndpointId()
+     {
+         return endpointId;
+     }
+
+     /**
+      * Trigger a timeout: completes this poll with no result.
+      */
+     public void timeout()
+     {
+         completeAsyncPoll(this, null /* nothing to return */);
+     }
+ }
+
+ /**
+ * Helper class to flush a FlexClient's outbound queue after a specified delay.
+ * Delayed flushes are handled by the <tt>FlexClientManager</tt>
+ * using <tt>TimeoutManager</tt>.
+ */
+ abstract class FlexClientScheduledFlush extends TimeoutAbstractObject
+ {
+ final EndpointQueue queue;
+
+ public FlexClientScheduledFlush(EndpointQueue queue, long waitIntervalMillis)
+ {
+ this.queue = queue;
+ setTimeoutPeriod(waitIntervalMillis);
+ flexClientManager.monitorScheduledFlush(this);
+ }
+
+ abstract void performFlushTask();
+
+ public void timeout()
+ {
+ FlexContext.setThreadLocalFlexClient(FlexClient.this);
+ performFlushTask();
+ FlexContext.setThreadLocalFlexClient(null);
+ }
+ }
+
+ /**
+  * Scheduled flush used by push channels: once the delay elapses, the FlexClient's
+  * outbound queue is flushed directly.
+  */
+ class FlexClientScheduledFlushForPush extends FlexClientScheduledFlush
+ {
+     /**
+      * Creates the delayed direct-flush task.
+      *
+      * @param queue the endpoint queue to flush
+      * @param waitIntervalMillis delay, in milliseconds, before the flush runs
+      */
+     public FlexClientScheduledFlushForPush(EndpointQueue queue, long waitIntervalMillis)
+     {
+         super(queue, waitIntervalMillis);
+     }
+
+     /**
+      * Calls <tt>directFlush</tt> on the queue while holding first the <tt>lock</tt>
+      * monitor and then the queue's own monitor.
+      */
+     @Override
+     void performFlushTask()
+     {
+         final EndpointQueue target = queue;
+         // Acquisition order matters: lock first, then the queue.
+         synchronized (lock)
+         {
+             synchronized (target)
+             {
+                 directFlush(target);
+             }
+         }
+     }
+ }
+
+ /**
+  * Scheduled flush used by polling channels. After the delay, the FlexClient's outbound
+  * queue is flushed; the resulting messages either complete a parked async poll or are
+  * accumulated in flushedMessagesBetweenPolls, which is drained first on the next poll.
+  */
+ class FlexClientScheduledFlushForPoll extends FlexClientScheduledFlush
+ {
+     /**
+      * Creates the delayed flush task.
+      *
+      * @param queue the endpoint queue to flush
+      * @param waitIntervalMillis delay, in milliseconds, before the flush runs
+      */
+     public FlexClientScheduledFlushForPoll(EndpointQueue queue, long waitIntervalMillis)
+     {
+         super(queue, waitIntervalMillis);
+     }
+
+     /**
+      * Flushes the queue under the <tt>lock</tt> monitor, routes any flushed messages and
+      * schedules a follow-up flush when the flush result asks for one.
+      */
+     @Override
+     void performFlushTask()
+     {
+         synchronized (lock)
+         {
+             // An invalidated FlexClient has nothing left to flush.
+             if (!valid)
+             {
+                 return;
+             }
+
+             // Drop the pending-task reference so that a later delayed flush can be scheduled.
+             queue.flushTask = null;
+
+             final FlushResult result = internalFlush(queue);
+             if (result == null)
+             {
+                 return;
+             }
+
+             final List<Message> flushed = result.getMessages();
+             final boolean hasMessages = flushed != null && !flushed.isEmpty();
+             if (hasMessages && queue.asyncPoll != null)
+             {
+                 // A poll is parked on this queue: hand it the result.
+                 completeAsyncPoll(queue.asyncPoll, result);
+             }
+             else if (hasMessages)
+             {
+                 // No parked poll: keep the messages for the next poll to pick up.
+                 if (queue.flushedMessagesBetweenPolls == null)
+                 {
+                     queue.flushedMessagesBetweenPolls = new ArrayList<Message>();
+                 }
+                 queue.flushedMessagesBetweenPolls.addAll(flushed);
+             }
+
+             // Re-arm a delayed flush when the result specifies a positive wait time.
+             final int nextWaitMillis = result.getNextFlushWaitTimeMillis();
+             if (nextWaitMillis > 0)
+             {
+                 queue.flushTask = new FlexClientScheduledFlushForPoll(queue, nextWaitMillis);
+             }
+         }
+     }
+ }
+
+ /**
+ * Helper class that stores per-endpoint outbound queue state including:
+ * <ul>
+ * <li>flexClient - The <tt>FlexClient</tt> the queue is used by.</li>
+ * <li>endpointId - NOTE(review): presumably the id of the endpoint this queue serves;
+ * confirm against callers.</li>
+ * <li>endpoint - NOTE(review): presumably the <tt>Endpoint</tt> this queue serves;
+ * confirm against callers.</li>
+ * <li>messages - The outbound queue of messages for the endpoint.</li>
+ * <li>flushedMessagesBetweenPolls - Keeps track of flushed (more precisely
+ * drained buffered) messages between polls. A separate list is needed
+ * from messages list to avoid regular flush handling.</li>
+ * <li>processor - The processor that handles adding messages to the queue as well as flushing
+ * them to the network.</li>
+ * <li>asyncPoll - The async poll to timeout or callback when messages arrive
+ * (null if the endpoint or session supports direct push).</li>
+ * <li>waitPoll - NOTE(review): undocumented flag; presumably marks that a poll is waiting
+ * on this queue - confirm against usages.</li>
+ * <li>pushSession - A reference to a pushSession to use for direct writes to the
+ * client (null if the endpoint uses polling or handles push directly).</li>
+ * <li>flushTask - A reference to a pending flush task that will perform a delayed flush of the queue;
+ * null if no delayed flush has been scheduled.</li>
+ * <li>messageClientRefCount - A reference count of MessageClients subscribed over this endpoint.
+ * Once all MessageClients unsubscribe this queue can be shut down.</li>
+ * <li>avoidBusyPolling - Used to signal poll result generation for the queue to avoid busy polling.</li>
+ * </ul>
+ * All fields are public and mutable; this class declares no methods.
+ */
+ public static class EndpointQueue
+ {
+ public FlexClient flexClient;
+ public String endpointId;
+ public Endpoint endpoint;
+ public List<Message> messages;
+ public List<Message> flushedMessagesBetweenPolls;
+ public FlexClientOutboundQueueProcessor processor;
+ public AsyncPollWithTimeout asyncPoll;
+ public boolean waitPoll;
+ public FlexSession pushSession;
+ public TimeoutAbstractObject flushTask;
+ public int messageClientRefCount;
+ public boolean avoidBusyPolling;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClientAttributeListener.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/FlexClientAttributeListener.java b/core/src/flex/messaging/client/FlexClientAttributeListener.java
new file mode 100644
index 0000000..2d83d0b
--- /dev/null
+++ b/core/src/flex/messaging/client/FlexClientAttributeListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * Interface for FlexClient attribute listeners.
+ */
+public interface FlexClientAttributeListener
+{
+ /**
+ * Callback invoked after an attribute is added to the FlexClient.
+ *
+ * @param event The event containing the associated FlexClient and attribute
+ * information.
+ */
+ void attributeAdded(FlexClientBindingEvent event);
+
+ /**
+ * Callback invoked after an attribute has been replaced with a new value.
+ *
+ * @param event The event containing the associated FlexClient and attribute
+ * information.
+ */
+ void attributeReplaced(FlexClientBindingEvent event);
+
+ /**
+ * Callback invoked after an attribute is removed from the FlexClient.
+ *
+ * @param event The event containing the associated FlexClient and attribute
+ * information.
+ */
+ void attributeRemoved(FlexClientBindingEvent event);
+}