You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flex.apache.org by ah...@apache.org on 2014/04/25 07:34:04 UTC

[06/51] [partial] BlazeDS Donation from Adobe Systems Inc

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/MessageBrokerServlet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/MessageBrokerServlet.java b/modules/core/src/flex/messaging/MessageBrokerServlet.java
new file mode 100755
index 0000000..aaa60d8
--- /dev/null
+++ b/modules/core/src/flex/messaging/MessageBrokerServlet.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging;
+
+import flex.management.MBeanLifecycleManager;
+import flex.management.MBeanServerLocatorFactory;
+import flex.messaging.config.ConfigurationManager;
+import flex.messaging.config.FlexConfigurationManager;
+import flex.messaging.config.MessagingConfiguration;
+import flex.messaging.endpoints.Endpoint;
+import flex.messaging.io.SerializationContext;
+import flex.messaging.io.TypeMarshallingContext;
+import flex.messaging.log.HTTPRequestLog;
+import flex.messaging.log.Log;
+import flex.messaging.log.LogCategories;
+import flex.messaging.log.Logger;
+import flex.messaging.log.LoggingHttpServletRequestWrapper;
+import flex.messaging.log.ServletLogTarget;
+import flex.messaging.services.AuthenticationService;
+import flex.messaging.util.ClassUtil;
+import flex.messaging.util.ExceptionUtil;
+import flex.messaging.util.Trace;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.Principal;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The MessageBrokerServlet bootstraps the MessageBroker,
+ * adds endpoints to it, and starts the broker. The servlet
+ * also acts as a facade for all http-based endpoints, in that
+ * the servlet receives the http request and then delegates to
+ * an endpoint that can handle the request's content type. This
+ * does not occur for non-http endpoints, such as the rtmp endpoint.
+ *
+ * @author sneville
+ * @see flex.messaging.MessageBroker
+ * @exclude
+ */
+public class MessageBrokerServlet extends HttpServlet
+{
+    static final long serialVersionUID = -5293855229461612246L;
+
+    public static final String LOG_CATEGORY_STARTUP_BROKER = LogCategories.STARTUP_MESSAGEBROKER;
+    private static final String STRING_UNDEFINED_APPLICATION = "undefined";
+
+    private MessageBroker broker;
+    private HttpFlexSessionProvider httpFlexSessionProvider;
+    private static String FLEXDIR = "/WEB-INF/flex/";
+    private boolean log_errors = false;
+
+    /**
+     * Initializes the servlet in its web container, then creates
+     * the MessageBroker and adds Endpoints and Services to that broker.
+     * This servlet may keep a reference to an endpoint if it needs to
+     * delegate to it in the <code>service</code> method.
+     */
+    public void init(ServletConfig servletConfig) throws ServletException
+    {
+        super.init(servletConfig);
+
+        // allocate thread local variables
+        createThreadLocals();
+
+        // Set the servlet config as thread local
+        FlexContext.setThreadLocalObjects(null, null, null, null, null, servletConfig);
+
+        ServletLogTarget.setServletContext(servletConfig.getServletContext());
+
+        ClassLoader loader = getClassLoader();
+
+        if ("true".equals(servletConfig.getInitParameter("useContextClassLoader")))
+        {
+            loader = Thread.currentThread().getContextClassLoader();
+        }
+
+        // Should we wrap http request for later error logging?
+        log_errors = HTTPRequestLog.init(getServletContext());
+
+        // Start the broker
+        try
+        {
+            // Get the configuration manager
+            ConfigurationManager configManager = loadMessagingConfiguration(servletConfig);
+
+            // Load configuration
+            MessagingConfiguration config = configManager.getMessagingConfiguration(servletConfig);
+
+            // Set up logging system ahead of everything else.
+            config.createLogAndTargets();
+
+            // Create broker.
+            broker = config.createBroker(servletConfig.getInitParameter("messageBrokerId"), loader);
+
+            // Set the servlet config as thread local
+            FlexContext.setThreadLocalObjects(null, null, broker, null, null, servletConfig);
+
+            setupPathResolvers();
+
+            // Set initial servlet context on broker
+            broker.setServletContext(servletConfig.getServletContext());
+
+            Logger logger = Log.getLogger(ConfigurationManager.LOG_CATEGORY);
+            if (Log.isInfo())
+            {
+                logger.info(VersionInfo.buildMessage());
+            }
+
+            // Create endpoints, services, security, and logger on the broker based on configuration
+            config.configureBroker(broker);
+
+            long timeBeforeStartup = 0;
+            if (Log.isDebug())
+            {
+                timeBeforeStartup = System.currentTimeMillis();
+                Log.getLogger(LOG_CATEGORY_STARTUP_BROKER).debug("MessageBroker with id '{0}' is starting.",
+                        new Object[]{broker.getId()});
+            }
+
+            //initialize the httpSessionToFlexSessionMap
+            synchronized(HttpFlexSession.mapLock)
+            {
+                if (servletConfig.getServletContext().getAttribute(HttpFlexSession.SESSION_MAP) == null)
+                    servletConfig.getServletContext().setAttribute(HttpFlexSession.SESSION_MAP, new ConcurrentHashMap());
+            }
+
+            broker.start();
+
+            if (Log.isDebug())
+            {
+                long timeAfterStartup = System.currentTimeMillis();
+                Long diffMillis = timeAfterStartup - timeBeforeStartup;
+                Log.getLogger(LOG_CATEGORY_STARTUP_BROKER).debug("MessageBroker with id '{0}' is ready (startup time: '{1}' ms)",
+                        new Object[]{broker.getId(), diffMillis});
+            }
+
+            // Report replaced tokens
+            configManager.reportTokens();
+
+            // Report any unused properties.
+            config.reportUnusedProperties();
+
+            // Setup provider for FlexSessions that wrap underlying J2EE HttpSessions.
+            httpFlexSessionProvider = new HttpFlexSessionProvider();
+            broker.getFlexSessionManager().registerFlexSessionProvider(HttpFlexSession.class, httpFlexSessionProvider);
+
+            // clear the broker and servlet config as this thread is done
+            FlexContext.clearThreadLocalObjects();
+        }
+        catch (Throwable t)
+        {
+            // On any unhandled exception destroy the broker, log it and rethrow.
+            String applicationName = servletConfig.getServletContext().getServletContextName();
+            if (applicationName == null)
+                applicationName = STRING_UNDEFINED_APPLICATION;
+
+            System.err.println("**** MessageBrokerServlet in application '" + applicationName
+                    + "' failed to initialize due to runtime exception: "
+                    + ExceptionUtil.exceptionFollowedByRootCausesToString(t));
+            destroy();
+            // We used to throw  UnavailableException, but Weblogic didn't mark the webapp as failed. See bug FBR-237
+            throw new ServletException(t);
+        }
+    }
+
+    private void setupPathResolvers()
+    {
+        setupExternalPathResolver();
+        setupInternalPathResolver();
+    }
+
+    private void setupExternalPathResolver()
+    {
+        broker.setExternalPathResolver(
+                new MessageBroker.PathResolver()
+                {
+                    public InputStream resolve(String filename) throws FileNotFoundException
+                    {
+                        return new FileInputStream(new File(filename));
+                    }
+                }
+        );
+    }
+
+    private void setupInternalPathResolver()
+    {
+        broker.setInternalPathResolver(
+                new MessageBroker.InternalPathResolver()
+                {
+                    public InputStream resolve(String filename)
+                    {
+                        return getServletContext().getResourceAsStream(FLEXDIR + filename);
+                    }
+                }
+        );
+    }
+
+    private static ConfigurationManager loadMessagingConfiguration(ServletConfig servletConfig)
+    {
+        ConfigurationManager manager = null;
+        Class managerClass;
+        String className;
+
+        // Check for Custom Configuration Manager Specification
+        if (servletConfig != null)
+        {
+            String p = servletConfig.getInitParameter("services.configuration.manager");
+            if (p != null)
+            {
+                className = p.trim();
+                try
+                {
+                    managerClass = ClassUtil.createClass(className);
+                    manager = (ConfigurationManager)managerClass.newInstance();
+                }
+                catch (Throwable t)
+                {
+                    if (Trace.config) // Log is not initialized yet.
+                        Trace.trace("Could not load configuration manager as: " + className);
+                }
+            }
+        }
+
+        if (manager == null)
+        {
+            manager = new FlexConfigurationManager();
+        }
+
+        return manager;
+    }
+
+    /**
+     * Stops all endpoints in the MessageBroker, giving them a chance
+     * to perform any endpoint-specific clean up.
+     */
+    public void destroy()
+    {
+        if (broker != null)
+        {
+            broker.stop();
+            if (broker.isManaged())
+            {
+                MBeanLifecycleManager.unregisterRuntimeMBeans(broker);
+            }
+            // release static thread locals
+            destroyThreadLocals();
+        }
+    }
+
+    /**
+     * Handle an incoming request, and delegate to an endpoint based on
+     * content type, if appropriate. The content type mappings for endpoints
+     * are not externally configurable, and currently the AmfEndpoint
+     * is the only delegate.
+     */
+    public void service(HttpServletRequest req, HttpServletResponse res)
+    {
+        if (log_errors)
+        {
+            // Create a wrapper for the request object so we can save the body content
+            LoggingHttpServletRequestWrapper wrapper = new LoggingHttpServletRequestWrapper(req);
+            req = wrapper;
+
+            try
+            {
+                // Read the body content
+                wrapper.doReadBody();
+            }
+            catch (IOException ignore)
+            {
+                // ignore, the wrapper will preserve what content we were able to read.
+            }
+        }
+
+        try
+        {
+            // Update thread locals
+            broker.initThreadLocals();
+            // Set this first so it is in place for the session creation event.  The
+            // current session is set by the FlexSession stuff right when it is available.
+            // The threadlocal FlexClient is set up during message deserialization in the
+            // MessageBrokerFilter.
+            FlexContext.setThreadLocalObjects(null, null, broker, req, res, getServletConfig());
+
+            HttpFlexSession fs = httpFlexSessionProvider.getOrCreateSession(req);
+            Principal principal;
+            if(FlexContext.isPerClientAuthentication())
+            {
+                principal = FlexContext.getUserPrincipal();
+            }
+            else
+            {
+                principal = fs.getUserPrincipal();
+            }
+
+            if (principal == null && req.getHeader("Authorization") != null)
+            {
+                String encoded = req.getHeader("Authorization");
+                if (encoded.indexOf("Basic") > -1)
+                {
+                    encoded = encoded.substring(6); //Basic.length()+1
+                    try
+                    {
+                        ((AuthenticationService)broker.getService(AuthenticationService.ID)).decodeAndLogin(encoded, broker.getLoginManager());
+                    }
+                    catch (Exception e)
+                    {
+                        if (Log.isDebug())
+                            Log.getLogger(LogCategories.SECURITY).info("Authentication service could not decode and login: " + e.getMessage());
+                    }
+                }
+            }
+
+            String contextPath = req.getContextPath();
+            String pathInfo = req.getPathInfo();
+            String endpointPath = req.getServletPath();
+            if (pathInfo != null)
+                endpointPath = endpointPath + pathInfo;
+
+            Endpoint endpoint;
+            try
+            {
+                endpoint = broker.getEndpoint(endpointPath, contextPath);
+            }
+            catch (MessageException me)
+            {
+                if (Log.isInfo())
+                    Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Received invalid request for endpoint path '{0}'.", new Object[] {endpointPath});
+
+                if (!res.isCommitted())
+                {
+                    try
+                    {
+                        res.sendError(HttpServletResponse.SC_NOT_FOUND);
+                    }
+                    catch (IOException ignore)
+                    {}
+                }
+
+                return;
+            }
+
+            try
+            {
+                if (Log.isInfo())
+                {
+                    Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Channel endpoint {0} received request.",
+                                                                       new Object[] {endpoint.getId()});
+                }
+                endpoint.service(req, res);
+            }
+            catch (UnsupportedOperationException ue)
+            {
+                if (Log.isInfo())
+                {
+                    Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Channel endpoint {0} received request for an unsupported operation.",
+                                                                       new Object[] {endpoint.getId()},
+                                                                       ue);
+                }
+
+                if (!res.isCommitted())
+                {
+                    try
+                    {
+                        res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
+                    }
+                    catch (IOException ignore)
+                    {}
+                }
+            }
+        }
+        catch (Throwable t)
+        {
+            // Final resort catch block as recommended by Fortify as a potential System info leak
+            try
+            {
+                Log.getLogger(LogCategories.ENDPOINT_GENERAL).error("Unexpected error encountered in Message Broker servlet", t);
+                res.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+            }
+            catch (IOException ignore)
+            {
+                // ignore
+            }
+
+        }
+        finally
+        {
+            if (log_errors)
+            {
+                String info = (String) req.getAttribute(HTTPRequestLog.HTTP_ERROR_INFO);
+                if (info != null)
+                {
+                    // Log the HttpRequest data
+                    System.out.println("Exception occurred while processing HTTP request: " + info + ", request details logged in " + HTTPRequestLog.getFileName());
+                    HTTPRequestLog.outputRequest(info, req);
+                }
+            }
+
+            FlexContext.clearThreadLocalObjects();
+        }
+    }
+
+    /**
+     * Hook for subclasses to override the class loader to use for loading user defined classes.
+     *
+     * @return the class loader for this class
+     */
+    protected ClassLoader getClassLoader()
+    {
+        return this.getClass().getClassLoader();
+    }
+
+    /** @exclude */
+    // Call ONLY on servlet startup
+    public static void createThreadLocals()
+    {
+        // allocate static thread local objects
+        FlexContext.createThreadLocalObjects();
+        SerializationContext.createThreadLocalObjects();
+        TypeMarshallingContext.createThreadLocalObjects();
+    }
+
+    /** @exclude */
+    // Call ONLY on servlet shutdown
+    protected static void destroyThreadLocals()
+    {
+        // clear static member variables
+        Log.clear();
+        MBeanServerLocatorFactory.clear();
+
+        // Destroy static thread local objects
+        FlexContext.releaseThreadLocalObjects();
+        SerializationContext.releaseThreadLocalObjects();
+        TypeMarshallingContext.releaseThreadLocalObjects();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/MessageClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/MessageClient.java b/modules/core/src/flex/messaging/MessageClient.java
new file mode 100755
index 0000000..e9c8ccc
--- /dev/null
+++ b/modules/core/src/flex/messaging/MessageClient.java
@@ -0,0 +1,1144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import flex.messaging.client.FlexClient;
+import flex.messaging.client.FlexClientOutboundQueueProcessor;
+import flex.messaging.client.OutboundQueueThrottleManager;
+import flex.messaging.config.ThrottleSettings;
+import flex.messaging.config.ThrottleSettings.Policy;
+import flex.messaging.log.LogCategories;
+import flex.messaging.log.Log;
+import flex.messaging.messages.AsyncMessage;
+import flex.messaging.messages.CommandMessage;
+import flex.messaging.messages.Message;
+import flex.messaging.services.MessageService;
+import flex.messaging.services.messaging.Subtopic;
+import flex.messaging.services.messaging.selector.JMSSelector;
+import flex.messaging.services.messaging.selector.JMSSelectorException;
+import flex.messaging.util.ExceptionUtil;
+import flex.messaging.util.TimeoutAbstractObject;
+import flex.messaging.util.StringUtils;
+
+/**
+ * Represents a client-side MessageAgent instance.
+ * Currently a server-side MessageClient is only created if its client-side counterpart has subscribed
+ * to a destination for pushed data (e.g. Consumer). Client-side Producers do not result in the creation of
+ * corresponding server-side MessageClient instances.
+ *
+ * Client-side MessageAgents communicate with the server over a Channel that corresponds to a FlexSession.
+ * Server-side MessageClient instances are always created in the context of a FlexSession and when the FlexSession
+ * is invalidated any associated MessageClients are invalidated as well.
+ *
+ * MessageClients may also be timed out on a per-destination basis and this is based on subscription inactivity.
+ * If no messages are pushed to the MessageClient within the destination's subscription timeout period the
+ * MessageClient will be shutdown even if the associated FlexSession is still active and connected.
+ * Per-destination subscription timeout is an optional configuration setting, and should only be used when inactive
+ * subscriptions should be shut down opportunistically to preserve server resources.
+ */
+public class MessageClient extends TimeoutAbstractObject implements Serializable
+{
+    //--------------------------------------------------------------------------
+    //
+    // Public Static Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Log category for MessageClient related messages.
+     */
+    public static final String MESSAGE_CLIENT_LOG_CATEGORY = LogCategories.CLIENT_MESSAGECLIENT;
+
+    //--------------------------------------------------------------------------
+    //
+    // Static Constants
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Serializable to support broadcasting subscription state across the cluster for
+     * optimized message routing.
+     */
+    static final long serialVersionUID = 3730240451524954453L;
+
+    //--------------------------------------------------------------------------
+    //
+    // Static Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * The list of MessageClient created listeners.
+     */
+    private static final CopyOnWriteArrayList<MessageClientListener> createdListeners = new CopyOnWriteArrayList<MessageClientListener>();
+
+    //--------------------------------------------------------------------------
+    //
+    // Static Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Adds a MessageClient created listener.
+     *
+     * @see flex.messaging.MessageClientListener
+     *
+     * @param listener The listener to add.
+     */
+    public static void addMessageClientCreatedListener(MessageClientListener listener)
+    {
+        if (listener != null)
+            createdListeners.addIfAbsent(listener);
+    }
+
+    /**
+     * Removes a MessageClient created listener.
+     *
+     * @see flex.messaging.MessageClientListener
+     *
+     * @param listener The listener to remove.
+     */
+    public static void removeMessageClientCreatedListener(MessageClientListener listener)
+    {
+        if (listener != null)
+            createdListeners.remove(listener);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Private Static Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Utility method.
+     */
+    private static boolean equalStrings(String a, String b)
+    {
+        return a == b || (a != null && a.equals(b));
+    }
+
+    /**
+     * Utility method.
+     */
+    static int compareStrings(String a, String b)
+    {
+        if (a == b)
+            return 0;
+
+        if (a != null && b != null)
+            return a.compareTo(b);
+
+        if (a == null)
+            return -1;
+
+        return 1;
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * @exclude
+     * Constructs a new MessageClient for local use.
+     *
+     * @param clientId The clientId for the MessageClient.
+     * @param destination The destination the MessageClient is subscribed to.
+     * @param endpointId The Id of the endpoint this MessageClient subscription was created over.
+     */
+    public MessageClient(Object clientId, Destination destination, String endpointId)
+    {
+        this(clientId, destination, endpointId, true);
+    }
+
+    /**
+     * @exclude
+     * Constructs a new MessageClient.
+     *
+     * @param clientId The clientId for the MessageClient.
+     * @param destination The destination the MessageClient is subscribed to.
+     * @param endpointId The Id of the endpoint this MessageClient subscription was created over.
+     * @param useSession RemoteMessageClient instances should not be associated with a FlexSession (pass false).
+     */
+    public MessageClient(Object clientId, Destination destination, String endpointId, boolean useSession)
+    {
+        valid = true;
+        this.clientId = clientId;
+        this.destination = destination;
+        this.endpointId = endpointId;
+        destinationId = destination.getId();
+        updateLastUse(); // Initialize last use timestamp to construct time.
+
+        /* If this is for a remote server, we do not associate with the session. */
+        if (useSession)
+        {
+            flexSession = FlexContext.getFlexSession();
+            flexSession.registerMessageClient(this);
+
+            flexClient = FlexContext.getFlexClient();
+            flexClient.registerMessageClient(this);
+
+            // SubscriptionManager will notify the created listeners, once
+            // subscription state is setup completely.
+            // notifyCreatedListeners();
+        }
+        else
+        {
+            flexClient = null;
+            flexSession = null;
+            // Use an instance level lock.
+            lock = new Object();
+            // On a remote server we don't notify created listeners.
+        }
+
+        if (Log.isDebug())
+            Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient created with clientId '" + this.clientId + "' for destination '" + destinationId + "'.");
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     *  This flag is set to true when the client channel that this subscription was
+     *  established over is disconnected.
+     *  It supports cleaning up per-endpoint outbound queues maintained by the FlexClient.
+     *  If the client notifies the server that its channel is disconnecting, the FlexClient
+     *  does not need to maintain an outbound queue containing a subscription invalidation
+     *  message for this MessageClient to send to the client.
+     */
+    private volatile boolean clientChannelDisconnected;
+
+    /**
+     *  The clientId for the MessageClient.
+     *  This value is specified by the client directly or is autogenerated on the client.
+     */
+    protected final Object clientId;
+
+    /**
+     * Internal reference to the associated Destination; don't expose this in the public API.
+     */
+    protected final Destination destination;
+
+    /**
+     *  The destination the MessageClient is subscribed to.
+     */
+    protected final String destinationId;
+
+    /**
+     * The set of session destroy listeners to notify when the session is destroyed.
+     */
+    private transient volatile CopyOnWriteArrayList destroyedListeners;
+
+    /**
+     * The Id for the endpoint this MessageClient subscription was created over.
+     */
+    private String endpointId;
+
+    /**
+     * The FlexClient associated with the MessageClient.
+     */
+    private final transient FlexClient flexClient;
+
+    /**
+     * The FlexSession associated with the MessageClient.
+     * Not final because this needs to be reset if the subscription fails over to a new endpoint.
+     */
+    private transient FlexSession flexSession;
+
+    /**
+     * Flag used to break cycles during invalidation.
+     */
+    private boolean invalidating;
+
+    /**
+     * The lock to use to guard all state changes for the MessageClient.
+     */
+    protected Object lock = new Object();
+
+    /**
+     * Flag indicating whether the MessageClient is attempting to notify the remote client of
+     * its invalidation.
+     */
+    private volatile boolean attemptingInvalidationClientNotification;
+
+    /**
+     * A counter used to control invalidation for a MessageClient that has multiple
+     * subscriptions to its destination.
+     * Unsubscribing from one will not invalidate the MessageClient as long as other
+     * subscriptions remain active.
+     */
+    private transient int numReferences;
+
+    /**
+     * A set of all of the subscriptions managed by this message client.
+     */
+    protected final Set<SubscriptionInfo> subscriptions = new CopyOnWriteArraySet<SubscriptionInfo>();
+
+    /**
+     * Flag indicating whether this client is valid or not.
+     */
+    protected boolean valid;
+
+    /**
+     * Flag that indicates whether the MessageClient has a per-destination subscription timeout.
+     * If false, the MessageClient will remain valid until its associated FlexSession is invalidated.
+     */
+    private volatile boolean willTimeout;
+
+    /**
+     * Has anyone explicitly registered this message client.  This indicates that
+     * there is a reference to this MessageClient which is not an explicit subscription.
+     * This is a hook for FDMS and other adapters which want to use pushMessageToClients
+     * with clientIds but that do not want the subscription manager to manage subscriptions
+     * for them.
+     */
+    private volatile boolean registered = false;
+
+    //--------------------------------------------------------------------------
+    //
+    // Public Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Returns the clientId for the MessageClient.
+     *
+     * @return The clientId for the MessageClient.
+     */
+    public Object getClientId()
+    {
+        return clientId; // Field is final; no need to sync.
+    }
+
+    /**
+     * Returns the destination the MessageClient is subscribed to.
+     *
+     * @return The destination the MessageClient is subscribed to.
+     */
+    public Destination getDestination()
+    {
+        return destination; // Field is final; no need to sync.
+    }
+
+    /**
+     * Returns the id of the destination the MessageClient is subscribed to.
+     *
+     * @return The id of the destination the MessageClient is subscribed to.
+     */
+    public String getDestinationId()
+    {
+        return destinationId; // Field is final; no need to sync.
+    }
+
+    /**
+     * Returns the Id for the endpoint the MessageClient subscription was created over.
+     *
+     * @return The Id for the endpoint the MessageClient subscription was created over.
+     */
+    public String getEndpointId()
+    {
+        return endpointId; // Field is final; no need to sync.
+    }
+
+    /**
+     * Returns the FlexClient associated with this MessageClient.
+     *
+     * @return The FlexClient assocaited with this MessageClient.
+     */
+    public FlexClient getFlexClient()
+    {
+        return flexClient; // Field is final; no need to sync.
+    }
+
+    /**
+     * Returns the FlexSession associated with this MessageClient.
+     *
+     * @return The FlexSession associated with this MessageClient.
+     */
+    public FlexSession getFlexSession()
+    {
+        synchronized (lock)
+        {
+            return flexSession;
+        }
+    }
+
+    /**
+     * Returns the number of subscriptions associated with this MessageClient.
+     *
+     * @return The number of subscriptions associated with this MessageClient.
+     */
+    public int getSubscriptionCount()
+    {
+        int count;
+
+        synchronized (lock)
+        {
+            count = subscriptions != null? subscriptions.size() : 0;
+        }
+
+        return count;
+    }
+
+    /**
+     * @exclude
+     * This is used for FlexClient outbound queue management. When a MessageClient is invalidated
+     * if it is attempting to notify the client, then we must leave the outbound queue containing
+     * the notification in place. Otherwise, any messages queued for the subscription may be
+     * removed from the queue and possibly shut down immediately.
+     */
+    public boolean isAttemptingInvalidationClientNotification()
+    {
+        return attemptingInvalidationClientNotification;
+    }
+
+    /**
+     * @exclude
+     * This is set to true when the MessageClient is invalidated due to the client
+     * channel the subscription was established over disconnecting.
+     * It allows the FlexClient class to cleanup the outbound queue for the channel's
+     * corresponding server endpoint for the remote client, because we know that no
+     * currently queued messages need to be retained for delivery.
+     */
+    public void setClientChannelDisconnected(boolean value)
+    {
+        clientChannelDisconnected = value;
+    }
+
+    /**
+     * @exclude
+     */
+    public boolean isClientChannelDisconnected()
+    {
+        return clientChannelDisconnected;
+    }
+
+    /**
+     * @exclude
+     * This is true when some code other than the SubscriptionManager
+     * is maintaining subscriptions for this message client.  It ensures
+     * that we have this MessageClient kept around until the session
+     * expires.
+     */
+    public void setRegistered(boolean reg)
+    {
+        registered = reg;
+    }
+
+    /**
+     * @exclude
+     */
+    public boolean isRegistered()
+    {
+        return registered;
+    }
+
+    /**
+     * Adds a MessageClient destroy listener.
+     *
+     * @see flex.messaging.MessageClientListener
+     *
+     * @param listener The listener to add.
+     */
+    public void addMessageClientDestroyedListener(MessageClientListener listener)
+    {
+        if (listener != null)
+        {
+            checkValid();
+
+            if (destroyedListeners == null)
+            {
+                synchronized (lock)
+                {
+                    if (destroyedListeners == null)
+                        destroyedListeners = new CopyOnWriteArrayList();
+                }
+            }
+
+            destroyedListeners.addIfAbsent(listener);
+        }
+    }
+
+    /**
+     * Removes a MessageClient destroyed listener.
+     *
+     * @see flex.messaging.MessageClientListener
+     *
+     * @param listener The listener to remove.
+     */
+    public void removeMessageClientDestroyedListener(MessageClientListener listener)
+    {
+        // No need to check validity; removing a listener is always ok.
+        if (listener != null && destroyedListeners != null)
+            destroyedListeners.remove(listener);
+    }
+
+    /**
+     * @exclude
+     * Adds a subscription to the subscription set for this MessageClient.
+     *
+     * @param selector The selector expression used for the subscription.
+     * @param subtopic The subtopic used for the subscription.
+     * @param maxFrequency The maximum number of messages the client wants to
+     * receive per second (0 disables this limit).
+     */
+    public void addSubscription(String selector, String subtopic, int maxFrequency)
+    {
+        synchronized (lock)
+        {
+            checkValid();
+
+            incrementReferences();
+
+            // Create and add the subscription to the subscriptions set.
+            SubscriptionInfo si = new SubscriptionInfo(selector, subtopic, maxFrequency);
+            subscriptions.add(si);
+
+            registerSubscriptionWithThrottleManager(si);
+        }
+    }
+
+    /**
+     * @exclude
+     * Registers  the subscription with the outbound queue processor's throttle
+     * manager, if one exists.
+     *
+     * @param si The subscription info object.
+     */
+    public void registerSubscriptionWithThrottleManager(SubscriptionInfo si)
+    {
+        // Register the destination that will setup client level outbound throttling.
+        ThrottleSettings ts = destination.getNetworkSettings().getThrottleSettings();
+        if (ts.getOutboundPolicy() != Policy.NONE && (ts.isOutboundClientThrottleEnabled() || si.maxFrequency > 0))
+        {
+            // Setup the client level outbound throttling, and register the destination
+            // only if the policy is not NONE, and a throttling limit is specified
+            // either at the destination or by consumer.
+            OutboundQueueThrottleManager throttleManager = getThrottleManager(true);
+            if (throttleManager != null)
+                throttleManager.registerDestination(destinationId, ts.getOutgoingClientFrequency(), ts.getOutboundPolicy());
+        }
+        else if (si.maxFrequency > 0) // Let the client know that maxFrequency will be ignored.
+        {
+            if (Log.isWarn())
+                Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).warn("MessageClient with clientId '"
+                        + clientId + "' for destination '" + destinationId
+                        + "' specified a maxFrequency value of '" + si.maxFrequency
+                        + "' but the destination does not define a throttling policy. This value will be ignored.");
+        }
+
+        // Now, register the subscription.
+        OutboundQueueThrottleManager throttleManager = getThrottleManager(false);
+        if (throttleManager != null)
+            throttleManager.registerSubscription(destinationId, si);
+    }
+
+    /**
+     * @exclude
+     * Removes a subscription from the subscription set for this MessageClient.
+     *
+     * @param selector The selector expression for the subscription.
+     * @param subtopic The subtopic for the subscription.
+     * @return true if no subscriptions remain for this MessageClient; otherwise false.
+     */
+    public boolean removeSubscription(String selector, String subtopic)
+    {
+        synchronized (lock)
+        {
+            SubscriptionInfo si = new SubscriptionInfo(selector, subtopic);
+            if (subscriptions.remove(si))
+            {
+                unregisterSubscriptionWithThrottleManager(si);
+                return decrementReferences();
+            }
+            else if (Log.isError())
+            {
+                Log.getLogger(MessageService.LOG_CATEGORY).error("Error - unable to find subscription to remove for MessageClient: "
+                        + clientId + " selector: " + selector + " subtopic: " + subtopic);
+            }
+            return numReferences == 0;
+        }
+    }
+
+    /**
+     * @exclude
+     * We use the same MessageClient for more than one subscription with different
+     * selection criteria.  This tracks the number of subscriptions that are active
+     * so that we know when we are finished.
+     */
+    public void incrementReferences()
+    {
+        synchronized (lock)
+        {
+            numReferences++;
+        }
+    }
+
+    /**
+     * @exclude
+     * Decrements the numReferences variable and returns true if this was the last reference.
+     */
+    public boolean decrementReferences()
+    {
+        synchronized (lock)
+        {
+            if (--numReferences == 0)
+            {
+                cancelTimeout();
+                if (destination instanceof MessageDestination)
+                {
+                    MessageDestination msgDestination = (MessageDestination)destination;
+                    if (msgDestination.getThrottleManager() != null)
+                        msgDestination.getThrottleManager().removeClientThrottleMark(clientId);
+                }
+                return true;
+            }
+            return false;
+        }
+    }
+
+    /**
+     * @exclude
+     * Invoked by SubscriptionManager once the subscription state is setup completely
+     * for the MessageClient..
+     */
+    public void notifyCreatedListeners()
+    {
+        // Notify MessageClient created listeners.
+        if (!createdListeners.isEmpty())
+        {
+            // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions.
+            for (Iterator iter = createdListeners.iterator(); iter.hasNext();)
+                ((MessageClientListener)iter.next()).messageClientCreated(this);
+        }
+    }
+
+    /**
+     * @exclude
+     * Invoked by SubscriptionManager while handling a subscribe request.
+     * If the request is updating an existing subscription the 'push' state in the associated FlexClient
+     * may need to be updated to ensure that the correct endpoint is used for this subscription.
+     *
+     * @param newEndpointId The id for the new endpoint that the subscription may have failed over to.
+     */
+    public void resetEndpoint(String newEndpointId)
+    {
+        String oldEndpointId = null;
+        FlexSession oldSession = null;
+        FlexSession newSession = FlexContext.getFlexSession();
+        synchronized (lock)
+        {
+            // If anything is null, or nothing has changed, no need for a reset.
+            if (endpointId == null || newEndpointId == null || flexSession == null || newSession == null || (endpointId.equals(newEndpointId) && flexSession.equals(newSession)))
+                return;
+
+            oldEndpointId = endpointId;
+            endpointId = newEndpointId;
+
+            oldSession = flexSession;
+            flexSession = newSession;
+        }
+
+        // Unregister in order to reset the proper push settings in the re-registration below once the session association has been patched.
+        if (flexClient != null)
+            flexClient.unregisterMessageClient(this);
+
+        // Clear out any reference to this subscription that the previously associated session has.
+        if (oldSession != null)
+            oldSession.unregisterMessageClient(this);
+
+        // Associate the current session with this subscription.
+        if (flexSession != null)
+            flexSession.registerMessageClient(this);
+
+        // Reset proper push settings.
+        if (flexClient != null)
+            flexClient.registerMessageClient(this);
+
+        if (Log.isDebug())
+        {
+            String msg = "MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been reset as a result of a resubscribe.";
+            if (oldEndpointId != null && !oldEndpointId.equals(newEndpointId))
+                msg += " Endpoint change [" + oldEndpointId + " -> " + newEndpointId + "]";
+            if ((oldSession != null) && (newSession != null) && (oldSession != newSession)) // Test identity.
+                msg += " FlexSession change [" + oldSession.getClass().getName() + ":" + oldSession.getId() + " -> " + newSession.getClass().getName() + ":" + newSession.getId() + "]";
+
+            Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug(msg);
+        }
+    }
+
+    /**
+     * @exclude
+     * Used to test whether this client should receive this message
+     * based on the list of subscriptions we have recorded for it.
+     * It must match both the subtopic and the selector expression.
+     * Usually this is done by the subscription manager - this logic is
+     * only here to maintain api compatibility with one of the variants
+     * of the pushMessageToClients which has subscriberIds and an evalSelector
+     * property.
+     *
+     * @param message The message to test.
+     */
+    public boolean testMessage(Message message, MessageDestination destination)
+    {
+        String subtopic = (String) message.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME);
+        String subtopicSeparator = destination.getServerSettings().getSubtopicSeparator();
+        synchronized (lock)
+        {
+            for (SubscriptionInfo si : subscriptions)
+            {
+                if (si.matches(message, subtopic, subtopicSeparator))
+                    return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns true if the MessageClient is valid; false if it has been invalidated.
+     *
+     * @return true if the MessageClient is valid; otherwise false.
+     */
+    public boolean isValid()
+    {
+        synchronized (lock)
+        {
+            return valid;
+        }
+    }
+    
+    /**
+     * Invalidates the MessageClient.
+     */
+    public void invalidate()
+    {
+        invalidate(false /* don't attempt to notify the client */);
+    }
+
+    /**
+     * Invalidates the MessageClient, and optionally attempts to notify the client that
+     * this subscription has been invalidated.
+     * This overload is used when a subscription is timed out while the client is still
+     * actively connected to the server but should also be used by any custom code on the server
+     * that invalidates MessageClients but wishes to notify the client cleanly.
+     *
+     * @param notifyClient <code>true</code> to notify the client that its subscription has been
+     *                     invalidated.
+     */
+    public void invalidate(boolean notifyClient)
+    {
+        synchronized (lock)
+        {
+            if (!valid || invalidating)
+                return; // Already shutting down.
+
+            invalidating = true; // This thread gets to shut the MessageClient down.
+            cancelTimeout();
+        }
+
+        // Record whether we're attempting to notify the client or not.
+        attemptingInvalidationClientNotification = notifyClient;
+
+        // Build a subscription invalidation message and push to the client if it is still valid.
+        if (notifyClient && flexClient != null && flexClient.isValid())
+        {
+            CommandMessage msg = new CommandMessage();
+            msg.setDestination(destination.getId());
+            msg.setClientId(clientId);
+            msg.setOperation(CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION);
+            Set subscriberIds = new TreeSet();
+            subscriberIds.add(clientId);
+            try
+            {
+                if (destination instanceof MessageDestination)
+                {
+                    MessageDestination msgDestination = (MessageDestination)destination;
+                    ((MessageService)msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, msg, false /* don't eval selector */);
+                }
+            }
+            catch (MessageException ignore) {}
+        }
+
+        // Notify messageClientDestroyed listeners that we're being invalidated.
+        if (destroyedListeners != null && !destroyedListeners.isEmpty())
+        {
+            for (Iterator iter = destroyedListeners.iterator(); iter.hasNext();)
+            {
+                ((MessageClientListener)iter.next()).messageClientDestroyed(this);
+            }
+            destroyedListeners.clear();
+        }
+
+        // And generate unsubscribe messages for all of the MessageClient's subscriptions and
+        // route them to the destination this MessageClient is subscribed to.
+        // The reason we send a message to the service rather than just going straight to the SubscriptionManager
+        // is that some adapters manage their own subscription state (i.e. JMS) in addition to us keeping track of
+        // things with our SubscriptionManager.
+        ArrayList<CommandMessage> unsubMessages = new ArrayList<CommandMessage>();
+        synchronized (lock)
+        {
+            for (SubscriptionInfo subInfo : subscriptions)
+            {
+                CommandMessage unsubMessage = new CommandMessage();
+                unsubMessage.setDestination(destination.getId());
+                unsubMessage.setClientId(clientId);
+                unsubMessage.setOperation(CommandMessage.UNSUBSCRIBE_OPERATION);
+                unsubMessage.setHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER, Boolean.TRUE);
+                unsubMessage.setHeader(CommandMessage.SELECTOR_HEADER, subInfo.selector);
+                unsubMessage.setHeader(AsyncMessage.SUBTOPIC_HEADER_NAME, subInfo.subtopic);
+                unsubMessages.add(unsubMessage);
+            }
+        }
+        // Release the lock and send the unsub messages.
+        for (CommandMessage unsubMessage : unsubMessages)
+        {
+            try
+            {
+                destination.getService().serviceCommand(unsubMessage);
+            }
+            catch (MessageException me)
+            {
+                if (Log.isDebug())
+                    Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient: " + getClientId() + " issued an unsubscribe message during invalidation that was not processed but will continue with invalidation. Reason: " + ExceptionUtil.toString(me));
+            }
+        }
+
+        synchronized (lock)
+        {
+            // If we didn't clean up all subscriptions log an error and continue with shutdown.
+            int remainingSubscriptionCount = subscriptions.size();
+            if (remainingSubscriptionCount > 0 && Log.isError())
+                Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).error("MessageClient: " + getClientId() + " failed to remove " + remainingSubscriptionCount + " subscription(s) during invalidation");
+        }
+
+        // If someone registered this message client, invalidating it will free
+        // their reference which will typically also remove this message client.
+        if (registered && destination instanceof MessageDestination)
+            ((MessageDestination)destination).getSubscriptionManager().releaseMessageClient(this);
+
+        synchronized (lock)
+        {
+            valid = false;
+            invalidating = false;
+        }
+
+        if (Log.isDebug())
+            Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been invalidated.");
+    }
+
+    /**
+     * Pushes the supplied message and then invalidates the MessageClient.
+     *
+     * @param message The message to push to the client before invalidating.
+     * When message is null, MessageClient is invalidated silently.
+     */
+    public void invalidate(Message message)
+    {
+        if (message != null)
+        {
+            message.setDestination(destination.getId());
+            message.setClientId(clientId);
+
+            Set subscriberIds = new TreeSet();
+            subscriberIds.add(clientId);
+            try
+            {
+                if (destination instanceof MessageDestination)
+                {
+                    MessageDestination msgDestination = (MessageDestination)destination;
+                    ((MessageService)msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, message, false /* don't eval selector */);
+                }
+            }
+            catch (MessageException ignore) {}
+
+            invalidate(true /* attempt to notify remote client */);
+        }
+        else
+        {
+            invalidate();
+        }
+    }
+
+    /**
+     * @exclude
+     * Compares this MessageClient to the specified object. The result is true if
+     * the argument is not null and is a MessageClient instance with a matching
+     * clientId value.
+     *
+     * @param o The object to compare this MessageClient to.
+     * @return true if the MessageClient is equal; otherwise false.
+     */
+    public boolean equals(Object o)
+    {
+        if (o instanceof MessageClient)
+        {
+            MessageClient c = (MessageClient) o;
+            if (c != null && c.getClientId().equals(clientId))
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * @exclude
+     * Returns the hash code for this MessageClient. The returned value is
+     * the hash code for the MessageClient's clientId property.
+     *
+     * @return The hash code value for this MessageClient.
+     */
+    @Override
+    public int hashCode()
+    {
+        return getClientId().hashCode();
+    }
+
+    /**
+     * @exclude
+     * The String representation of this MessageClient is returned (its clientId value).
+     *
+     * @return The clientId value for this MessageClient.
+     */
+    @Override
+    public String toString()
+    {
+        return String.valueOf(clientId);
+    }
+
+    //----------------------------------
+    //  TimeoutAbstractObject overrides
+    //----------------------------------
+
+    /**
+     * @exclude
+     * Implements TimeoutCapable.
+     * This method returns the timeout value configured for the MessageClient's destination.
+     */
+    @Override
+    public long getTimeoutPeriod()
+    {
+        return (destination instanceof MessageDestination) ? 
+                ((MessageDestination)destination).getSubscriptionManager().getSubscriptionTimeoutMillis() : 0;
+    }
+
+    /**
+     * @exclude
+     * Implements TimeoutCapable.
+     * This method is invoked when the MessageClient has timed out and it
+     * invalidates the MessageClient.
+     */
+    public void timeout()
+    {
+        invalidate(true /* notify client */);
+    }
+
+    /**
+     * @exclude
+     * Returns true if a timeout task is running for this MessageClient.
+     */
+    public boolean isTimingOut()
+    {
+        return willTimeout;
+    }
+
+    /**
+     * @exclude
+     * Records whether a timeout task is running for this MessageClient.
+     */
+    public void setTimingOut(boolean value)
+    {
+        willTimeout = value;
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Private Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Utility method that tests validity and throws an exception if the instance
+     * has been invalidated.
+     */
+    private void checkValid()
+    {
+        synchronized (lock)
+        {
+            if (!valid)
+            {
+                throw new RuntimeException("MessageClient has been invalidated."); // TODO - localize
+            }
+        }
+    }
+
+    private OutboundQueueThrottleManager getThrottleManager(boolean create)
+    {
+        if (flexClient != null)
+        {
+            FlexClientOutboundQueueProcessor processor = flexClient.getOutboundQueueProcessor(endpointId);
+            if (processor != null)
+                return create? processor.getOrCreateOutboundQueueThrottleManager() : processor.getOutboundQueueThrottleManager();
+        }
+        return null;
+    }
+
+    private void unregisterSubscriptionWithThrottleManager(SubscriptionInfo si)
+    {
+        OutboundQueueThrottleManager throttleManager = getThrottleManager(false);
+        if (throttleManager != null)
+            throttleManager.unregisterSubscription(destinationId, si);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Nested Classes
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Represents a MessageClient's subscription to a destination.
+     * It captures the optional selector expression and subtopic for the
+     * subscription.
+     */
+    public static class SubscriptionInfo implements Comparable
+    {
+        public String selector, subtopic;
+        public int maxFrequency; // maxFrequency per subscription. Not used in BlazeDS.
+
+        public SubscriptionInfo(String sel, String sub)
+        {
+            this(sel, sub, 0);
+        }
+
+        public SubscriptionInfo(String sel, String sub, int maxFrequency)
+        {
+            this.selector = sel;
+            this.subtopic = sub;
+            this.maxFrequency = maxFrequency;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (o instanceof SubscriptionInfo)
+            {
+                SubscriptionInfo other = (SubscriptionInfo) o;
+                return equalStrings(other.selector, selector) &&
+                       equalStrings(other.subtopic, subtopic);
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return (selector == null ? 0 : selector.hashCode()) +
+                   (subtopic == null ? 1 : subtopic.hashCode());
+        }
+
+        /**
+         * Compares the two subscription infos (being careful to
+         * ensure we compare in a consistent way if the arguments
+         * are switched).
+         * @param o the object to compare
+         * @return int the compare result
+         */
+        public int compareTo(Object o)
+        {
+            SubscriptionInfo other = (SubscriptionInfo) o;
+            int result;
+
+            if ((result = compareStrings(other.selector, selector)) != 0)
+                return result;
+            else if ((result = compareStrings(other.subtopic, subtopic)) != 0)
+                return result;
+
+            return 0;
+        }
+
+        /**
+         * Check whether the message matches with selected subtopic.
+         * @param message current message
+         * @param subtopicToMatch subtopc string
+         * @param subtopicSeparator suptopic separator 
+         * @return true if the message matches the subtopic
+         */
+        public boolean matches(Message message, String subtopicToMatch, String subtopicSeparator)
+        {
+            if ((subtopicToMatch == null && subtopic != null) || (subtopicToMatch != null && subtopic == null))
+                return false; // If either defines a subtopic, they both must define one.
+
+            // If both define a subtopic, they must match.
+            if (subtopicToMatch != null && subtopic != null)
+            {
+                Subtopic consumerSubtopic = new Subtopic(subtopic, subtopicSeparator);
+                Subtopic messageSubtopic = new Subtopic(subtopicToMatch, subtopicSeparator);
+                if (!consumerSubtopic.matches(messageSubtopic))
+                    return false; // Not a match.
+            }
+
+            if (selector == null)
+                return true;
+
+            JMSSelector jmsSelector = new JMSSelector(selector);
+            try
+            {
+                if (jmsSelector.match(message))
+                    return true;
+            }
+            catch (JMSSelectorException jmse)
+            {
+                // Log a warning for this client's selector and continue
+                if (Log.isWarn())
+                {
+                    Log.getLogger(JMSSelector.LOG_CATEGORY).warn("Error processing message selector: " +
+                         jmse.toString() + StringUtils.NEWLINE +
+                         "  incomingMessage: " + message + StringUtils.NEWLINE +
+                         "  selector: " + selector + StringUtils.NEWLINE);
+                }
+            }
+            return false;
+        }
+
+        /**
+         * Returns a String representation of the subscription info.
+         * @return String the string representation of the subscription info
+         */
+        public String toString()
+        {
+            StringBuffer sb = new StringBuffer();
+            sb.append("Subtopic: " + subtopic + StringUtils.NEWLINE);
+            sb.append("Selector: " + selector + StringUtils.NEWLINE);
+            if (maxFrequency > 0)
+                sb.append("maxFrequency: " + maxFrequency);
+            return sb.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/MessageClientListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/MessageClientListener.java b/modules/core/src/flex/messaging/MessageClientListener.java
new file mode 100755
index 0000000..e77b31d
--- /dev/null
+++ b/modules/core/src/flex/messaging/MessageClientListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging;
+
+/**
+ * Interface to be notified when a MessageClient is created or destroyed. Implementations of this interface
+ * may add themselves as listeners statically via <code>MessageClient.addMessageClientCreatedListener()</code>.
+ * To listen for MessageClient destruction, the implementation class instance must add itself as a listener to
+ * a specific MessageClient instance via the <code>addMessageClientDestroyedListener()</code> method.
+ */
+public interface MessageClientListener 
+{
+    /**
+     * Notification that a MessageClient was created.
+     * 
+     * @param messageClient The MessageClient that was created.
+     */
+    void messageClientCreated(MessageClient messageClient);
+    
+    /**
+     * Notification that a MessageClient is about to be destroyed.
+     * 
+     * @param messageClient The MessageClient that will be destroyed.
+     */
+    void messageClientDestroyed(MessageClient messageClient);
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/MessageDestination.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/MessageDestination.java b/modules/core/src/flex/messaging/MessageDestination.java
new file mode 100755
index 0000000..5552b7a
--- /dev/null
+++ b/modules/core/src/flex/messaging/MessageDestination.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging;
+
+import flex.management.runtime.messaging.MessageDestinationControl;
+import flex.management.runtime.messaging.services.messaging.SubscriptionManagerControl;
+import flex.management.runtime.messaging.services.messaging.ThrottleManagerControl;
+import flex.messaging.config.ConfigurationConstants;
+import flex.messaging.config.ConfigurationException;
+import flex.messaging.config.DestinationSettings;
+import flex.messaging.config.ThrottleSettings;
+import flex.messaging.config.ConfigMap;
+import flex.messaging.config.NetworkSettings;
+import flex.messaging.config.ServerSettings;
+import flex.messaging.config.ThrottleSettings.Policy;
+import flex.messaging.log.LogCategories;
+import flex.messaging.services.MessageService;
+import flex.messaging.services.Service;
+import flex.messaging.services.messaging.SubscriptionManager;
+import flex.messaging.services.messaging.RemoteSubscriptionManager;
+import flex.messaging.services.messaging.ThrottleManager;
+import flex.messaging.services.messaging.MessagingConstants;
+import flex.messaging.util.ClassUtil;
+
+/**
+ * A logical reference to a MessageDestination.
+ */
+public class MessageDestination extends FactoryDestination
+{
+    static final long serialVersionUID = -2016911808141319012L;
+
+    /** Log category for <code>MessageDestination</code>.*/
+    public static final String LOG_CATEGORY = LogCategories.SERVICE_MESSAGE;
+
+    // Errors
+    private static final int UNSUPPORTED_POLICY = 10124;
+
+    // Destination properties
+    private transient ServerSettings serverSettings;
+
+    // Destination internal
+    private transient SubscriptionManager subscriptionManager;
+    private transient RemoteSubscriptionManager remoteSubscriptionManager;
+    private transient ThrottleManager throttleManager;
+
+    private transient MessageDestinationControl controller;
+
+    //--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Constructs an unmanaged <code>MessageDestination</code> instance.
+     */
+    public MessageDestination()
+    {
+        this(false);
+    }
+
+    /**
+     * Constructs a <code>MessageDestination</code> with the indicated management.
+     *
+     * @param enableManagement <code>true</code> if the <code>MessageDestination</code>
+     * is manageable; otherwise <code>false</code>.
+     */
+    public MessageDestination(boolean enableManagement)
+    {
+        super(enableManagement);
+
+        serverSettings = new ServerSettings();
+
+        // Managers
+        subscriptionManager = new SubscriptionManager(this);
+        remoteSubscriptionManager = new RemoteSubscriptionManager(this);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Initialize, validate, start, and stop methods.
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Initializes the <code>MessageDestination</code> with the properties.
+     * If subclasses override, they must call <code>super.initialize()</code>.
+     *
+     * @param id The id of the destination.
+     * @param properties Properties for the <code>MessageDestination</code>.
+     */
+    @Override
+    public void initialize(String id, ConfigMap properties)
+    {
+        super.initialize(id, properties);
+
+        if (properties == null || properties.size() == 0)
+            return;
+
+        // Network properties
+        network(properties);
+
+        // Server properties
+        server(properties);
+    }
+
+    /**
+     * Sets up the throttle manager before it starts.
+     */
+    @Override
+    public void start()
+    {
+        // Create the throttle manager, only if needed.
+        if (networkSettings.getThrottleSettings() != null)
+        {
+            ThrottleSettings settings = networkSettings.getThrottleSettings();
+            if (settings.isClientThrottleEnabled() || settings.isDestinationThrottleEnabled())
+            {
+                settings.setDestinationName(getId());
+                throttleManager = createThrottleManager();
+                throttleManager.setThrottleSettings(settings);
+                throttleManager.start();
+            }
+        }
+        super.start();
+    }
+
+    /**
+     * Stops the subscription, remote subscription, and throttle managers and
+     * then calls super class's stop.
+     */
+    @Override
+    public void stop()
+    {
+        if (isStarted())
+        {
+            subscriptionManager.stop();
+            remoteSubscriptionManager.stop();
+            if (throttleManager != null)
+                throttleManager.stop();
+        }
+        super.stop();
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Public Getters and Setters for Destination properties
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Sets the <code>NetworkSettings</code> of the <code>MessageDestination</code>.
+     *
+     * @param networkSettings The <code>NetworkSettings</code> of the <code>MessageDestination</code>
+     */
+    @Override
+    public void setNetworkSettings(NetworkSettings networkSettings)
+    {
+        super.setNetworkSettings(networkSettings);
+
+        // Set the subscription manager settings if needed.
+        if (networkSettings.getSubscriptionTimeoutMinutes() > 0)
+        {
+            long subscriptionTimeoutMillis = networkSettings.getSubscriptionTimeoutMinutes() * 60L * 1000L; // Convert to millis.
+            subscriptionManager.setSubscriptionTimeoutMillis(subscriptionTimeoutMillis);
+        }
+    }
+
+    /**
+     * Returns the <code>ServerSettings</code> of the <code>MessageDestination</code>.
+     *
+     * @return The <code>ServerSettings</code> of the <code>MessageDestination</code>.
+     */
+    public ServerSettings getServerSettings()
+    {
+        return serverSettings;
+    }
+
+    /**
+     * Sets the <code>ServerSettings</code> of the <code>MessageDestination</code>.
+     *
+     * @param serverSettings The <code>ServerSettings</code> of the <code>MessageDestination</code>
+     */
+    public void setServerSettings(ServerSettings serverSettings)
+    {
+        this.serverSettings = serverSettings;
+    }
+
+    /**
+     * Casts the <code>Service</code> into <code>MessageService</code>
+     * and calls super.setService.
+     *
+     * @param service The <code>Service</code> managing this <code>Destination</code>.
+     */
+    @Override
+    public void setService(Service service)
+    {
+        MessageService messageService = (MessageService)service;
+        super.setService(messageService);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Other Public Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * @exclude
+     * Returns a <tt>ConfigMap</tt> of destination properties that the client
+     * needs. This includes properties from <code>super{@link #describeDestination(boolean)}</code>
+     * and it also includes outbound throttling policy that the edge server might need.
+     *
+     * @param onlyReliable Determines whether only reliable destination configuration should be returned.
+     * @return A <tt>ConfigMap</tt> of destination properties that the client needs.
+     */
+    @Override
+    public ConfigMap describeDestination(boolean onlyReliable)
+    {
+        ConfigMap destinationConfig = super.describeDestination(onlyReliable);
+        if (destinationConfig == null)
+            return null;
+
+        if (throttleManager == null)
+            return destinationConfig;
+
+        Policy outboundPolicy = throttleManager.getOutboundPolicy();
+        if (outboundPolicy == null || outboundPolicy == Policy.NONE)
+            return destinationConfig;
+
+        // Add the outbound throttle policy to network properties section as appropriate.
+        ConfigMap properties = destinationConfig.getPropertyAsMap(ConfigurationConstants.PROPERTIES_ELEMENT, null);
+        if (properties == null)
+        {
+            properties = new ConfigMap();
+            destinationConfig.addProperty(ConfigurationConstants.PROPERTIES_ELEMENT, properties);
+        }
+
+        ConfigMap network = properties.getPropertyAsMap(NetworkSettings.NETWORK_ELEMENT, null);
+        if (network == null)
+        {
+            network = new ConfigMap();
+            properties.addProperty(NetworkSettings.NETWORK_ELEMENT, network);
+        }
+
+        ConfigMap throttleOutbound = new ConfigMap();
+        throttleOutbound.addProperty(ThrottleSettings.ELEMENT_POLICY, throttleManager.getOutboundPolicy().toString());
+        network.addProperty(ThrottleSettings.ELEMENT_OUTBOUND, throttleOutbound);
+
+        return destinationConfig;
+    }
+
+    /** @exclude */
+    public SubscriptionManager getSubscriptionManager()
+    {
+        return subscriptionManager;
+    }
+
+    /** @exclude */
+    public RemoteSubscriptionManager getRemoteSubscriptionManager()
+    {
+        return remoteSubscriptionManager;
+    }
+
+    /** @exclude */
+    public ThrottleManager getThrottleManager()
+    {
+        return throttleManager;
+    }
+
+    /** @exclude **/
+    @Override
+    public boolean equals(Object o)
+    {
+        if (o instanceof Destination)
+        {
+            Destination d = (Destination)o;
+            String serviceType1 = d.getServiceType();
+            String serviceType2 = getServiceType();
+            if ((serviceType1 == null && serviceType2 == null) || (serviceType1 != null && serviceType1.equals(serviceType2)))
+            {
+                String id1 = d.getId();
+                String id2 = getId();
+                if ((id1 == null && id2 == null) || (id1 != null && id1.equals(id2)))
+                    return true;
+            }
+        }
+        return false;
+    }
+
+    /** @exclude **/
+    @Override
+    public int hashCode()
+    {
+        return (getServiceType() == null ? 0 : getServiceType().hashCode()) * 100003 +
+            (getId() == null ? 0 : getId().hashCode());
+    }
+
+    /** @exclude **/
+    @Override
+    public String toString()
+    {
+        return getServiceType() + "#" + getId();
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Protected/Private Methods
+    //
+    //--------------------------------------------------------------------------
+
+    protected ThrottleManager createThrottleManager()
+    {
+        Service service = getService();
+        if (service == null || service.getMessageBroker() == null)
+            return new ThrottleManager(); // Return the default.
+
+        try
+        {
+            Class<? extends ThrottleManager> throttleManagerClass = service.getMessageBroker().getThrottleManagerClass();
+            Object instance = ClassUtil.createDefaultInstance(throttleManagerClass, null);
+            if (instance instanceof ThrottleManager)
+                return (ThrottleManager)instance;
+        }
+        catch (Throwable t)
+        {
+            // NOWARN
+        }
+
+        return new ThrottleManager(); // Return the default.
+    }
+
+    protected void network(ConfigMap properties)
+    {
+        ConfigMap network = properties.getPropertyAsMap(NetworkSettings.NETWORK_ELEMENT, null);
+        if (network == null)
+            return;
+
+        // Get implementation specific network settings, including subclasses!
+        NetworkSettings ns = getNetworkSettings();
+
+        // Subscriber timeout; first check for subscription-timeout-minutes and fallback to legacy session-timeout.
+        int useLegacyPropertyToken = -999999;
+        int subscriptionTimeoutMinutes = network.getPropertyAsInt(NetworkSettings.SUBSCRIPTION_TIMEOUT_MINUTES, useLegacyPropertyToken);
+        if (subscriptionTimeoutMinutes == useLegacyPropertyToken)
+            subscriptionTimeoutMinutes = network.getPropertyAsInt(NetworkSettings.SESSION_TIMEOUT, NetworkSettings.DEFAULT_TIMEOUT);
+        ns.setSubscriptionTimeoutMinutes(subscriptionTimeoutMinutes);
+
+        // Throttle Settings
+        ThrottleSettings ts = ns.getThrottleSettings();
+        ts.setDestinationName(getId());
+        throttle(ts, network);
+
+        setNetworkSettings(ns);
+    }
+
+    protected void throttle(ThrottleSettings ts, ConfigMap network)
+    {
+        ConfigMap inbound = network.getPropertyAsMap(ThrottleSettings.ELEMENT_INBOUND, null);
+        if (inbound != null)
+        {
+            ThrottleSettings.Policy policy = getPolicyFromThrottleSettings(inbound);
+            ts.setInboundPolicy(policy);
+            int destFreq = inbound.getPropertyAsInt(ThrottleSettings.ELEMENT_DEST_FREQ, 0);
+            ts.setIncomingDestinationFrequency(destFreq);
+            int clientFreq = inbound.getPropertyAsInt(ThrottleSettings.ELEMENT_CLIENT_FREQ, 0);
+            ts.setIncomingClientFrequency(clientFreq);
+        }
+
+        ConfigMap outbound = network.getPropertyAsMap(ThrottleSettings.ELEMENT_OUTBOUND, null);
+        if (outbound != null)
+        {
+            ThrottleSettings.Policy policy = getPolicyFromThrottleSettings(outbound);
+            ts.setOutboundPolicy(policy);
+            int destFreq = outbound.getPropertyAsInt(ThrottleSettings.ELEMENT_DEST_FREQ, 0);
+            ts.setOutgoingDestinationFrequency(destFreq);
+            int clientFreq = outbound.getPropertyAsInt(ThrottleSettings.ELEMENT_CLIENT_FREQ, 0);
+            ts.setOutgoingClientFrequency(clientFreq);
+        }
+    }
+
+    private ThrottleSettings.Policy getPolicyFromThrottleSettings(ConfigMap settings)
+    {
+        String policyString = settings.getPropertyAsString(ThrottleSettings.ELEMENT_POLICY, null);
+        ThrottleSettings.Policy policy = ThrottleSettings.Policy.NONE;
+        if (policyString == null)
+            return policy;
+        try
+        {
+            policy = ThrottleSettings.parsePolicy(policyString);
+        }
+        catch (ConfigurationException exception)
+        {
+            ConfigurationException ce = new ConfigurationException();
+            ce.setMessage(UNSUPPORTED_POLICY, new Object[] {getId(), policyString});
+            throw ce;
+        }
+        return policy;
+    }
+
+    protected void server(ConfigMap properties)
+    {
+        ConfigMap server = properties.getPropertyAsMap(DestinationSettings.SERVER_ELEMENT, null);
+        if (server == null)
+            return;
+
+        long ttl = server.getPropertyAsLong(MessagingConstants.TIME_TO_LIVE_ELEMENT, -1);
+        serverSettings.setMessageTTL(ttl);
+
+        boolean durable = server.getPropertyAsBoolean(MessagingConstants.IS_DURABLE_ELEMENT, false);
+        serverSettings.setDurable(durable);
+
+        boolean allowSubtopics = server.getPropertyAsBoolean(MessagingConstants.ALLOW_SUBTOPICS_ELEMENT, false);
+        serverSettings.setAllowSubtopics(allowSubtopics);
+
+        boolean disallowWildcardSubtopics = server.getPropertyAsBoolean(MessagingConstants.DISALLOW_WILDCARD_SUBTOPICS_ELEMENT, false);
+        serverSettings.setDisallowWildcardSubtopics(disallowWildcardSubtopics);
+
+        int priority = server.getPropertyAsInt(MessagingConstants.MESSAGE_PRIORITY, -1);
+        if (priority != -1)
+            serverSettings.setPriority(priority);
+
+        String subtopicSeparator = server.getPropertyAsString(MessagingConstants.SUBTOPIC_SEPARATOR_ELEMENT, MessagingConstants.DEFAULT_SUBTOPIC_SEPARATOR);
+        serverSettings.setSubtopicSeparator(subtopicSeparator);
+
+        String routingMode = server.getPropertyAsString(MessagingConstants.CLUSTER_MESSAGE_ROUTING, "server-to-server");
+        serverSettings.setBroadcastRoutingMode(routingMode);
+    }
+
+    /**
+     * Returns the log category of the <code>MessageDestination</code>.
+     *
+     * @return The log category of the component.
+     */
+    @Override
+    protected String getLogCategory()
+    {
+        return LOG_CATEGORY;
+    }
+
+    /**
+     * Invoked automatically to allow the <code>MessageDestination</code> to setup its corresponding
+     * MBean control.
+     *
+     * @param service The <code>Service</code> that manages this <code>MessageDestination</code>.
+     */
+    @Override
+    protected void setupDestinationControl(Service service)
+    {
+        controller = new MessageDestinationControl(this, service.getControl());
+        controller.register();
+        setControl(controller);
+        setupThrottleManagerControl(controller);
+        setupSubscriptionManagerControl(controller);
+    }
+
+    protected void setupThrottleManagerControl(MessageDestinationControl destinationControl)
+    {
+        if (throttleManager != null)
+        {
+            ThrottleManagerControl throttleManagerControl = new ThrottleManagerControl(throttleManager, destinationControl);
+            throttleManagerControl.register();
+            throttleManager.setControl(throttleManagerControl);
+            throttleManager.setManaged(true);
+            destinationControl.setThrottleManager(throttleManagerControl.getObjectName());
+        }
+    }
+
+    private void setupSubscriptionManagerControl(MessageDestinationControl destinationControl)
+    {
+        SubscriptionManagerControl subscriptionManagerControl = new SubscriptionManagerControl(getSubscriptionManager(), destinationControl);
+        subscriptionManagerControl.register();
+        getSubscriptionManager().setControl(subscriptionManagerControl);
+        getSubscriptionManager().setManaged(true);
+        destinationControl.setSubscriptionManager(subscriptionManagerControl.getObjectName());
+    }
+}