You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flex.apache.org by ah...@apache.org on 2014/04/25 07:34:01 UTC

[03/51] [partial] BlazeDS Donation from Adobe Systems Inc

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/client/FlexClientAttributeListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/client/FlexClientAttributeListener.java b/modules/core/src/flex/messaging/client/FlexClientAttributeListener.java
new file mode 100755
index 0000000..9481bc2
--- /dev/null
+++ b/modules/core/src/flex/messaging/client/FlexClientAttributeListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * Interface for FlexClient attribute listeners. Each callback receives a
+ * FlexClientBindingEvent describing the affected attribute.
+ */
+public interface FlexClientAttributeListener 
+{
+    /**
+     * Callback invoked after an attribute is added to the FlexClient.
+     * 
+     * @param event The event containing the associated FlexClient and attribute
+     *              information.
+     */
+    void attributeAdded(FlexClientBindingEvent event);
+    
+    /**
+     * Callback invoked after an attribute has been replaced with a new value.
+     * 
+     * @param event The event containing the associated FlexClient and attribute
+     *              information.
+     */
+    void attributeReplaced(FlexClientBindingEvent event);
+    
+    /**
+     * Callback invoked after an attribute is removed from the FlexClient.
+     * 
+     * @param event The event containing the associated FlexClient and attribute
+     *              information.
+     */
+    void attributeRemoved(FlexClientBindingEvent event);
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/client/FlexClientBindingEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/client/FlexClientBindingEvent.java b/modules/core/src/flex/messaging/client/FlexClientBindingEvent.java
new file mode 100755
index 0000000..236f70a
--- /dev/null
+++ b/modules/core/src/flex/messaging/client/FlexClientBindingEvent.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * Event used to notify FlexClientAttributeListeners of changes to FlexClient
+ * attributes.
+ * <p>
+ * Instances are immutable: the client, attribute name and attribute value are
+ * assigned once at construction and are only exposed through getters.
+ */
+public class FlexClientBindingEvent 
+{
+    //--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Constructs an event for an attribute that is bound or unbound from a FlexClient.
+     * The event carries no value, so {@link #getValue()} returns <code>null</code>.
+     * 
+     * @param client The FlexClient.
+     * @param name The attribute name.
+     */
+    public FlexClientBindingEvent(FlexClient client, String name)
+    {
+        this(client, name, null);
+    }
+
+    /**
+     * Constructs an event for an attribute that is added to a FlexClient or 
+     * replaced by a new value.
+     * 
+     * @param client The FlexClient.
+     * @param name The attribute name.
+     * @param value The attribute value.
+     */
+    public FlexClientBindingEvent(FlexClient client, String name, Object value)
+    {
+        this.client = client;
+        this.name = name;
+        this.value = value;
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * The FlexClient that generated the event.
+     */
+    private final FlexClient client;
+    
+    /**
+     * The name of the attribute associated with the event.
+     */
+    private final String name;
+    
+    /**
+     * The value of the attribute associated with the event; <code>null</code>
+     * when the event was constructed without a value.
+     */
+    private final Object value;
+    
+    //--------------------------------------------------------------------------
+    //
+    // Methods
+    //
+    //--------------------------------------------------------------------------
+    
+    /**
+     * Returns the FlexClient that generated the event.
+     * 
+     * @return The FlexClient that generated the event.
+     */
+    public FlexClient getClient()
+    {
+        return client;
+    }
+    
+    /**
+     * Returns the name of the attribute associated with the event.
+     * 
+     * @return The name of the attribute associated with the event.
+     */
+    public String getName()
+    {
+        return name;
+    }
+    
+    /**
+     * Returns the value of the attribute associated with the event.
+     * 
+     * @return The value of the attribute associated with the event, or
+     *         <code>null</code> if the event was constructed without one.
+     */
+    public Object getValue()
+    {
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/client/FlexClientBindingListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/client/FlexClientBindingListener.java b/modules/core/src/flex/messaging/client/FlexClientBindingListener.java
new file mode 100755
index 0000000..d1fb36b
--- /dev/null
+++ b/modules/core/src/flex/messaging/client/FlexClientBindingListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * Interface implemented by objects that want to be notified when they are bound to
+ * or unbound from a FlexClient.
+ */
+public interface FlexClientBindingListener 
+{
+    /**
+     * Callback invoked when the object is bound to a FlexClient.
+     * 
+     * @param event The event containing the FlexClient and attribute
+     *              information.
+     */
+    void valueBound(FlexClientBindingEvent event);
+    
+    /**
+     * Callback invoked when the object is unbound from a FlexClient.
+     * 
+     * @param event The event containing the FlexClient and attribute
+     *              information.
+     */
+    void valueUnbound(FlexClientBindingEvent event);
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/client/FlexClientListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/client/FlexClientListener.java b/modules/core/src/flex/messaging/client/FlexClientListener.java
new file mode 100755
index 0000000..49dc24a
--- /dev/null
+++ b/modules/core/src/flex/messaging/client/FlexClientListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * Interface to be notified when a FlexClient is created or destroyed.
+ * <p>
+ * Implementations may register themselves for creation notifications statically via
+ * <code>FlexClient.addClientCreatedListener()</code>. To be notified of the destruction
+ * of a FlexClient, the implementation instance must add itself as a listener to that
+ * specific FlexClient instance via its <code>addClientDestroyedListener()</code> method.
+ */
+public interface FlexClientListener 
+{
+    /**
+     * Notification that a FlexClient was created.
+     * 
+     * @param client The FlexClient that was created.
+     */
+    void clientCreated(FlexClient client);
+    
+    /**
+     * Notification that a FlexClient is about to be destroyed.
+     * 
+     * @param client The FlexClient that will be destroyed.
+     */
+    void clientDestroyed(FlexClient client);
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/client/FlexClientManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/client/FlexClientManager.java b/modules/core/src/flex/messaging/client/FlexClientManager.java
new file mode 100755
index 0000000..d1e7a1e
--- /dev/null
+++ b/modules/core/src/flex/messaging/client/FlexClientManager.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+
+import flex.management.ManageableComponent;
+import flex.management.runtime.messaging.client.FlexClientManagerControl;
+import flex.messaging.FlexContext;
+import flex.messaging.MessageBroker;
+import flex.messaging.MessageException;
+import flex.messaging.config.FlexClientSettings;
+import flex.messaging.endpoints.AbstractEndpoint;
+import flex.messaging.endpoints.Endpoint;
+import flex.messaging.log.Log;
+import flex.messaging.log.LogCategories;
+import flex.messaging.util.ClassUtil;
+import flex.messaging.util.TimeoutAbstractObject;
+import flex.messaging.util.TimeoutManager;
+
+/**
+ * @exclude
+ * Manages FlexClient instances for a MessageBroker.
+ */
+public class FlexClientManager extends ManageableComponent
+{
+    public static final String TYPE = "FlexClientManager";
+
+    //--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * @exclude
+     * Constructs a FlexClientManager for the broker returned by
+     * <code>MessageBroker.getMessageBroker(null)</code>.
+     */
+    public FlexClientManager()
+    {
+        this(MessageBroker.getMessageBroker(null));
+    }
+    /**
+     * Constructs a FlexClientManager for the passed MessageBroker. Management is
+     * enabled when <code>broker.isManaged()</code> returns true.
+     *
+     * @param broker The MessageBroker that the Flex client manager is associated with.
+     *               It is dereferenced immediately, so passing <code>null</code> fails
+     *               with a NullPointerException.
+     */
+    public FlexClientManager(MessageBroker broker)
+    {
+        this(broker.isManaged(), broker);
+    }
+
+    /**
+     * @exclude
+     * Constructs a FlexClientManager with management explicitly enabled or disabled.
+     *
+     * @param enableManagement Passed through to the ManageableComponent constructor.
+     * @param mbroker The owning MessageBroker; when <code>null</code>, the broker returned
+     *                by <code>MessageBroker.getMessageBroker(null)</code> is used instead.
+     */
+    public FlexClientManager(boolean enableManagement, MessageBroker mbroker)
+    {
+        super(enableManagement);
+
+        super.setId(TYPE);
+
+        // Fall back to the default broker when the caller did not supply one.
+        if (mbroker != null)
+            broker = mbroker;
+        else
+            broker = MessageBroker.getMessageBroker(null);
+
+        // Apply the configured idle timeout; settings with a value of -1 are skipped.
+        FlexClientSettings settings = broker.getFlexClientSettings();
+        boolean timeoutConfigured = (settings != null) && (settings.getTimeoutMinutes() != -1);
+        if (timeoutConfigured)
+        {
+            // Convert from minutes to millis.
+            setFlexClientTimeoutMillis(settings.getTimeoutMinutes()*60*1000);
+        }
+
+        this.setParent(broker);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * The MessageBroker that owns this manager.
+     */
+    private final MessageBroker broker;
+
+    /**
+     * The Mbean controller for this manager.
+     */
+    private FlexClientManagerControl controller;
+
+    /**
+     * Table to store FlexClients by id.
+     */
+    private final Map<String,FlexClient> flexClients = new ConcurrentHashMap<String,FlexClient>();
+
+
+    /**
+     * Manages time outs for FlexClients.
+     * This currently includes timeout of FlexClient instances, timeouts for async
+     * long-poll handling, and scheduling delayed flushes of outbound messages.
+     */
+    private volatile TimeoutManager flexClientTimeoutManager;
+
+    //--------------------------------------------------------------------------
+    //
+    // Properties
+    //
+    //--------------------------------------------------------------------------
+
+    //----------------------------------
+    //  clientIds
+    //----------------------------------
+
+    /**
+     * Returns a string array of the client IDs.
+     * <p>
+     * The array is a snapshot of the keys of the client table, so it is safe to call
+     * while FlexClients are being registered or removed on other threads.
+     *
+     * @return A string array of the client IDs; empty when no FlexClient is registered.
+     */
+    public String[] getClientIds()
+    {
+        // Take the snapshot in a single call. Sizing the array from size() and then
+        // looping up to a second size() over a separately copied key list is unsafe on
+        // a concurrent map: an add or remove in between can cause an
+        // IndexOutOfBoundsException or leave null entries at the end of the array.
+        return flexClients.keySet().toArray(new String[0]);
+    }
+
+    //----------------------------------
+    //  flexClientCount
+    //----------------------------------
+
+    /**
+     * Returns the number of FlexClients in use, i.e. the current size of the
+     * table of FlexClients registered with this manager.
+     *
+     * @return The number of FlexClients in use.
+     */
+    public int getFlexClientCount()
+    {
+        return flexClients.size();
+    }
+
+    //----------------------------------
+    //  flexClientTimeoutMillis
+    //----------------------------------
+
+    private volatile long flexClientTimeoutMillis;
+
+    /**
+     * Returns the idle timeout in milliseconds to apply to new FlexClient instances.
+     * A value of 0 means new FlexClients are not scheduled for timeout by this manager.
+     *
+     * @return The idle timeout in milliseconds to apply to new FlexClient instances.
+     */
+    public long getFlexClientTimeoutMillis()
+    {
+        return flexClientTimeoutMillis;
+    }
+
+    /**
+     * Sets the idle timeout in milliseconds to apply to new FlexClient instances.
+     * Values below 1 are stored as 0.
+     *
+     * @param value The idle timeout in milliseconds to apply to new FlexClient instances.
+     */
+    public void setFlexClientTimeoutMillis(long value)
+    {
+        // Anything below one millisecond collapses to 0.
+        final long normalized = (value < 1) ? 0 : value;
+
+        synchronized (this)
+        {
+            flexClientTimeoutMillis = normalized;
+        }
+    }
+
+    //----------------------------------
+    //  messageBroker
+    //----------------------------------
+
+    /**
+     * Returns the MessageBroker instance that owns this FlexClientManager. The
+     * reference is fixed at construction time.
+     *
+     * @return The parent MessageBroker instance.
+     */
+    public MessageBroker getMessageBroker()
+    {
+        return broker;
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Public Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Gets the FlexClient with the specified id, creating a new one if none exists.
+     * Equivalent to calling <code>getFlexClient(id, true)</code>.
+     *
+     * @param id The id of the Flex client.
+     * @return The existing valid FlexClient for the id, or a newly created FlexClient.
+     */
+    public FlexClient getFlexClient(String id)
+    {
+        return getFlexClient(id, true);
+    }
+    
+    /**
+     * Get the FlexClient with the specified id.
+     * <p>
+     * A valid, non-invalidating FlexClient registered under the id is returned directly
+     * after its last-use time is updated. Otherwise the lookup is repeated under the
+     * manager lock and, if nothing is registered, a new FlexClient is created,
+     * registered, scheduled for timeout (when the timeout is greater than 0) and
+     * notified of its creation.
+     *
+     * @param id The id of the Flex client. When <code>null</code>, a new FlexClient is
+     *           always created, regardless of <code>createNewIfNotExist</code>.
+     * @param createNewIfNotExist if true, a new FlexClient will be created if none is
+     *           registered for a non-null id; if false, <code>null</code> is returned instead
+     * @return FlexClient the FlexClient with the specified id, or <code>null</code> when
+     *         none is registered and <code>createNewIfNotExist</code> is false
+     * @throws MessageException if the newly created FlexClient has a null id or an id
+     *         that is already registered
+     */
+    public FlexClient getFlexClient(String id, boolean createNewIfNotExist)
+    {
+        FlexClient flexClient = null;
+        // Try to lookup an existing instance if we receive an id.
+        if (id != null)
+        {
+            flexClient = flexClients.get(id);
+            if (flexClient != null)
+            {
+                if (flexClient.isValid() && !flexClient.invalidating)
+                {
+                    flexClient.updateLastUse();
+                    return flexClient;
+                }
+                // Invalid, remove it - it will be replaced below
+                // (unless createNewIfNotExist is false, in which case null is returned).
+                flexClients.remove(id);
+            }
+        }
+        // Use a manager-level lock (this) when creating/recreating a new FlexClient.
+        synchronized (this)
+        {
+            if (id != null)
+            {
+                // Look up again under the lock; another thread may have registered a
+                // FlexClient for this id in the meantime.
+                // NOTE(review): unlike the unlocked lookup above, this path returns the
+                // stored instance without checking isValid()/invalidating - confirm intended.
+                flexClient = flexClients.get(id);
+                if (flexClient != null)
+                {
+                    flexClient.updateLastUse();
+                    return flexClient;
+                }
+                else
+                {
+                    if (!createNewIfNotExist)
+                    {
+                        return null;
+                    }
+                }
+            }
+            
+            flexClient = createFlexClient(id);
+            checkForNullAndDuplicateId(flexClient.getId());
+            flexClients.put(flexClient.getId(), flexClient);
+            // NOTE(review): flexClientTimeoutManager is only assigned in start(), so this
+            // dereference fails if a FlexClient is requested before the manager is started.
+            if (flexClientTimeoutMillis > 0)
+                flexClientTimeoutManager.scheduleTimeout(flexClient);
+            // Creation listeners are notified while the manager lock is still held.
+            flexClient.notifyCreated();
+            return flexClient;
+        }
+    }
+
+    /**
+     * Creates the FlexClientOutboundQueueProcessor that a FlexClient uses for an endpoint.
+     * The endpoint's custom processor is tried first, then the processor configured in
+     * the MessageBroker's FlexClient settings, and finally a plain
+     * FlexClientOutboundQueueProcessor.
+     *
+     * @param flexClient The FlexClient to equip with a queue processor.
+     * @param endpointId The Id of the endpoint the queue processor is used for.
+     * @return The queue processor, with its FlexClient and endpoint id set.
+     */
+    public FlexClientOutboundQueueProcessor createOutboundQueueProcessor(FlexClient flexClient, String endpointId)
+    {
+        // An endpoint-specific processor takes precedence.
+        FlexClientOutboundQueueProcessor custom = createCustomOutboundQueueProcessor(flexClient, endpointId);
+        if (custom != null)
+            return custom;
+
+        // Next, the processor configured at the MessageBroker level.
+        FlexClientOutboundQueueProcessor brokerDefault = createDefaultOutboundQueueProcessor(flexClient, endpointId);
+        if (brokerDefault != null)
+            return brokerDefault;
+
+        // Neither is available; fall back to the base implementation.
+        FlexClientOutboundQueueProcessor fallback = new FlexClientOutboundQueueProcessor();
+        fallback.setFlexClient(flexClient);
+        fallback.setEndpointId(endpointId);
+        return fallback;
+    }
+
+    /**
+     * @exclude
+     * Monitors an async poll for a FlexClient for timeout by scheduling it with this
+     * manager's timeout manager. The timeout manager is created in start(), so the
+     * manager must be started before this method is called.
+     *
+     * @param asyncPollTimeout The async poll task to monitor for timeout.
+     */
+    public void monitorAsyncPollTimeout(TimeoutAbstractObject asyncPollTimeout)
+    {
+        flexClientTimeoutManager.scheduleTimeout(asyncPollTimeout);
+    }
+
+    /**
+     * @exclude
+     * Monitors a scheduled flush for a FlexClient for timeout by scheduling it with this
+     * manager's timeout manager. The timeout manager is created in start(), so the
+     * manager must be started before this method is called.
+     *
+     * @param scheduledFlushTimeout The schedule flush task to monitor for timeout.
+     */
+    public void monitorScheduledFlush(TimeoutAbstractObject scheduledFlushTimeout)
+    {
+        flexClientTimeoutManager.scheduleTimeout(scheduledFlushTimeout);
+    }
+
+    /**
+     * Starts the Flex client manager. When the manager is managed, its control is
+     * created, attached and registered; then the timeout manager is created with a
+     * thread factory that names its threads after this component's id.
+     *
+     * @see flex.management.ManageableComponent#start()
+     */
+    @Override
+    public void start()
+    {
+        if (isManaged())
+        {
+            controller = new FlexClientManagerControl(getParent().getControl(), this);
+            setControl(controller);
+            controller.register();
+        }
+
+        // Thread names look like "<id>-FlexClientTimeoutThread-<n>", with n starting at 1.
+        final String threadNamePrefix = getId() + "-FlexClientTimeoutThread-";
+        ThreadFactory timeoutThreadFactory = new ThreadFactory()
+        {
+            int nextThreadNumber = 1;
+
+            public synchronized Thread newThread(Runnable runnable)
+            {
+                Thread thread = new Thread(runnable);
+                thread.setName(threadNamePrefix + nextThreadNumber++);
+                return thread;
+            }
+        };
+        flexClientTimeoutManager = new TimeoutManager(timeoutThreadFactory);
+    }
+
+    /**
+     * Stops the Flex client manager: unregisters the control created in start(), if
+     * any, and shuts down the timeout manager, if one was created.
+     *
+     * @see flex.management.ManageableComponent#stop()
+     */
+    public void stop()
+    {
+        if (controller != null)
+        {
+            controller.unregister();
+        }
+
+        if (flexClientTimeoutManager != null)
+            flexClientTimeoutManager.shutdown();
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Protected Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Hook method invoked when a new <tt>FlexClient</tt> instance is created.
+     * 
+     * @param id The id the client provided, which was previously assigned by this server, 
+     *           or another server in a cluster. New clients will pass a <code>null</code>
+     *           value in which case this server must generate a unique id.
+     * @return A new FlexClient owned by this manager, constructed with the passed id
+     *         when it is non-null and without an id argument otherwise.
+     */
+    protected FlexClient createFlexClient(String id)
+    {
+        return (id == null) ? new FlexClient(this) : new FlexClient(this, id);
+    }
+    
+    /* (non-Javadoc)
+     * Returns the log category used by this component: LogCategories.CLIENT_FLEXCLIENT.
+     *
+     * @see flex.management.ManageableComponent#getLogCategory()
+     */
+    protected String getLogCategory()
+    {
+        return LogCategories.CLIENT_FLEXCLIENT;
+    }
+
+    /**
+     * @exclude
+     * Removes a FlexClient from being managed by this manager.
+     * This method is invoked by FlexClients when they are invalidated.
+     * The entry is only removed when the instance stored under the FlexClient's id
+     * is the very same object; a <code>null</code> argument is ignored.
+     *
+     * @param flexClient The FlexClient being invalidated.
+     */
+    protected void removeFlexClient(FlexClient flexClient)
+    {
+        if (flexClient != null)
+        {
+            String id = flexClient.getId();
+            // NOTE(review): this locks on the id String instance, while getFlexClient()
+            // locks on the manager (this), so the two code paths do not exclude each
+            // other; equal ids held in different String objects are also different
+            // monitors. Confirm this is the intended locking.
+            synchronized (id)
+            {
+                FlexClient storedClient = flexClients.get(id);
+                // If the stored instance is the same as the invalidating instance based upon identity,
+                // remove it.
+                if (storedClient == flexClient)
+                    flexClients.remove(id);
+            }
+        }
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Private Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Validates the id of a FlexClient that is about to be registered.
+     *
+     * @param id The id to validate.
+     * @throws MessageException with code "Server.Processing.NullId" when the id is null, or
+     *         "Server.Processing.DuplicateId" when a FlexClient is already registered under it.
+     */
+    private void checkForNullAndDuplicateId(String id)
+    {
+        if (id == null)
+        {
+            // Cannot create ''{0}'' with null id.
+            MessageException nullIdError = new MessageException();
+            nullIdError.setMessage(10039, new Object[]{"FlexClient"});
+            nullIdError.setCode("Server.Processing.NullId");
+            throw nullIdError;
+        }
+
+        // An unused id is fine; nothing more to check.
+        if (!flexClients.containsKey(id))
+            return;
+
+        // Cannot create ''{0}'' with id ''{1}''; another ''{0}'' is already registered with the same id.
+        MessageException duplicateIdError = new MessageException();
+        duplicateIdError.setMessage(10040, new Object[]{"FlexClient", id});
+        duplicateIdError.setCode("Server.Processing.DuplicateId");
+        throw duplicateIdError;
+    }
+
+    /**
+     * Creates the outbound queue processor configured in the MessageBroker's
+     * FlexClient settings.
+     *
+     * @param flexClient The FlexClient the processor is created for.
+     * @param endpointId The id of the endpoint the processor is used for.
+     * @return The configured processor, or <code>null</code> when the broker has no
+     *         FlexClient settings or no processor class name is configured.
+     * @throws MessageException if the configured processor cannot be created or
+     *         initialized; the failure is also logged at warn level.
+     */
+    private FlexClientOutboundQueueProcessor createDefaultOutboundQueueProcessor(
+            FlexClient flexClient, String endpointId)
+    {
+        FlexClientSettings flexClientSettings = broker.getFlexClientSettings();
+        if (flexClientSettings == null)
+            return null;
+
+        String queueProcessorClassName = flexClientSettings.getFlexClientOutboundQueueProcessorClassName();
+        if (queueProcessorClassName == null)
+            return null;
+
+        FlexClientOutboundQueueProcessor processor = null;
+        try
+        {
+            Class queueProcessorClass = createClass(queueProcessorClassName);
+            Object instance = ClassUtil.createDefaultInstance(queueProcessorClass, null);
+            // No instanceof check here: a class of the wrong type fails the cast and is
+            // reported through the catch block below.
+            processor = (FlexClientOutboundQueueProcessor)instance;
+            processor.setFlexClient(flexClient);
+            processor.setEndpointId(endpointId);
+            processor.initialize(flexClientSettings.getFlexClientOutboundQueueProcessorProperties());
+        }
+        catch (Throwable t)
+        {
+            // Unlike the custom (endpoint-level) processor, a broken broker-level
+            // configuration is not silently replaced: log it and rethrow.
+            String message = "Failed to create MessageBroker's outbound queue processor for FlexClient with id '" + flexClient.getId() + "'.";
+            if (Log.isWarn())
+                Log.getLogger(FlexClient.FLEX_CLIENT_LOG_CATEGORY).warn(message, t);
+
+            MessageException me = new MessageException(message, t);
+            throw me;
+        }
+
+        return processor;
+    }
+
+    /**
+     * Creates the custom outbound queue processor configured on the endpoint, if any.
+     *
+     * @param flexClient The FlexClient the processor is created for.
+     * @param endpointId The id of the endpoint whose processor class is looked up.
+     * @return The endpoint's custom processor, or <code>null</code> when the endpoint is
+     *         not an AbstractEndpoint, has no processor class, the created instance is
+     *         not a FlexClientOutboundQueueProcessor, or creation fails.
+     */
+    private FlexClientOutboundQueueProcessor createCustomOutboundQueueProcessor(
+            FlexClient flexClient, String endpointId)
+    {
+        FlexClientOutboundQueueProcessor processor = null;
+        Endpoint endpoint = broker.getEndpoint(endpointId);
+        if (endpoint instanceof AbstractEndpoint)
+        {
+            Class processorClass = ((AbstractEndpoint)endpoint).getFlexClientOutboundQueueProcessorClass();
+            if (processorClass != null)
+            {
+                try
+                {
+                    Object instance = ClassUtil.createDefaultInstance(processorClass, null);
+                    // An instance of any other type is ignored without logging.
+                    if (instance instanceof FlexClientOutboundQueueProcessor)
+                    {
+                        processor = (FlexClientOutboundQueueProcessor)instance;
+                        processor.setFlexClient(flexClient);
+                        processor.setEndpointId(endpointId);
+                        processor.initialize(((AbstractEndpoint)endpoint).getFlexClientOutboundQueueProcessorConfig());
+                    }
+                }
+                catch (Throwable t)
+                {
+                    // Best effort: log the failure and let the caller fall back.
+                    // NOTE(review): if the failure happens after the assignment above
+                    // (e.g. in initialize()), the partially configured processor is
+                    // still returned rather than null - confirm this is intended.
+                    if (Log.isWarn())
+                        Log.getLogger(FlexClient.FLEX_CLIENT_LOG_CATEGORY).warn("Failed to create custom outbound queue processor for FlexClient with id '" + flexClient.getId() + "'. Using MessageBroker's default queue processor.", t);
+                }
+            }
+        }
+        return processor;
+    }
+
+    /**
+     * Loads the named class through ClassUtil, using the class loader of the
+     * MessageBroker returned by FlexContext when there is one, and a
+     * <code>null</code> class loader argument otherwise.
+     *
+     * @param className The name of the class to load.
+     * @return The class returned by <code>ClassUtil.createClass</code>.
+     */
+    private Class createClass(String className)
+    {
+        // Note that the broker is taken from FlexContext, not from this manager's
+        // own broker field.
+        Class c = ClassUtil.createClass(className, FlexContext.getMessageBroker() == null ? null :
+                    FlexContext.getMessageBroker().getClassLoader());
+
+        return c;
+    }
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/client/FlexClientNotSubscribedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/client/FlexClientNotSubscribedException.java b/modules/core/src/flex/messaging/client/FlexClientNotSubscribedException.java
new file mode 100755
index 0000000..629405d
--- /dev/null
+++ b/modules/core/src/flex/messaging/client/FlexClientNotSubscribedException.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+import flex.messaging.MessageException;
+import flex.messaging.log.LogEvent;
+
+/**
+ * @exclude
+ * MessageException variant whose log output is introduced with
+ * "FlexClient not subscribed: ", has stack trace logging disabled and prefers the
+ * debug log level.
+ */
+public class FlexClientNotSubscribedException extends MessageException
+{
+    /**
+     * @exclude
+     */
+    private static final long serialVersionUID = 773524927178340950L;
+
+    //--------------------------------------------------------------------------
+    //
+    // Properties
+    //
+    //--------------------------------------------------------------------------
+    
+    //----------------------------------
+    //  defaultLogMessageIntro
+    //----------------------------------
+
+    /**
+     * Overrides the intro text for the log message.
+     *
+     * @return The intro text, "FlexClient not subscribed: ".
+     */
+    public String getDefaultLogMessageIntro()
+    {
+        return "FlexClient not subscribed: ";        
+    }
+    
+    //----------------------------------
+    //  logStackTraceEnabled
+    //----------------------------------
+    
+    /**
+     * Override to disable stack trace logging.
+     *
+     * @return Always <code>false</code>.
+     */
+    public boolean isLogStackTraceEnabled()
+    {
+        return false;        
+    }    
+    
+    //----------------------------------
+    //  preferredLogLevel
+    //----------------------------------
+    
+    /**
+     * Override to lower the preferred log level to debug.
+     *
+     * @return <code>LogEvent.DEBUG</code>.
+     */
+    public short getPreferredLogLevel()
+    {
+        return LogEvent.DEBUG;        
+    }
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java b/modules/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java
new file mode 100755
index 0000000..059c3ef
--- /dev/null
+++ b/modules/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import flex.messaging.Destination;
+import flex.messaging.MessageClient;
+import flex.messaging.MessageDestination;
+import flex.messaging.config.ConfigMap;
+import flex.messaging.messages.Message;
+import flex.messaging.services.messaging.ThrottleManager;
+import flex.messaging.services.messaging.ThrottleManager.ThrottleResult;
+import flex.messaging.services.messaging.ThrottleManager.ThrottleResult.Result;
+
+/**
+ * The base FlexClientOutboundQueueProcessor implementation used if a custom implementation is not
+ * specified. Its behavior is very simple. It adds all new messages in order to the tail
+ * of the outbound queue and flushes all queued messages to the network as quickly as possible.
+ * It also handles the outbound client-level throttling specified at the destination level.
+ *
+ * @author shodgson
+ */
+public class FlexClientOutboundQueueProcessor
+{
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * The associated FlexClient.
+     */
+    private FlexClient client;
+
+    /**
+     * The last MessageClient messages were flushed to. This is mainly for faster
+     * lookup.
+     */
+    private MessageClient lastMessageClient;
+
+    /**
+     * The associated endpoint's Id.
+     */
+    private String endpointId;
+
+    /**
+     * Manages throttling of outbound client level messages.
+     */
+    protected OutboundQueueThrottleManager outboundQueueThrottleManager;
+
+    //--------------------------------------------------------------------------
+    //
+    // Public Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * @exclude
+     * Records the Id of the endpoint this outbound queue belongs to.
+     *
+     * @param value The endpoint Id to store.
+     */
+    public void setEndpointId(String value)
+    {
+        this.endpointId = value;
+    }
+
+    /**
+     * Gets the endpoint Id previously stored for this outbound queue.
+     *
+     * @return The stored endpoint Id.
+     */
+    public String getEndpointId()
+    {
+        return this.endpointId;
+    }
+
+    /**
+     * @exclude
+     * Associates this queue processor with a FlexClient.
+     *
+     * @param value The FlexClient to associate.
+     */
+    public void setFlexClient(FlexClient value)
+    {
+        this.client = value;
+    }
+
+    /**
+     * Gets the FlexClient this queue processor is associated with.
+     *
+     * @return The associated FlexClient.
+     */
+    public FlexClient getFlexClient()
+    {
+        return this.client;
+    }
+
+    /**
+     * Gets the outbound queue throttle manager without creating one.
+     *
+     * @return The outbound queue throttle manager, or null if none has been created.
+     */
+    public OutboundQueueThrottleManager getOutboundQueueThrottleManager()
+    {
+        return this.outboundQueueThrottleManager;
+    }
+
+    /**
+     * Returns the outbound queue throttle manager, creating and storing a new one
+     * bound to this processor when none exists yet.
+     *
+     * @return The outbound queue throttle manager; never null.
+     */
+    public OutboundQueueThrottleManager getOrCreateOutboundQueueThrottleManager()
+    {
+        if (outboundQueueThrottleManager != null)
+            return outboundQueueThrottleManager;
+
+        outboundQueueThrottleManager = new OutboundQueueThrottleManager(this);
+        return outboundQueueThrottleManager;
+    }
+
+    /**
+     * No-op; this default implementation doesn't require custom initialization.
+     * Subclasses may override to process any custom initialization properties that have been
+     * defined in the server configuration.
+     *
+     * @param properties A ConfigMap containing any custom initialization properties.
+     */
+    public void initialize(ConfigMap properties) {}
+
+    /**
+     * Queues a message by appending it to the outbound queue.
+     *
+     * @param outboundQueue The queue of outbound messages.
+     * @param message The message to append.
+     */
+    public void add(List<Message> outboundQueue, Message message)
+    {
+        outboundQueue.add(message);
+    }
+
+    /**
+     * Flushes the outbound queue without restricting the flush to a particular
+     * MessageClient. Delegates to {@link #flush(MessageClient, List)} with a null
+     * MessageClient.
+     *
+     * @param outboundQueue The queue of outbound messages.
+     * @return The FlushResult produced by the delegated flush, carrying the messages to
+     *         be written to the network.
+     */
+    public FlushResult flush(List<Message> outboundQueue)
+    {
+        // A null MessageClient means "no client distinction".
+        return flush((MessageClient)null, outboundQueue);
+    }
+
+    /**
+     * Removes all messages in the queue targeted to this specific MessageClient subscription(s) and
+     * returns them to be sent to the client. When no MessageClient is specified, every message in
+     * the queue is processed regardless of the MessageClient it targets.
+     * Expired messages, and messages for which outbound throttling does not return an OK result,
+     * are removed from the queue but are not included in the returned list.
+     * Overrides should be careful to only return messages for the specified MessageClient.
+     *
+     * @param messageClient The specific MessageClient to return messages for, or null to process
+     *        the messages of all MessageClients.
+     * @param outboundQueue The queue of outbound messages.
+     * @return A FlushResult containing the messages that have been removed from the outbound queue
+     *         to be written to the network for this MessageClient. Its message list is null when
+     *         no message qualified.
+     */
+    public FlushResult flush(MessageClient messageClient, List<Message> outboundQueue)
+    {
+        FlushResult flushResult = new FlushResult();
+        List<Message> messagesToFlush = null;
+
+        for (Iterator<Message> iter = outboundQueue.iterator(); iter.hasNext();)
+        {
+            Message message = iter.next();
+
+            // With a specific MessageClient, messages for other clients stay queued.
+            if (messageClient != null && !message.getClientId().equals(messageClient.getClientId()))
+                continue;
+
+            if (isMessageExpired(message)) // Don't flush expired messages.
+            {
+                iter.remove();
+                continue;
+            }
+
+            // Resolve the target client per message. The messageClient parameter must not be
+            // overwritten: doing so would turn a flush without client distinction into a flush
+            // for only the client of the first processed message, leaving the rest queued.
+            MessageClient targetClient = (messageClient != null)? messageClient : getMessageClient(message);
+
+            // First, apply the destination level outbound throttling.
+            ThrottleResult throttleResult = throttleOutgoingDestinationLevel(targetClient, message, false);
+            Result result = throttleResult.getResult();
+
+            // Destination level passed; check destination-client level throttling.
+            if (Result.OK == result)
+            {
+                throttleResult = throttleOutgoingClientLevel(targetClient, message, false);
+                result = throttleResult.getResult();
+                // If no throttling, simply add the message to the list.
+                if (Result.OK == result)
+                {
+                    updateMessageFrequencyOutgoing(targetClient, message);
+                    if (messagesToFlush == null)
+                        messagesToFlush = new ArrayList<Message>();
+                    messagesToFlush.add(message);
+                }
+                // For any other result, the message is not added to the list.
+            }
+            iter.remove();
+        }
+
+        flushResult.setMessages(messagesToFlush);
+        return flushResult;
+    }
+
+    /**
+     * Tests whether a message has outlived its timeToLive.
+     * A message with a timeToLive that is not greater than zero is never reported as expired.
+     * Otherwise the message is expired once the time elapsed between its timestamp and the
+     * current system time reaches or exceeds its timeToLive.
+     * Flush implementations should use this helper to skip messages that have expired
+     * while waiting in the outbound queue.
+     *
+     * @param message The message to test for expiration.
+     *
+     * @return true if the message has a timeToLive value that has expired; otherwise false.
+     */
+    public boolean isMessageExpired(Message message)
+    {
+        if (message.getTimeToLive() <= 0)
+            return false;
+
+        return (System.currentTimeMillis() - message.getTimestamp()) >= message.getTimeToLive();
+    }
+
+    /**
+     * Applies destination level outbound throttling to the message, using the throttle
+     * manager resolved from the message client.
+     *
+     * @param msgClient The client the message is intended for.
+     * @param message The message to consider to throttle.
+     * @param buffered Whether the message has already been buffered. If so, only the
+     * destination level check is performed instead of the regular outgoing throttling.
+     * @return The result of the throttling attempt; a default ThrottleResult when no
+     * throttle manager is available.
+     */
+    protected ThrottleResult throttleOutgoingDestinationLevel(MessageClient msgClient, Message message, boolean buffered)
+    {
+        ThrottleManager destinationThrottler = getThrottleManager(msgClient);
+        if (destinationThrottler == null)
+            return new ThrottleResult(); // Nothing to throttle against; return OK result.
+
+        // Already buffered messages skip ThrottleManager#throttleOutgoingMessage so that
+        // the regular throttling handling is not applied a second time.
+        return buffered
+                ? destinationThrottler.throttleDestinationLevel(message, false /*incoming*/)
+                : destinationThrottler.throttleOutgoingMessage(message);
+    }
+
+    /**
+     * Applies destination-client level outbound throttling to the message.
+     *
+     * @param msgClient The client the message is intended for.
+     * @param message The message to consider to throttle.
+     * @param buffered Whether the message has already been buffered. If so, the throttle
+     * result is returned without being handed to the destination's throttle manager.
+     * @return The result of the throttling attempt; a default ThrottleResult when client
+     * level throttling is not enabled.
+     */
+    protected ThrottleResult throttleOutgoingClientLevel(MessageClient msgClient, Message message, boolean buffered)
+    {
+        if (outboundQueueThrottleManager == null) // Client level throttling is not enabled.
+            return new ThrottleResult(); // Return OK result.
+
+        ThrottleResult clientLevelResult = outboundQueueThrottleManager.throttleOutgoingClientLevel(message);
+        if (buffered)
+            return clientLevelResult;
+
+        ThrottleManager destinationThrottler = getThrottleManager(msgClient);
+        if (destinationThrottler != null)
+            destinationThrottler.handleOutgoingThrottleResult(message, clientLevelResult, true /*isClientLevel*/);
+        return clientLevelResult;
+    }
+
+    /**
+     * Finds the message client the message is intended for. The most recently returned
+     * message client is cached and reused when its client id matches the message's.
+     *
+     * @param message The message.
+     * @return The message client looked up for the message's client id.
+     */
+    protected MessageClient getMessageClient(Message message)
+    {
+        boolean cacheHit = lastMessageClient != null
+                && message.getClientId().equals(lastMessageClient.getClientId());
+
+        // On a cache miss, look the client up on the FlexClient and remember it.
+        if (!cacheHit)
+            lastMessageClient = client.getMessageClient((String)message.getClientId());
+
+        return lastMessageClient;
+    }
+
+    /**
+     * Resolves the throttle manager of the destination the message client belongs to.
+     *
+     * @param msgClient The message client; it can be null.
+     * @return The throttle manager, or null when the message client is null or its
+     * destination is not a MessageDestination.
+     */
+    protected ThrottleManager getThrottleManager(MessageClient msgClient)
+    {
+        if (msgClient == null)
+            return null;
+
+        Destination destination = msgClient.getDestination();
+        if (destination instanceof MessageDestination) // Also false for a null destination.
+            return ((MessageDestination)destination).getThrottleManager();
+
+        return null;
+    }
+
+    /**
+     * Records an outgoing message in both the destination level and the client level
+     * message frequencies, where the respective throttle managers exist.
+     *
+     * @param msgClient The MessageClient that might have been passed to the flush; it can be null.
+     * @param message The message.
+     */
+    protected void updateMessageFrequencyOutgoing(MessageClient msgClient, Message message)
+    {
+        // Destination level message frequency.
+        ThrottleManager destinationThrottler = getThrottleManager(msgClient);
+        if (destinationThrottler != null)
+            destinationThrottler.updateMessageFrequencyDestinationLevel(false /*incoming*/);
+
+        // Client level message frequency.
+        if (outboundQueueThrottleManager != null)
+            outboundQueueThrottleManager.updateMessageFrequencyOutgoingClientLevel(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/client/FlushResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/client/FlushResult.java b/modules/core/src/flex/messaging/client/FlushResult.java
new file mode 100755
index 0000000..6dd8cad
--- /dev/null
+++ b/modules/core/src/flex/messaging/client/FlushResult.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+import java.util.List;
+
+import flex.messaging.messages.Message;
+
+/**
+ * Carries the outcome of a flush invocation on a FlexClient's outbound queue:
+ * the messages that should be written to the network and an optional wait time
+ * before the next flush.
+ */
+public class FlushResult
+{
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * The messages to write to the network; may be null.
+     */
+    private List<Message> messages;
+
+    /**
+     * Wait time in milliseconds before the next flush; never negative.
+     */
+    private int nextFlushWaitTimeMillis;
+
+    //--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Creates an empty <tt>FlushResult</tt>: no messages and a next flush wait
+     * time of 0.
+     */
+    public FlushResult()
+    {
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Properties
+    //
+    //--------------------------------------------------------------------------
+
+    //----------------------------------
+    //  messages
+    //----------------------------------
+
+    /**
+     * Gets the messages to write to the network for this flush invocation.
+     * The list may be null, in which case no messages are written.
+     *
+     * @return The messages to write to the network for this flush invocation.
+     */
+    public List<Message> getMessages()
+    {
+        return this.messages;
+    }
+
+    /**
+     * Stores the messages to write to the network for this flush invocation.
+     *
+     * @param value The messages to write to the network for this flush invocation.
+     */
+    public void setMessages(List<Message> value)
+    {
+        this.messages = value;
+    }
+
+    //----------------------------------
+    //  nextFlushWaitTimeMillis
+    //----------------------------------
+
+    /**
+     * Gets the wait time in milliseconds for when the next flush invocation should occur.
+     * A value of 0, the default, means a delayed flush is not scheduled and the next flush
+     * depends upon the underlying Channel/Endpoint.
+     * For client-side polling Channels the next flush invocation will happen when the client sends
+     * its next poll request at its regular interval.
+     * For client-side Channels that support direct writes to the client a flush invocation is triggered
+     * when the next message is added to the outbound queue.
+     *
+     * @return The wait time in milliseconds before flush is next invoked. A value of 0, the default,
+     *         indicates that the default flush behavior for the underlying Channel/Endpoint should be
+     *         used.
+     */
+    public int getNextFlushWaitTimeMillis()
+    {
+        return this.nextFlushWaitTimeMillis;
+    }
+
+    /**
+     * Stores the wait time in milliseconds for when the next flush invocation should occur.
+     * A value of 0, the default, means a delayed flush is not scheduled and the next flush
+     * depends upon the underlying Channel/Endpoint.
+     * For client-side polling Channels the next flush invocation will happen when the client sends
+     * its next poll request at its regular interval.
+     * For client-side Channels that support direct writes to the client a flush invocation is triggered
+     * when the next message is added to the outbound queue.
+     * Negative value assignments are treated as 0.
+     *
+     * @param value The wait time in milliseconds before flush will be invoked.
+     */
+    public void setNextFlushWaitTimeMillis(int value)
+    {
+        // Clamp negative values to 0.
+        this.nextFlushWaitTimeMillis = Math.max(value, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/client/OutboundQueueThrottleManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/client/OutboundQueueThrottleManager.java b/modules/core/src/flex/messaging/client/OutboundQueueThrottleManager.java
new file mode 100755
index 0000000..e81a249
--- /dev/null
+++ b/modules/core/src/flex/messaging/client/OutboundQueueThrottleManager.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import flex.messaging.MessageClient.SubscriptionInfo;
+import flex.messaging.config.ThrottleSettings.Policy;
+import flex.messaging.log.Log;
+import flex.messaging.messages.Message;
+import flex.messaging.services.messaging.MessageFrequency;
+import flex.messaging.services.messaging.ThrottleManager;
+import flex.messaging.services.messaging.ThrottleManager.ThrottleResult;
+import flex.messaging.util.StringUtils;
+
+
+/**
+ * Used to keep track of and limit outbound message rates of a single FlexClient queue.
+ * An outbound FlexClient queue can contain messages from multiple MessageClients
+ * across multiple destinations. It can also contain messages for multiple
+ * subscriptions (for each subtopic/selector) across the same destination for
+ * the same MessageClient.
+ */
+public class OutboundQueueThrottleManager
+{
+    //--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Creates an outbound queue throttle manager with no registered destinations.
+     *
+     * @param processor The outbound queue processor that is using this throttle manager.
+     */
+    public OutboundQueueThrottleManager(FlexClientOutboundQueueProcessor processor)
+    {
+        this.processor = processor;
+        this.destinationFrequencies = new ConcurrentHashMap<String, DestinationFrequency>();
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Map of destination id and destination message frequencies.
+     */
+    protected final ConcurrentHashMap<String, DestinationFrequency> destinationFrequencies;
+
+    /**
+     * The parent queue processor of the throttle manager.
+     */
+    protected final FlexClientOutboundQueueProcessor processor;
+
+    //--------------------------------------------------------------------------
+    //
+    // Public Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Registers the destination with the outbound throttle manager. A destination
+     * that is already registered keeps its existing entry.
+     *
+     * @param destinationId The id of the destination.
+     * @param outboundMaxClientFrequency The outbound max-client-frequency specified
+     * at the destination.
+     * @param outboundPolicy The outbound throttle policy specified at the destination.
+     */
+    public void registerDestination(String destinationId, int outboundMaxClientFrequency, Policy outboundPolicy)
+    {
+        if (destinationFrequencies.get(destinationId) != null)
+            return; // Already registered.
+
+        // putIfAbsent keeps an entry that was added after the check above.
+        destinationFrequencies.putIfAbsent(destinationId,
+                new DestinationFrequency(outboundMaxClientFrequency, outboundPolicy));
+    }
+
+    /**
+     * Registers the subscription of a client talking to a destination with the
+     * specified subscription info. Registration only logs the throttling limit
+     * in use for the subscription; if the destination has not been registered,
+     * nothing is logged.
+     *
+     * @param destinationId The destination id.
+     * @param si The subscription information.
+     */
+    public void registerSubscription(String destinationId, SubscriptionInfo si)
+    {
+        DestinationFrequency frequency = destinationFrequencies.get(destinationId);
+        // The lookup returns null for a destination that was never registered (or was
+        // already unregistered); there is nothing to log in that case.
+        if (frequency == null)
+            return;
+
+        frequency.logMaxFrequencyDuringRegistration(frequency.outboundMaxClientFrequency, si);
+    }
+
+    /**
+     * Unregisters the subscription by unregistering the destination's entry.
+     * The subscription information is not consulted.
+     *
+     * @param destinationId The destination id.
+     * @param si The subscription information.
+     */
+    public void unregisterSubscription(String destinationId, SubscriptionInfo si)
+    {
+        this.unregisterDestination(destinationId);
+    }
+
+    /**
+     * Unregisters all subscriptions of the client under the specified destination
+     * by unregistering the destination's entry.
+     *
+     * @param destinationId The destination id.
+     */
+    public void unregisterAllSubscriptions(String destinationId)
+    {
+        this.unregisterDestination(destinationId);
+    }
+
+    /**
+     * Attempts to throttle the outgoing message against the client level limit
+     * of the message's destination.
+     *
+     * @param message The message to consider to throttle.
+     * @return The result of checking the message rate against the destination's limit and
+     * policy; a default ThrottleResult if the destination is not registered or has no
+     * message frequency.
+     */
+    public ThrottleResult throttleOutgoingClientLevel(Message message)
+    {
+        String destinationId = message.getDestination();
+        if (isDestinationRegistered(destinationId))
+        {
+            DestinationFrequency frequency = destinationFrequencies.get(destinationId);
+            // The entry can be removed by another thread between the registration check
+            // and the lookup, so the lookup result must be checked as well.
+            if (frequency != null)
+            {
+                int maxFrequency = frequency.getMaxFrequency(message); // Limit to check against.
+                MessageFrequency messageFrequency = frequency.getMessageFrequency(message); // Message rate of the client.
+                if (messageFrequency != null)
+                    return messageFrequency.checkLimit(maxFrequency, frequency.outboundPolicy);
+            }
+        }
+        return new ThrottleResult(); // Otherwise, return OK result.
+    }
+
+    /**
+     * Updates the outgoing client level message frequency of the message. Does nothing
+     * if the message's destination is not registered or has no message frequency.
+     *
+     * @param message The message.
+     */
+    public void updateMessageFrequencyOutgoingClientLevel(Message message)
+    {
+        String destinationId = message.getDestination();
+        if (isDestinationRegistered(destinationId))
+        {
+            DestinationFrequency frequency = destinationFrequencies.get(destinationId);
+            // The entry can be removed by another thread between the registration check
+            // and the lookup, so the lookup result must be checked as well.
+            if (frequency != null)
+            {
+                MessageFrequency messageFrequency = frequency.getMessageFrequency(message);
+                if (messageFrequency != null)
+                    messageFrequency.updateMessageFrequency();
+            }
+        }
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Protected Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Checks whether an entry exists for the destination.
+     *
+     * @param destinationId The destination id.
+     * @return True if the destination with the specified id has been registered.
+     */
+    protected boolean isDestinationRegistered(String destinationId)
+    {
+        return this.destinationFrequencies.containsKey(destinationId);
+    }
+
+    /**
+     * Removes the destination's entry from the outbound throttle manager, if it
+     * is registered.
+     *
+     * @param destinationId The id of the destination.
+     */
+    protected void unregisterDestination(String destinationId)
+    {
+        if (!isDestinationRegistered(destinationId))
+            return;
+
+        destinationFrequencies.remove(destinationId);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Inner Classes
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Holds the max-client-frequency and outgoing throttle policy specified at a
+     * destination, together with the MessageFrequency used to track the outbound
+     * message rate for that destination.
+     */
+    class DestinationFrequency
+    {
+        protected final int outboundMaxClientFrequency; // destination specified client limit.
+        protected final Policy outboundPolicy; // destination specified policy.
+        protected final MessageFrequency outboundClientFrequency; // outbound rate tracker.
+
+        /**
+         * Creates the frequency holder for a destination.
+         *
+         * @param outboundMaxClientFrequency The outbound throttling max-client-frequency of the destination.
+         * @param outboundPolicy The outbound throttling policy of the destination.
+         */
+        DestinationFrequency(int outboundMaxClientFrequency, Policy outboundPolicy)
+        {
+            this.outboundMaxClientFrequency = outboundMaxClientFrequency;
+            this.outboundPolicy = outboundPolicy;
+            this.outboundClientFrequency = new MessageFrequency(outboundMaxClientFrequency);
+        }
+
+        /**
+         * Returns the max-client-frequency for the subscription the message is
+         * intended for. This implementation ignores the message and returns the
+         * max-client-frequency specified at the destination.
+         *
+         * @param message The message.
+         *
+         * @return The max-frequency for the subscription.
+         */
+        int getMaxFrequency(Message message)
+        {
+            return this.outboundMaxClientFrequency;
+        }
+
+        /**
+         * Returns the message rate frequency for the subscription the message is
+         * intended for. This implementation ignores the message and returns the
+         * single frequency kept for the destination.
+         *
+         * @param message The message.
+         * @return The message frequency for the subscription.
+         */
+        MessageFrequency getMessageFrequency(Message message)
+        {
+            return this.outboundClientFrequency;
+        }
+
+        /**
+         * Utility function to log, at debug level, the maxFrequency being used
+         * during subscription.
+         *
+         * @param maxFrequency The maxFrequency to log.
+         * @param si The subscription information to include in the log message.
+         */
+        void logMaxFrequencyDuringRegistration(int maxFrequency, SubscriptionInfo si)
+        {
+            if (!Log.isDebug())
+                return;
+
+            String logMessage = "Outbound queue throttle manager for FlexClient '"
+                    + processor.getFlexClient().getId() + "' is using '" + maxFrequency
+                    + "' as the throttling limit for its subscription: "
+                    +  StringUtils.NEWLINE + si;
+            Log.getLogger(ThrottleManager.LOG_CATEGORY).debug(logMessage);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/client/PollFlushResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/client/PollFlushResult.java b/modules/core/src/flex/messaging/client/PollFlushResult.java
new file mode 100755
index 0000000..fe4a76d
--- /dev/null
+++ b/modules/core/src/flex/messaging/client/PollFlushResult.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * Extends <tt>FlushResult</tt> and adds additional properties for controlling 
+ * client polling behavior.
+ */
+public class PollFlushResult extends FlushResult
+{
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /** Backing field for the avoidBusyPolling property. */
+    private boolean avoidBusyPolling;
+
+    /** Backing field for the clientProcessingSuppressed property. */
+    private boolean clientProcessingSuppressed;
+
+    //--------------------------------------------------------------------------
+    //
+    // Properties
+    //
+    //--------------------------------------------------------------------------
+
+    //----------------------------------
+    //  avoidBusyPolling
+    //----------------------------------
+
+    /**
+     * Reports whether whoever handles this result should try to avoid a
+     * potential busy-polling cycle.
+     * The flag is meant to be <code>true</code> when two clients are both
+     * long-polling the server over the same session.
+     *
+     * @return <code>true</code> if handling of this result should attempt to avoid
+     *         potential busy-polling cycles.
+     */
+    public boolean isAvoidBusyPolling()
+    {
+        return this.avoidBusyPolling;
+    }
+
+    /**
+     * Signals whether handling of this result should try to avoid a potential
+     * busy-polling cycle.
+     *
+     * @param value <code>true</code> to request that handling of this result attempts
+     *        to avoid potential busy-polling cycles.
+     */
+    public void setAvoidBusyPolling(boolean value)
+    {
+        this.avoidBusyPolling = value;
+    }
+
+    //----------------------------------
+    //  clientProcessingSuppressed
+    //----------------------------------
+
+    /**
+     * Reports whether the client should skip processing of this result.
+     * The flag is meant to be <code>true</code> for results produced for poll
+     * requests that arrive while a long-poll request from the same client is
+     * still being serviced, so that a busy polling cycle is avoided.
+     *
+     * @return <code>true</code> if client processing of this result should be suppressed;
+     *         otherwise <code>false</code>.
+     */
+    public boolean isClientProcessingSuppressed()
+    {
+        return this.clientProcessingSuppressed;
+    }
+
+    /**
+     * Chooses whether the client should skip processing of this result.
+     * The default is <code>false</code>.
+     * Pass <code>true</code> for results produced for poll requests that arrive
+     * while a long-poll request from the same client is still being serviced,
+     * so that a busy polling cycle is avoided.
+     *
+     * @param value <code>true</code> to suppress client processing of the result.
+     */
+    public void setClientProcessingSuppressed(boolean value)
+    {
+        this.clientProcessingSuppressed = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/client/PollWaitListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/client/PollWaitListener.java b/modules/core/src/flex/messaging/client/PollWaitListener.java
new file mode 100755
index 0000000..ae4b820
--- /dev/null
+++ b/modules/core/src/flex/messaging/client/PollWaitListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * Used alongside invocations of <code>FlexClient.pollWithWait()</code> to allow calling code to
+ * maintain a record of the Objects being used to place waited poll requests into a wait
+ * state. This can be used to break the threads out of their wait state separately from the
+ * internal waited poll handling within <code>FlexClient</code>.
+ */
+public interface PollWaitListener
+{
+    /**
+     * Hook method invoked directly before a wait begins. A listener can record
+     * the notifier here so that the waiting thread can later be broken out of
+     * its wait state by the calling code.
+     * 
+     * @param notifier The <tt>Object</tt> being used to <code>wait()/notify()</code>.
+     */
+    void waitStart(Object notifier);
+    
+    /**
+     * Hook method invoked directly after a wait completes. A listener that
+     * recorded the notifier when the wait started can discard it here.
+     * 
+     * @param notifier The <tt>Object</tt> being used to <code>wait()/notify()</code>.
+     */
+    void waitEnd(Object notifier);
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/client/UserAgentSettings.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/client/UserAgentSettings.java b/modules/core/src/flex/messaging/client/UserAgentSettings.java
new file mode 100755
index 0000000..e04ef88
--- /dev/null
+++ b/modules/core/src/flex/messaging/client/UserAgentSettings.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * A class to hold user agent specific properties. For example, in streaming
+ * endpoints, a certain number of bytes need to be written before the
+ * streaming connection can be used and this value is specific to user agents.
+ * Similarly, the number of simultaneous connections a session can have is user
+ * agent specific.
+ */
+public class UserAgentSettings
+{
+    /**
+     * The prefixes of the version token used by various browsers.
+     */
+    public static final String USER_AGENT_ANDROID = "Android";
+    public static final String USER_AGENT_CHROME = "Chrome";
+    public static final String USER_AGENT_FIREFOX = "Firefox";
+    public static final String USER_AGENT_FIREFOX_1 = "Firefox/1";
+    public static final String USER_AGENT_FIREFOX_2 = "Firefox/2";
+    public static final String USER_AGENT_MSIE = "MSIE";
+    public static final String USER_AGENT_MSIE_5 = "MSIE 5";
+    public static final String USER_AGENT_MSIE_6 = "MSIE 6";
+    public static final String USER_AGENT_MSIE_7 = "MSIE 7";
+    public static final String USER_AGENT_OPERA = "Opera";
+    public static final String USER_AGENT_OPERA_8 = "Opera 8";
+    // Opera 10,11 ship as User Agent Opera/9.8.
+    public static final String USER_AGENT_OPERA_10 = "Opera/9.8";
+    public static final String USER_AGENT_SAFARI = "Safari";
+
+    /**
+     * Bytes needed to kickstart the streaming connections for IE.
+     */
+    public static final int KICKSTART_BYTES_MSIE = 2048;
+    /**
+     * Bytes needed to kickstart the streaming connections for SAFARI.
+     */
+    public static final int KICKSTART_BYTES_SAFARI = 512;
+    /**
+     * Bytes needed to kickstart the streaming connections for Android.
+     */
+    public static final int KICKSTART_BYTES_ANDROID = 4010;
+
+    /**
+     * The default number of persistent connections per session for various browsers.
+     */
+    private static final int MAX_PERSISTENT_CONNECTIONS_LEGACY = 1;
+    public static final int MAX_PERSISTENT_CONNECTIONS_DEFAULT = 5;
+    private static final int MAX_PERSISTENT_CONNECTIONS_OPERA_LEGACY = 3;
+    private static final int MAX_PERSISTENT_CONNECTIONS_CHROME = MAX_PERSISTENT_CONNECTIONS_DEFAULT;
+    private static final int MAX_PERSISTENT_CONNECTIONS_FIREFOX = MAX_PERSISTENT_CONNECTIONS_DEFAULT;
+    private static final int MAX_PERSISTENT_CONNECTIONS_MSIE = MAX_PERSISTENT_CONNECTIONS_DEFAULT;
+    private static final int MAX_PERSISTENT_CONNECTIONS_OPERA = 7;
+    private static final int MAX_PERSISTENT_CONNECTIONS_SAFARI = 3;
+
+    private String matchOn;
+    // Stays 0 unless a kickstart value is set explicitly.
+    private int kickstartBytes;
+    private int maxPersistentConnectionsPerSession = MAX_PERSISTENT_CONNECTIONS_DEFAULT;
+
+    /**
+     *  Static method to retrieve pre-initialized user agents. The supplied string is
+     *  compared for exact equality against the USER_AGENT_* constants and the
+     *  returned settings are initialized as follows:
+     *
+     *  For Android, with kickstart bytes of 4010:
+     *      match-on="Android" max-persistent-connections-per-session="3" kickstart-bytes="4010"
+     *
+     *  In Chrome 0, 1, 2, the limit is 6:
+     *      match-on="Chrome" max-persistent-connections-per-session="5"
+     *
+     *  In Firefox 1, 2, the limit is 2:
+     *      match-on="Firefox/1" max-persistent-connections-per-session="1"
+     *      match-on="Firefox/2" max-persistent-connections-per-session="1"
+     *
+     *  In Firefox 3, the limit is 6; the generic Firefox entry uses:
+     *      match-on="Firefox" max-persistent-connections-per-session="5"
+     *
+     *  In MSIE 5, 6, 7, the limit is 2 with kickstart bytes of 2K:
+     *      match-on="MSIE 5" max-persistent-connections-per-session="1" kickstart-bytes="2048"
+     *      match-on="MSIE 6" max-persistent-connections-per-session="1" kickstart-bytes="2048"
+     *      match-on="MSIE 7" max-persistent-connections-per-session="1" kickstart-bytes="2048"
+     *
+     *  In MSIE 8, the limit is 6 with kickstart bytes of 2K; the generic MSIE entry uses:
+     *      match-on="MSIE" max-persistent-connections-per-session="5" kickstart-bytes="2048"
+     *
+     *  In Opera 7, 9, the limit is 4:
+     *      match-on="Opera" max-persistent-connections-per-session="3"
+     *
+     *  In Opera 8, the limit is 8:
+     *      match-on="Opera 8" max-persistent-connections-per-session="7"
+     *
+     *  In Opera 10, the limit is 8 (Opera 10, 11 ship as User Agent Opera/9.8):
+     *      match-on="Opera/9.8" max-persistent-connections-per-session="7"
+     *
+     *  In Safari 3, 4, the limit is 4, with kickstart bytes of 512:
+     *      match-on="Safari" max-persistent-connections-per-session="3" kickstart-bytes="512"
+     *
+     *  Any other string (including null) keeps the defaults:
+     *      max-persistent-connections-per-session="5" kickstart-bytes="0"
+     *
+     * @param matchOn String to use match the agent.
+     * @return A new settings instance whose match-on string is the supplied value.
+     */
+    public static UserAgentSettings getAgent(String matchOn)
+    {
+        UserAgentSettings userAgent = new UserAgentSettings();
+        userAgent.setMatchOn(matchOn);
+
+        // The candidates are mutually exclusive, so a single if/else-if chain is used.
+        if (USER_AGENT_ANDROID.equals(matchOn))
+        {
+            // Android shares the Safari connection limit but needs its own kickstart size.
+            userAgent.setKickstartBytes(KICKSTART_BYTES_ANDROID);
+            userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_SAFARI);
+        }
+        else if (USER_AGENT_CHROME.equals(matchOn))
+        {
+            userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_CHROME);
+        }
+        else if (USER_AGENT_FIREFOX.equals(matchOn))
+        {
+            userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_FIREFOX);
+        }
+        else if (USER_AGENT_FIREFOX_1.equals(matchOn))
+        {
+            userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY);
+        }
+        else if (USER_AGENT_FIREFOX_2.equals(matchOn))
+        {
+            userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY);
+        }
+        else if (USER_AGENT_MSIE.equals(matchOn))
+        {
+            userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE);
+            userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_MSIE);
+        }
+        else if (USER_AGENT_MSIE_5.equals(matchOn))
+        {
+            userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE);
+            userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY);
+        }
+        else if (USER_AGENT_MSIE_6.equals(matchOn))
+        {
+            userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE);
+            userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY);
+        }
+        else if (USER_AGENT_MSIE_7.equals(matchOn))
+        {
+            userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE);
+            userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY);
+        }
+        else if (USER_AGENT_OPERA.equals(matchOn))
+        {
+            userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_OPERA_LEGACY);
+        }
+        else if (USER_AGENT_OPERA_8.equals(matchOn))
+        {
+            userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_OPERA);
+        }
+        else if (USER_AGENT_OPERA_10.equals(matchOn))
+        {
+            userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_OPERA);
+        }
+        else if (USER_AGENT_SAFARI.equals(matchOn))
+        {
+            userAgent.setKickstartBytes(KICKSTART_BYTES_SAFARI);
+            userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_SAFARI);
+        }
+        return userAgent;
+    }
+
+    /**
+     * Returns the String to use to match the agent.
+     *
+     * @return The String to use to match the agent.
+     */
+    public String getMatchOn()
+    {
+        return matchOn;
+    }
+
+    /**
+     * Sets the String to use to match the agent.
+     *
+     * @param matchOn The String to use to match the agent.
+     */
+    public void setMatchOn(String matchOn)
+    {
+        this.matchOn = matchOn;
+    }
+
+    /**
+     * Returns the number of bytes needed to kickstart the streaming connections
+     * for the user agent.
+     *
+     * @return The number of bytes needed to kickstart the streaming connections
+     * for the user agent.
+     */
+    public int getKickstartBytes()
+    {
+        return kickstartBytes;
+    }
+
+    /**
+     * Sets the number of bytes needed to kickstart the streaming connections
+     * for the user agent. A negative value is stored as 0.
+     *
+     * @param kickstartBytes The number of bytes needed to kickstart the streaming
+     * connections for the user agent.
+     */
+    public void setKickstartBytes(int kickstartBytes)
+    {
+        if (kickstartBytes < 0)
+            kickstartBytes = 0;
+        this.kickstartBytes = kickstartBytes;
+    }
+
+    /**
+     * Returns the same value as {@link UserAgentSettings#getMaxPersistentConnectionsPerSession()}.
+     *
+     * @return The number of persistent connections per session the user agent supports.
+     * @deprecated Use {@link UserAgentSettings#getMaxPersistentConnectionsPerSession()} instead.
+     */
+    @Deprecated
+    public int getMaxStreamingConnectionsPerSession()
+    {
+        return getMaxPersistentConnectionsPerSession();
+    }
+
+    /**
+     * Delegates to {@link UserAgentSettings#setMaxPersistentConnectionsPerSession(int)}.
+     *
+     * @param maxStreamingConnectionsPerSession The number of persistent connections
+     * per session the user agent supports.
+     * @deprecated Use {@link UserAgentSettings#setMaxPersistentConnectionsPerSession(int)} instead.
+     */
+    @Deprecated
+    public void setMaxStreamingConnectionsPerSession(int maxStreamingConnectionsPerSession)
+    {
+        setMaxPersistentConnectionsPerSession(maxStreamingConnectionsPerSession);
+    }
+
+    /**
+     * Returns the number of simultaneous streaming connections per session
+     * the user agent supports.
+     *
+     * @return The number of streaming connections per session the user agent supports.
+     */
+    public int getMaxPersistentConnectionsPerSession()
+    {
+        return maxPersistentConnectionsPerSession;
+    }
+
+    /**
+     * Sets the number of simultaneous streaming connections per session
+     * the user agent supports. The value is stored as given, without validation.
+     *
+     * @param maxStreamingConnectionsPerSession The number of simultaneous
+     * streaming connections per session the user agent supports.
+     */
+    public void setMaxPersistentConnectionsPerSession(int maxStreamingConnectionsPerSession)
+    {
+        this.maxPersistentConnectionsPerSession = maxStreamingConnectionsPerSession;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/client/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/client/package-info.java b/modules/core/src/flex/messaging/client/package-info.java
new file mode 100755
index 0000000..e9a5b98
--- /dev/null
+++ b/modules/core/src/flex/messaging/client/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package flex.messaging.client;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/cluster/BroadcastHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/cluster/BroadcastHandler.java b/modules/core/src/flex/messaging/cluster/BroadcastHandler.java
new file mode 100755
index 0000000..7d6f365
--- /dev/null
+++ b/modules/core/src/flex/messaging/cluster/BroadcastHandler.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.cluster;
+
+import java.util.List;
+
+/**
+ * @exclude
+ * This interface represents a handler for a message broadcast by a Cluster.
+ * Clusters broadcast messages across their physical nodes, and when they
+ * receive those messages they locate a BroadcastHandler capable of handling
+ * the broadcast.
+ */
+public interface BroadcastHandler
+{
+    /**
+     * Handle the broadcast message.
+     *
+     * @param sender sender of the original message
+     * @param params any parameters needed to handle the message
+     */
+    void handleBroadcast(Object sender, List<Object> params);
+
+    /**
+     * Determine whether this Handler supports a particular operation by name.
+     *
+     * @param name name of the operation
+     * @return whether or not this handler supports the named operation
+     */
+    boolean isSupportedOperation(String name);
+}