You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flex.apache.org by cd...@apache.org on 2015/12/20 14:13:47 UTC
[07/51] [partial] flex-blazeds git commit: Removed legacy directories
and made the content of the modules directory the new root - Please use the
maven build for now as the Ant build will no longer work until it is
adjusted to the new directory structure
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClientBindingEvent.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/FlexClientBindingEvent.java b/core/src/flex/messaging/client/FlexClientBindingEvent.java
new file mode 100644
index 0000000..3fa20de
--- /dev/null
+++ b/core/src/flex/messaging/client/FlexClientBindingEvent.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * Event used to notify FlexClientAttributeListeners of changes to FlexClient
+ * attributes.
+ */
+public class FlexClientBindingEvent
+{
+    //--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Constructs an event for an attribute that is bound or unbound from a FlexClient.
+     * No attribute value is recorded, so <code>getValue()</code> returns null.
+     * @param client The FlexClient.
+     * @param name The attribute name.
+     */
+    public FlexClientBindingEvent(FlexClient client, String name)
+    {
+        // Delegate so that every field is assigned in exactly one place.
+        this(client, name, null);
+    }
+
+
+    /**
+     * Constructs an event for an attribute that is added to a FlexClient or
+     * replaced by a new value.
+     *
+     * @param client The FlexClient.
+     * @param name The attribute name.
+     * @param value The attribute value.
+     */
+    public FlexClientBindingEvent(FlexClient client, String name, Object value)
+    {
+        this.client = client;
+        this.name = name;
+        this.value = value;
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * The FlexClient that generated the event. Fixed at construction.
+     */
+    private final FlexClient client;
+
+    /**
+     * The name of the attribute associated with the event. Fixed at construction.
+     */
+    private final String name;
+
+    /**
+     * The value of the attribute associated with the event; null when none was supplied.
+     */
+    private final Object value;
+
+    //--------------------------------------------------------------------------
+    //
+    // Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Returns the FlexClient that generated the event.
+     *
+     * @return The FlexClient that generated the event.
+     */
+    public FlexClient getClient()
+    {
+        return client;
+    }
+
+    /**
+     * Returns the name of the attribute associated with the event.
+     *
+     * @return The name of the attribute associated with the event.
+     */
+    public String getName()
+    {
+        return name;
+    }
+
+    /**
+     * Returns the value of the attribute associated with the event.
+     *
+     * @return The attribute value, or null if the event was constructed without one.
+     */
+    public Object getValue()
+    {
+        return value;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClientBindingListener.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/FlexClientBindingListener.java b/core/src/flex/messaging/client/FlexClientBindingListener.java
new file mode 100644
index 0000000..50c319f
--- /dev/null
+++ b/core/src/flex/messaging/client/FlexClientBindingListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * Interface to be notified when the implementing object is bound or unbound from the FlexClient.
+ */
+public interface FlexClientBindingListener
+{
+    /**
+     * Callback invoked when the object is bound to a FlexClient.
+     *
+     * @param event The event carrying the FlexClient and the attribute
+     *        information for the binding.
+     */
+    void valueBound(FlexClientBindingEvent event);
+
+    /**
+     * Callback invoked when the object is unbound from a FlexClient.
+     *
+     * @param event The event carrying the FlexClient and the attribute
+     *        information for the unbinding.
+     */
+    void valueUnbound(FlexClientBindingEvent event);
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClientListener.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/FlexClientListener.java b/core/src/flex/messaging/client/FlexClientListener.java
new file mode 100644
index 0000000..7c8cd3d
--- /dev/null
+++ b/core/src/flex/messaging/client/FlexClientListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * Interface to be notified when a FlexClient is created or destroyed. Implementations of this interface
+ * may add themselves as created listeners statically via <code>FlexClient.addClientCreatedListener()</code>.
+ * To listen for FlexClient destruction, the implementation instance must add itself as a listener to
+ * a specific FlexClient instance via the <code>addClientDestroyedListener()</code> method.
+ */
+public interface FlexClientListener
+{
+    /**
+     * Notification that a FlexClient was created.
+     *
+     * @param client The newly created FlexClient.
+     */
+    void clientCreated(FlexClient client);
+
+    /**
+     * Notification that a FlexClient is about to be destroyed.
+     *
+     * @param client The FlexClient that is about to be destroyed.
+     */
+    void clientDestroyed(FlexClient client);
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClientManager.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/FlexClientManager.java b/core/src/flex/messaging/client/FlexClientManager.java
new file mode 100644
index 0000000..943956d
--- /dev/null
+++ b/core/src/flex/messaging/client/FlexClientManager.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+
+import flex.management.ManageableComponent;
+import flex.management.runtime.messaging.client.FlexClientManagerControl;
+import flex.messaging.FlexContext;
+import flex.messaging.MessageBroker;
+import flex.messaging.MessageException;
+import flex.messaging.config.FlexClientSettings;
+import flex.messaging.endpoints.AbstractEndpoint;
+import flex.messaging.endpoints.Endpoint;
+import flex.messaging.log.Log;
+import flex.messaging.log.LogCategories;
+import flex.messaging.util.ClassUtil;
+import flex.messaging.util.TimeoutAbstractObject;
+import flex.messaging.util.TimeoutManager;
+
+/**
+ *
+ * Manages FlexClient instances for a MessageBroker.
+ */
+public class FlexClientManager extends ManageableComponent
+{
+ public static final String TYPE = "FlexClientManager";
+
+ //--------------------------------------------------------------------------
+ //
+ // Constructor
+ //
+ //--------------------------------------------------------------------------
+
+    /**
+     * Constructs a manager for <code>MessageBroker.getMessageBroker(null)</code>.
+     */
+    public FlexClientManager()
+    {
+        this(MessageBroker.getMessageBroker(null));
+    }
+    /**
+     * Constructs a FlexClientManager for the passed MessageBroker.
+     * The broker's <code>isManaged()</code> value is passed on as the management flag.
+     * @param broker The MessageBroker that the Flex client manager is associated with; dereferenced here, so it must not be null.
+     */
+    public FlexClientManager(MessageBroker broker)
+    {
+        this(broker.isManaged(), broker);
+    }
+
+    /**
+     * Constructs a FlexClientManager and applies the broker's FlexClient timeout setting, if one is configured.
+     *
+     * @param enableManagement Management flag passed to the ManageableComponent constructor.
+     * @param mbroker The owning MessageBroker; if null, <code>MessageBroker.getMessageBroker(null)</code> is used.
+     */
+    public FlexClientManager(boolean enableManagement, MessageBroker mbroker)
+    {
+        super(enableManagement);
+        super.setId(TYPE);
+        // Ensure that we have a message broker:
+        broker = (mbroker != null) ? mbroker : MessageBroker.getMessageBroker(null);
+
+        FlexClientSettings flexClientSettings = broker.getFlexClientSettings();
+        if (flexClientSettings != null && flexClientSettings.getTimeoutMinutes() != -1)
+        {
+            // Convert minutes to millis with long literals so a large timeout cannot overflow int arithmetic.
+            setFlexClientTimeoutMillis(flexClientSettings.getTimeoutMinutes() * 60L * 1000L);
+        }
+        this.setParent(broker);
+    }
+
+ //--------------------------------------------------------------------------
+ //
+ // Variables
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * The MessageBroker that owns this manager.
+ */
+ private final MessageBroker broker;
+
+ /**
+ * The Mbean controller for this manager.
+ */
+ private FlexClientManagerControl controller;
+
+ /**
+ * Table to store FlexClients by id.
+ */
+ private final Map<String,FlexClient> flexClients = new ConcurrentHashMap<String,FlexClient>();
+
+
+ /**
+ * Manages time outs for FlexClients.
+ * This currently includes timeout of FlexClient instances, timeouts for async
+ * long-poll handling, and scheduling delayed flushes of outbound messages.
+ */
+ private volatile TimeoutManager flexClientTimeoutManager;
+
+ //--------------------------------------------------------------------------
+ //
+ // Properties
+ //
+ //--------------------------------------------------------------------------
+
+ //----------------------------------
+ // clientIds
+ //----------------------------------
+
+    /**
+     * Returns a string array of the client IDs.
+     *
+     * @return A string array of the client IDs; empty when there are none.
+     */
+    public String[] getClientIds()
+    {
+        // Copy the key set exactly once and size the result from that copy.
+        // Sizing the array or bounding a loop with separate flexClients.size()
+        // calls can disagree with the copy when clients are added or removed
+        // concurrently, which leads to an IndexOutOfBoundsException or to
+        // trailing null entries in the returned array.
+        ArrayList<String> idList = new ArrayList<String>(flexClients.keySet());
+        return idList.toArray(new String[idList.size()]);
+    }
+
+ //----------------------------------
+ // flexClientCount
+ //----------------------------------
+
+    /**
+     * Returns the number of FlexClients in use.
+     *
+     * @return The number of FlexClients currently stored in this manager's table.
+     */
+    public int getFlexClientCount()
+    {
+        return flexClients.size();
+    }
+
+ //----------------------------------
+ // flexClientTimeoutMillis
+ //----------------------------------
+
+ private volatile long flexClientTimeoutMillis;
+
+    /**
+     * Returns the idle timeout in milliseconds to apply to new FlexClient instances.
+     *
+     * @return The idle timeout in milliseconds; 0 or greater.
+     */
+    public long getFlexClientTimeoutMillis()
+    {
+        return flexClientTimeoutMillis;
+    }
+
+    /**
+     * Updates the idle timeout, in milliseconds, that is applied to new FlexClient
+     * instances. Any value below 1 is stored as 0.
+     *
+     * @param value The idle timeout in milliseconds to apply to new FlexClient instances.
+     */
+    public void setFlexClientTimeoutMillis(long value)
+    {
+        final long normalizedMillis = (value >= 1) ? value : 0;
+
+        synchronized (this)
+        {
+            flexClientTimeoutMillis = normalizedMillis;
+        }
+    }
+
+ //----------------------------------
+ // messageBroker
+ //----------------------------------
+
+    /**
+     * Returns the MessageBroker instance that owns this FlexClientManager.
+     *
+     * @return The MessageBroker resolved when this manager was constructed.
+     */
+    public MessageBroker getMessageBroker()
+    {
+        return broker;
+    }
+
+ //--------------------------------------------------------------------------
+ //
+ // Public Methods
+ //
+ //--------------------------------------------------------------------------
+
+    /**
+     * Gets the FlexClient with the specified id, creating a new one if no usable
+     * instance exists. Equivalent to <code>getFlexClient(id, true)</code>.
+     *
+     * @param id The id of the Flex client.
+     * @return The existing or newly created FlexClient with the specified id.
+     */
+    public FlexClient getFlexClient(String id)
+    {
+        return getFlexClient(id, true);
+    }
+
+    /**
+     * Gets the FlexClient with the specified id. A null id always creates a new FlexClient.
+     *
+     * @param id The id of the Flex client; may be null.
+     * @param createNewIfNotExist If false and a non-null id has no registered FlexClient, null is returned.
+     * @return The existing or newly created FlexClient, or null as described above.
+     */
+    public FlexClient getFlexClient(String id, boolean createNewIfNotExist)
+    {
+        FlexClient flexClient = null;
+        // Try to lookup an existing instance if we receive an id.
+        if (id != null)
+        {
+            flexClient = flexClients.get(id);
+            if (flexClient != null)
+            {
+                if (flexClient.isValid() && !flexClient.invalidating)
+                {
+                    flexClient.updateLastUse();
+                    return flexClient;
+                }
+                // Invalid or invalidating: drop the mapping so the locked section below starts clean.
+                flexClients.remove(id);
+            }
+        }
+        // Re-check under the manager-level lock (this) before creating/recreating a FlexClient.
+        synchronized (this)
+        {
+            if (id != null)
+            {
+                flexClient = flexClients.get(id);
+                if (flexClient != null)
+                {
+                    flexClient.updateLastUse();
+                    return flexClient;
+                }
+                else
+                {
+                    if (!createNewIfNotExist)
+                    {
+                        return null;
+                    }
+                }
+            }
+            // NOTE(review): flexClientTimeoutManager is assigned only in start(); confirm start() always runs first.
+            flexClient = createFlexClient(id);
+            checkForNullAndDuplicateId(flexClient.getId());
+            flexClients.put(flexClient.getId(), flexClient);
+            if (flexClientTimeoutMillis > 0)
+                flexClientTimeoutManager.scheduleTimeout(flexClient);
+            flexClient.notifyCreated();
+            return flexClient;
+        }
+    }
+
+    /**
+     * Builds the outbound queue processor that serves the passed FlexClient on the
+     * given endpoint. Candidates are tried in order: the endpoint's custom processor,
+     * the MessageBroker's configured default, then the base implementation.
+     *
+     * @param flexClient The FlexClient to equip with a queue processor.
+     * @param endpointId The Id of the endpoint the queue processor is used for.
+     * @return The queue processor configured for the FlexClient and endpoint.
+     */
+    public FlexClientOutboundQueueProcessor createOutboundQueueProcessor(FlexClient flexClient, String endpointId)
+    {
+        // An endpoint-specific processor takes precedence when one is available.
+        FlexClientOutboundQueueProcessor custom = createCustomOutboundQueueProcessor(flexClient, endpointId);
+        if (custom != null)
+            return custom;
+
+        // Next choice: the processor named in the broker's FlexClient settings.
+        FlexClientOutboundQueueProcessor brokerDefault = createDefaultOutboundQueueProcessor(flexClient, endpointId);
+        if (brokerDefault != null)
+            return brokerDefault;
+
+        // Neither is available, so fall back to the base processor.
+        FlexClientOutboundQueueProcessor fallback = new FlexClientOutboundQueueProcessor();
+        fallback.setFlexClient(flexClient);
+        fallback.setEndpointId(endpointId);
+        return fallback;
+    }
+
+    /**
+     * Monitors an async poll for a FlexClient for timeout by scheduling the
+     * passed task with this manager's timeout manager.
+     *
+     * @param asyncPollTimeout The async poll task to monitor for timeout.
+     */
+    public void monitorAsyncPollTimeout(TimeoutAbstractObject asyncPollTimeout)
+    {
+        flexClientTimeoutManager.scheduleTimeout(asyncPollTimeout);
+    }
+
+    /**
+     * Monitors a scheduled flush for a FlexClient for timeout by scheduling the
+     * passed task with this manager's timeout manager.
+     *
+     * @param scheduledFlushTimeout The schedule flush task to monitor for timeout.
+     */
+    public void monitorScheduledFlush(TimeoutAbstractObject scheduledFlushTimeout)
+    {
+        flexClientTimeoutManager.scheduleTimeout(scheduledFlushTimeout);
+    }
+
+    /**
+     * Starts the Flex client manager: registers an MBean control when managed and
+     * creates the TimeoutManager used by this manager.
+     * @see flex.management.ManageableComponent#start()
+     */
+    @Override
+    public void start()
+    {
+        if (isManaged())
+        {
+            controller = new FlexClientManagerControl(getParent().getControl(), this);
+            setControl(controller);
+            controller.register();
+        }
+        // Timeout threads are named after this component's id plus a running counter.
+        final String baseId = getId();
+        flexClientTimeoutManager = new TimeoutManager(new ThreadFactory()
+        {
+            int counter = 1;
+            public synchronized Thread newThread(Runnable runnable)
+            {
+                Thread t = new Thread(runnable);
+                t.setName(baseId + "-FlexClientTimeoutThread-" + counter++);
+                return t;
+            }
+        });
+    }
+
+    /**
+     * Unregisters the MBean control, if one was created, and shuts down the timeout manager, if one exists.
+     * @see flex.management.ManageableComponent#stop()
+     */
+    public void stop()
+    {
+        if (controller != null)
+        {
+            controller.unregister();
+        }
+        if (flexClientTimeoutManager != null)
+            flexClientTimeoutManager.shutdown();
+    }
+
+ //--------------------------------------------------------------------------
+ //
+ // Protected Methods
+ //
+ //--------------------------------------------------------------------------
+
+    /**
+     * Hook method invoked when a new <tt>FlexClient</tt> instance is created.
+     *
+     * @param id The id the client provided, which was previously assigned by this server,
+     *        or another server in a cluster; <code>null</code> for new clients.
+     * @return A new FlexClient for this manager, constructed with the id when one is given.
+     */
+    protected FlexClient createFlexClient(String id)
+    {
+        return (id == null) ? new FlexClient(this) : new FlexClient(this, id);
+    }
+
+    /* Returns LogCategories.CLIENT_FLEXCLIENT as this component's log category (non-Javadoc).
+     * @see flex.management.ManageableComponent#getLogCategory()
+     */
+    protected String getLogCategory()
+    {
+        return LogCategories.CLIENT_FLEXCLIENT;
+    }
+
+    /**
+     * Removes a FlexClient from being managed by this manager.
+     * This method is invoked by FlexClients when they are invalidated.
+     * The mapping is removed only if it still refers to the passed instance.
+     *
+     * @param flexClient The FlexClient being invalidated; ignored if null.
+     */
+    protected void removeFlexClient(FlexClient flexClient)
+    {
+        if (flexClient != null)
+        {
+            String id = flexClient.getId();
+            synchronized (id)
+            {
+                FlexClient storedClient = flexClients.get(id);
+                // Identity check: a replacement registered under the same id must not be removed.
+                // NOTE(review): this locks on the id String instance, not on the manager lock used by getFlexClient().
+                if (storedClient == flexClient)
+                    flexClients.remove(id);
+            }
+        }
+    }
+
+ //--------------------------------------------------------------------------
+ //
+ // Private Methods
+ //
+ //--------------------------------------------------------------------------
+
+    // Throws a MessageException if the id is null or is already registered with this manager.
+    private void checkForNullAndDuplicateId(String id)
+    {
+        if (id != null && !flexClients.containsKey(id))
+            return;
+        MessageException failure = new MessageException();
+        if (id == null)
+        {
+            // Cannot create ''{0}'' with null id.
+            failure.setMessage(10039, new Object[]{"FlexClient"});
+            failure.setCode("Server.Processing.NullId");
+        }
+        else
+        {
+            // Cannot create ''{0}'' with id ''{1}''; another ''{0}'' is already registered with the same id.
+            failure.setMessage(10040, new Object[]{"FlexClient", id});
+            failure.setCode("Server.Processing.DuplicateId");
+        }
+        throw failure;
+    }
+
+    /*
+     * Creates the queue processor class named in the MessageBroker's FlexClient settings.
+     * Returns null when no settings or class name are configured; throws a
+     * MessageException when the configured class cannot be created or initialized.
+     */
+    private FlexClientOutboundQueueProcessor createDefaultOutboundQueueProcessor(
+            FlexClient flexClient, String endpointId)
+    {
+        FlexClientSettings settings = broker.getFlexClientSettings();
+        String processorClassName = (settings == null) ? null
+                : settings.getFlexClientOutboundQueueProcessorClassName();
+        if (processorClassName == null)
+            return null;
+
+        try
+        {
+            Class<?> processorClass = createClass(processorClassName);
+            FlexClientOutboundQueueProcessor created =
+                    (FlexClientOutboundQueueProcessor)ClassUtil.createDefaultInstance(processorClass, null);
+            created.setFlexClient(flexClient);
+            created.setEndpointId(endpointId);
+            created.initialize(settings.getFlexClientOutboundQueueProcessorProperties());
+            return created;
+        }
+        catch (Throwable t)
+        {
+            String message = "Failed to create MessageBroker's outbound queue processor for FlexClient with id '" + flexClient.getId() + "'.";
+            if (Log.isWarn())
+                Log.getLogger(FlexClient.FLEX_CLIENT_LOG_CATEGORY).warn(message, t);
+
+            throw new MessageException(message, t);
+        }
+    }
+
+    // Creates the endpoint's custom queue processor; returns null if none is configured or if creating or initializing it fails.
+    private FlexClientOutboundQueueProcessor createCustomOutboundQueueProcessor(
+            FlexClient flexClient, String endpointId)
+    {
+        Endpoint endpoint = broker.getEndpoint(endpointId);
+        if (!(endpoint instanceof AbstractEndpoint))
+            return null;
+        AbstractEndpoint abstractEndpoint = (AbstractEndpoint)endpoint;
+        Class<?> processorClass = abstractEndpoint.getFlexClientOutboundQueueProcessorClass();
+        if (processorClass == null)
+            return null;
+        try
+        {
+            Object instance = ClassUtil.createDefaultInstance(processorClass, null);
+            if (!(instance instanceof FlexClientOutboundQueueProcessor))
+                return null;
+            FlexClientOutboundQueueProcessor candidate = (FlexClientOutboundQueueProcessor)instance;
+            candidate.setFlexClient(flexClient);
+            candidate.setEndpointId(endpointId);
+            candidate.initialize(abstractEndpoint.getFlexClientOutboundQueueProcessorConfig());
+            return candidate;
+        }
+        catch (Throwable t)
+        {
+            // Discard any partially configured instance so the caller really falls back, as the log message states.
+            if (Log.isWarn())
+                Log.getLogger(FlexClient.FLEX_CLIENT_LOG_CATEGORY).warn("Failed to create custom outbound queue processor for FlexClient with id '" + flexClient.getId() + "'. Using MessageBroker's default queue processor.", t);
+            return null;
+        }
+    }
+
+    // Loads className via ClassUtil, passing the FlexContext broker's class loader, or null when no broker is set.
+    private Class createClass(String className)
+    {
+        MessageBroker contextBroker = FlexContext.getMessageBroker();
+        ClassLoader loader = (contextBroker != null) ? contextBroker.getClassLoader() : null;
+        return ClassUtil.createClass(className, loader);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClientNotSubscribedException.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/FlexClientNotSubscribedException.java b/core/src/flex/messaging/client/FlexClientNotSubscribedException.java
new file mode 100644
index 0000000..f5ffe18
--- /dev/null
+++ b/core/src/flex/messaging/client/FlexClientNotSubscribedException.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+import flex.messaging.MessageException;
+import flex.messaging.log.LogEvent;
+
+/**
+ * MessageException subclass that supplies its own log intro text, disables stack trace logging and prefers the debug log level.
+ */
+public class FlexClientNotSubscribedException extends MessageException
+{
+    /**
+     * Serialization version identifier.
+     */
+    private static final long serialVersionUID = 773524927178340950L;
+
+    //--------------------------------------------------------------------------
+    //
+    // Properties
+    //
+    //--------------------------------------------------------------------------
+
+    //----------------------------------
+    // defaultLogMessageIntro
+    //----------------------------------
+
+    /**
+     * Overrides the intro text for the log message; returns "FlexClient not subscribed: ".
+     */
+    public String getDefaultLogMessageIntro()
+    {
+        return "FlexClient not subscribed: ";
+    }
+
+    //----------------------------------
+    // logStackTraceEnabled
+    //----------------------------------
+
+    /**
+     * Override to disable stack trace logging; always returns false.
+     */
+    public boolean isLogStackTraceEnabled()
+    {
+        return false;
+    }
+
+    //----------------------------------
+    // preferredLogLevel
+    //----------------------------------
+
+    /**
+     * Override to lower the preferred log level to debug; returns LogEvent.DEBUG.
+     */
+    public short getPreferredLogLevel()
+    {
+        return LogEvent.DEBUG;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java b/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java
new file mode 100644
index 0000000..19cf19e
--- /dev/null
+++ b/core/src/flex/messaging/client/FlexClientOutboundQueueProcessor.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import flex.messaging.Destination;
+import flex.messaging.MessageClient;
+import flex.messaging.MessageDestination;
+import flex.messaging.config.ConfigMap;
+import flex.messaging.messages.Message;
+import flex.messaging.services.messaging.ThrottleManager;
+import flex.messaging.services.messaging.ThrottleManager.ThrottleResult;
+import flex.messaging.services.messaging.ThrottleManager.ThrottleResult.Result;
+
+/**
+ * The base FlexClientOutboundQueueProcessor implementation used if a custom implementation is not
+ * specified. Its behavior is very simple. It adds all new messages in order to the tail
+ * of the outbound queue and flushes all queued messages to the network as quickly as possible.
+ * It also handles the outbound client-level throttling specified at the destination level.
+ */
+public class FlexClientOutboundQueueProcessor
+{
+ //--------------------------------------------------------------------------
+ //
+ // Variables
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * The associated FlexClient.
+ */
+ private FlexClient client;
+
+ /**
+ * The last MessageClient messages were flushed to. This is mainly for faster
+ * lookup.
+ */
+ private MessageClient lastMessageClient;
+
+ /**
+ * The associated endpoint's Id.
+ */
+ private String endpointId;
+
+ /**
+ * Manages throttling of outbound client level messages.
+ */
+ protected OutboundQueueThrottleManager outboundQueueThrottleManager;
+
+ //--------------------------------------------------------------------------
+ //
+ // Public Methods
+ //
+ //--------------------------------------------------------------------------
+
+    /**
+     * Stores the Id for the outbound queue's endpoint.
+     * The stored value is what <code>getEndpointId()</code> returns.
+     *
+     * @param value The Id for the outbound queue's endpoint.
+     */
+    public void setEndpointId(String value)
+    {
+        endpointId = value;
+    }
+
+    /**
+     * Returns the Id for the outbound queue's endpoint.
+     *
+     * @return The stored Id for the outbound queue's endpoint.
+     */
+    public String getEndpointId()
+    {
+        return endpointId;
+    }
+
+    /**
+     * Sets the associated FlexClient.
+     * The stored value is what <code>getFlexClient()</code> returns.
+     *
+     * @param value The associated FlexClient.
+     */
+    public void setFlexClient(FlexClient value)
+    {
+        client = value;
+    }
+
+    /**
+     * Returns the associated FlexClient.
+     *
+     * @return The stored FlexClient associated with this processor.
+     */
+    public FlexClient getFlexClient()
+    {
+        return client;
+    }
+
+    /**
+     * Returns the outbound queue throttle manager, or null if one does not exist.
+     *
+     * @return The outbound queue throttle manager; this method never creates one.
+     */
+    public OutboundQueueThrottleManager getOutboundQueueThrottleManager()
+    {
+        return outboundQueueThrottleManager;
+    }
+
+    /**
+     * Utility method to initialize (if necessary) and return an outbound queue
+     * throttle manager. NOTE(review): the null check and assignment are not
+     * synchronized in this method; confirm that callers serialize access.
+     * @return The outbound queue throttle manager; never null.
+     */
+    public OutboundQueueThrottleManager getOrCreateOutboundQueueThrottleManager()
+    {
+        if (outboundQueueThrottleManager == null)
+            outboundQueueThrottleManager = new OutboundQueueThrottleManager(this);
+        return outboundQueueThrottleManager;
+    }
+
+    /**
+     * No-op; this default implementation doesn't require custom initialization.
+     * Subclasses may override to process any custom initialization properties that have been
+     * defined in the server configuration.
+     *
+     * @param properties A ConfigMap containing any custom initialization properties; unused here.
+     */
+    public void initialize(ConfigMap properties) {}
+
+    /**
+     * Always adds a new message to the tail of the queue.
+     *
+     * @param outboundQueue The queue of outbound messages; the message is appended to it.
+     * @param message The new message to add to the queue.
+     */
+    public void add(List<Message> outboundQueue, Message message)
+    {
+        outboundQueue.add(message);
+    }
+
+    /**
+     * Flushes messages for all MessageClients by calling the two-argument
+     * <code>flush</code> with a null MessageClient.
+     * @param outboundQueue The queue of outbound messages.
+     * @return A FlushResult containing the messages that have been removed from the outbound queue
+     * to be written to the network and a wait time for the next flush of the outbound queue
+     * that is the default for the underlying Channel/Endpoint.
+     */
+    public FlushResult flush(List<Message> outboundQueue)
+    {
+        return flush(null /* no client distinction */, outboundQueue);
+    }
+
+ /**
+ * Removes all messages in the queue targeted to this specific MessageClient subscription(s) and
+ * returns them to be sent to the client.
+ * Overrides should be careful to only return messages for the specified MessageClient.
+ *
+ * @param messageClient The specific MessageClient to return messages for.
+ * @param outboundQueue The queue of outbound messages.
+ * @return A FlushResult containing the messages that have been removed from the outbound queue
+ * to be written to the network for this MessageClient.
+ */
+ public FlushResult flush(MessageClient messageClient, List<Message> outboundQueue)
+ {
+     // Walks the queue once. Each message that matches the requested MessageClient
+     // (or every message when messageClient is null) is removed from the queue and is
+     // either dropped (expired or throttled) or collected into the returned FlushResult.
+     FlushResult flushResult = new FlushResult();
+     List<Message> messagesToFlush = null; // Created lazily; stays null when nothing is flushed.
+
+     for (Iterator<Message> iter = outboundQueue.iterator(); iter.hasNext();)
+     {
+         Message message = iter.next();
+         if (messageClient == null || (message.getClientId().equals(messageClient.getClientId())))
+         {
+             if (isMessageExpired(message)) // Don't flush expired messages.
+             {
+                 iter.remove();
+                 continue;
+             }
+
+             // Resolve the client used for throttling into a loop-local variable. The
+             // messageClient parameter is the filter for the whole flush and must not be
+             // reassigned: when it is null (no client distinction), overwriting it with the
+             // first message's client would make the filter above skip every later message
+             // that belongs to a different MessageClient and leave it stuck in the queue.
+             MessageClient targetClient = messageClient == null? getMessageClient(message) : messageClient;
+
+             // First, apply the destination level outbound throttling.
+             ThrottleResult throttleResult = throttleOutgoingDestinationLevel(targetClient, message, false);
+             Result result = throttleResult.getResult();
+
+             // No destination level throttling; check destination-client level throttling.
+             if (Result.OK == result)
+             {
+                 throttleResult = throttleOutgoingClientLevel(targetClient, message, false);
+                 result = throttleResult.getResult();
+                 // If no throttling, simply add the message to the list.
+                 if (Result.OK == result)
+                 {
+                     updateMessageFrequencyOutgoing(targetClient, message);
+                     if (messagesToFlush == null)
+                         messagesToFlush = new ArrayList<Message>();
+                     messagesToFlush.add(message);
+                 }
+                 // In rest of the policies (which is NONE), simply don't
+                 // add the message to the list.
+             }
+
+             // Flushed or throttled, the message leaves the queue either way.
+             iter.remove();
+         }
+     }
+
+     flushResult.setMessages(messagesToFlush);
+     return flushResult;
+ }
+
+ /**
+ * Utility method to test whether a message has expired or not.
+ * Messages with a timeToLive value that is shorter than the timespan from the message's
+ * timestamp up to the current system time will cause this method to return true.
+ * If there are expired messages in the outbound queue, flush implementations
+ * should use this helper method to only process and return messages that have
+ * not yet expired.
+ *
+ * @param message The message to test for expiration.
+ *
+ * @return true if the message has a timeToLive value that has expired; otherwise false.
+ */
+ public boolean isMessageExpired(Message message)
+ {
+     // A timeToLive that is not positive never counts as expired.
+     if (!(message.getTimeToLive() > 0))
+         return false;
+
+     // Expired once the time elapsed since the message timestamp reaches the timeToLive.
+     return (System.currentTimeMillis() - message.getTimestamp()) >= message.getTimeToLive();
+ }
+
+ /**
+ * Attempts to throttle the outgoing message at the destination level.
+ *
+ * @param msgClient The client the message is intended for.
+ * @param message The message to consider to throttle.
+ * @param buffered Whether the message has already been buffered. In that case,
+ * parts of the regular throttling code are skipped.
+ * @return The result of throttling attempt.
+ */
+ protected ThrottleResult throttleOutgoingDestinationLevel(MessageClient msgClient, Message message, boolean buffered)
+ {
+     ThrottleManager manager = getThrottleManager(msgClient);
+
+     // No throttle manager available: return an OK result.
+     if (manager == null)
+         return new ThrottleResult();
+
+     // An already buffered message bypasses ThrottleManager#throttleOutgoingMessage so the
+     // regular throttling handling is not applied to it a second time; any other message
+     // goes through regular throttling.
+     return buffered
+         ? manager.throttleDestinationLevel(message, false /*incoming*/)
+         : manager.throttleOutgoingMessage(message);
+ }
+
+ /**
+ * Attempts to throttle the outgoing message at the destination-client level.
+ *
+ * @param msgClient The client the message is intended for.
+ * @param message The message to consider to throttle.
+ * @param buffered Whether the message has already been buffered. In that case,
+ * parts of the regular throttling code are skipped.
+ * @return The result of throttling attempt.
+ */
+ protected ThrottleResult throttleOutgoingClientLevel(MessageClient msgClient, Message message, boolean buffered)
+ {
+     // A missing outbound queue throttle manager means client level throttling is
+     // not enabled; return an OK result.
+     if (outboundQueueThrottleManager == null)
+         return new ThrottleResult();
+
+     ThrottleResult clientLevelResult = outboundQueueThrottleManager.throttleOutgoingClientLevel(message);
+
+     // Already buffered messages skip the result handling below.
+     if (buffered)
+         return clientLevelResult;
+
+     ThrottleManager manager = getThrottleManager(msgClient);
+     if (manager != null)
+     {
+         manager.handleOutgoingThrottleResult(message, clientLevelResult, true /*isClientLevel*/);
+     }
+     return clientLevelResult;
+ }
+
+ /**
+ * Returns the message client that the message is intended for.
+ *
+ * @param message The message.
+ * @return The message client that the message is intended for.
+ */
+ protected MessageClient getMessageClient(Message message)
+ {
+     // The most recently resolved MessageClient is cached; it can be reused when it
+     // carries the same client id as this message.
+     boolean cacheHit = lastMessageClient != null
+         && message.getClientId().equals(lastMessageClient.getClientId());
+
+     // Cache miss: look the MessageClient up and remember it for the next call.
+     if (!cacheHit)
+         lastMessageClient = client.getMessageClient((String)message.getClientId());
+
+     return lastMessageClient;
+ }
+
+ /**
+ * Returns the throttle manager associated with the destination the message
+ * is intended for.
+ *
+ * @param msgClient The message client; it can be null.
+ * @return The throttle manager.
+ */
+ protected ThrottleManager getThrottleManager(MessageClient msgClient)
+ {
+     // Without a message client there is no destination to ask.
+     if (msgClient == null)
+         return null;
+
+     // instanceof is false for null, so a missing destination also yields null here.
+     Destination destination = msgClient.getDestination();
+     if (destination instanceof MessageDestination)
+         return ((MessageDestination)destination).getThrottleManager();
+
+     return null;
+ }
+
+ /**
+ * Updates the outgoing message's message frequency.
+ *
+ * @param msgClient The MessageClient that might have been passed to the flush; it can be null.
+ * @param message The message.
+ */
+ protected void updateMessageFrequencyOutgoing(MessageClient msgClient, Message message)
+ {
+     // Destination level: record the outgoing message with the throttle manager, if any.
+     ThrottleManager destinationLevelManager = getThrottleManager(msgClient);
+     if (destinationLevelManager != null)
+     {
+         destinationLevelManager.updateMessageFrequencyDestinationLevel(false /*incoming*/);
+     }
+
+     // Client level: record it with the outbound queue throttle manager, if one exists.
+     if (outboundQueueThrottleManager != null)
+     {
+         outboundQueueThrottleManager.updateMessageFrequencyOutgoingClientLevel(message);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/FlushResult.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/FlushResult.java b/core/src/flex/messaging/client/FlushResult.java
new file mode 100644
index 0000000..ab400fa
--- /dev/null
+++ b/core/src/flex/messaging/client/FlushResult.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+import java.util.List;
+
+import flex.messaging.messages.Message;
+
+/**
+ * Stores the messages that should be written to the network as a result of a flush
+ * invocation on a FlexClient's outbound queue.
+ */
+public class FlushResult
+{
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * The messages to write to the network; remains null until assigned.
+     */
+    private List<Message> messages;
+
+    /**
+     * The delay, in milliseconds, before the next flush; 0 by default and never negative.
+     */
+    private int nextFlushWaitTimeMillis;
+
+    //--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Creates an empty <tt>FlushResult</tt> to be returned from a flush of a
+     * FlexClient's outbound queue.
+     * The result carries the messages to write over the network to the client and
+     * an optional wait time, in milliseconds, before the next flush should be invoked.
+     */
+    public FlushResult()
+    {
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Properties
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Returns the messages that this flush invocation wants written to the network.
+     * The returned list may be null, in which case no messages are written.
+     *
+     * @return The messages to write to the network, possibly null.
+     */
+    public List<Message> getMessages()
+    {
+        return this.messages;
+    }
+
+    /**
+     * Assigns the messages that this flush invocation wants written to the network.
+     *
+     * @param value The messages to write to the network for this flush invocation.
+     */
+    public void setMessages(List<Message> value)
+    {
+        this.messages = value;
+    }
+
+    /**
+     * Returns how long to wait, in milliseconds, before the next flush is invoked.
+     * A value of 0, the default, means no delayed flush is scheduled and the next flush
+     * depends upon the underlying Channel/Endpoint:
+     * for client-side polling Channels it happens when the client sends its next poll
+     * request at its regular interval; for client-side Channels that support direct writes
+     * to the client it is triggered when the next message is added to the outbound queue.
+     *
+     * @return The wait time in milliseconds before flush is next invoked, or 0 to use the
+     *         default flush behavior of the underlying Channel/Endpoint.
+     */
+    public int getNextFlushWaitTimeMillis()
+    {
+        return this.nextFlushWaitTimeMillis;
+    }
+
+    /**
+     * Assigns how long to wait, in milliseconds, before the next flush is invoked.
+     * A value of 0, the default, means no delayed flush is scheduled and the next flush
+     * depends upon the underlying Channel/Endpoint (see {@link #getNextFlushWaitTimeMillis()}).
+     * Negative values are stored as 0.
+     *
+     * @param value The wait time in milliseconds before flush will be invoked.
+     */
+    public void setNextFlushWaitTimeMillis(int value)
+    {
+        // Clamp at zero so a negative request is stored as "no delayed flush".
+        this.nextFlushWaitTimeMillis = Math.max(value, 0);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/OutboundQueueThrottleManager.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/OutboundQueueThrottleManager.java b/core/src/flex/messaging/client/OutboundQueueThrottleManager.java
new file mode 100644
index 0000000..24a66d0
--- /dev/null
+++ b/core/src/flex/messaging/client/OutboundQueueThrottleManager.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import flex.messaging.MessageClient.SubscriptionInfo;
+import flex.messaging.config.ThrottleSettings.Policy;
+import flex.messaging.log.Log;
+import flex.messaging.messages.Message;
+import flex.messaging.services.messaging.MessageFrequency;
+import flex.messaging.services.messaging.ThrottleManager;
+import flex.messaging.services.messaging.ThrottleManager.ThrottleResult;
+import flex.messaging.util.StringUtils;
+
+
+/**
+ * Used to keep track of and limit outbound message rates of a single FlexClient queue.
+ * An outbound FlexClient queue can contain messages from multiple MessageClients
+ * across multiple destinations. It can also contain messages for multiple
+ * subscriptions (for each subtopic/selector) across the same destination for
+ * the same MessageClient.
+ */
+public class OutboundQueueThrottleManager
+{
+ //--------------------------------------------------------------------------
+ //
+ // Constructor
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Constructs a default outbound queue throttle manager.
+ *
+ * @param processor The outbound queue processor that is using this throttle manager.
+ */
+ public OutboundQueueThrottleManager(FlexClientOutboundQueueProcessor processor)
+ {
+ destinationFrequencies = new ConcurrentHashMap<String, DestinationFrequency>();
+ this.processor = processor;
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Variables
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Map of destination id and destination message frequencies.
+ */
+ protected final ConcurrentHashMap<String, DestinationFrequency> destinationFrequencies;
+
+ /**
+ * The parent queue processor of the throttle manager.
+ */
+ protected final FlexClientOutboundQueueProcessor processor;
+
+ //--------------------------------------------------------------------------
+ //
+ // Public Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Registers the destination with the outbound throttle manager.
+ *
+ * @param destinationId The id of the destination.
+ * @param outboundMaxClientFrequency The outbound max-client-frequency specified
+ * at the destination.
+ * @param outboundPolicy The outbound throttle policy specified at the destination.
+ */
+ public void registerDestination(String destinationId, int outboundMaxClientFrequency, Policy outboundPolicy)
+ {
+ DestinationFrequency frequency = destinationFrequencies.get(destinationId);
+ if (frequency == null)
+ {
+ frequency = new DestinationFrequency(outboundMaxClientFrequency, outboundPolicy);
+ destinationFrequencies.putIfAbsent(destinationId, frequency);
+ }
+ }
+
+ /**
+ * Registers the subscription of a client talking to a destination with the
+ * specified subscription info.
+ *
+ * @param destinationId The destination id.
+ * @param si The subscription information.
+ */
+ public void registerSubscription(String destinationId, SubscriptionInfo si)
+ {
+ DestinationFrequency frequency = destinationFrequencies.get(destinationId);
+ frequency.logMaxFrequencyDuringRegistration(frequency.outboundMaxClientFrequency, si);
+ }
+
+ /**
+ * Unregisters the subscription.
+ *
+ * @param destinationId The destination id.
+ * @param si The subscription information.
+ */
+ public void unregisterSubscription(String destinationId, SubscriptionInfo si)
+ {
+ unregisterDestination(destinationId);
+ }
+
+ /**
+ * Unregisters all subscriptions of the client under the specified destination.
+ *
+ * @param destinationId The destination id.
+ */
+ public void unregisterAllSubscriptions(String destinationId)
+ {
+ unregisterDestination(destinationId);
+ }
+
+ /**
+ * Attempts to throttle the outgoing message.
+ *
+ * @param message The message to consider to throttle.
+ * @return True if the message was throttled; otherwise false.
+ */
+ public ThrottleResult throttleOutgoingClientLevel(Message message)
+ {
+ String destinationId = message.getDestination();
+ if (isDestinationRegistered(destinationId))
+ {
+ DestinationFrequency frequency = destinationFrequencies.get(message.getDestination());
+ int maxFrequency = frequency.getMaxFrequency(message); // Limit to check against.
+ MessageFrequency messageFrequency = frequency.getMessageFrequency(message); // Message rate of the client.
+ if (messageFrequency != null)
+ {
+ ThrottleResult result = messageFrequency.checkLimit(maxFrequency, frequency.outboundPolicy);
+ return result;
+ }
+ }
+ return new ThrottleResult(); // Otherwise, return OK result.
+ }
+
+ /**
+ * Updates the outgoing client level message frequency of the message.
+ *
+ * @param message The message.
+ */
+ public void updateMessageFrequencyOutgoingClientLevel(Message message)
+ {
+ String destinationId = message.getDestination();
+ if (isDestinationRegistered(destinationId))
+ {
+ DestinationFrequency frequency = destinationFrequencies.get(message.getDestination());
+ MessageFrequency messageFrequency = frequency.getMessageFrequency(message);
+ if (messageFrequency != null)
+ messageFrequency.updateMessageFrequency();
+ }
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Protected Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Determines whether the destination has been registered or not.
+ *
+ * @param destinationId The destination id.
+ * @return True if the destination with the specified id has been registered.
+ */
+ protected boolean isDestinationRegistered(String destinationId)
+ {
+ return destinationFrequencies.containsKey(destinationId);
+ }
+
+ /**
+ * Unregisters the destination from the outbound throttle manager.
+ *
+ * @param destinationId The id of the destination.
+ */
+ protected void unregisterDestination(String destinationId)
+ {
+ if (isDestinationRegistered(destinationId))
+ destinationFrequencies.remove(destinationId);
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Inner Classes
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Used to keep track of max-client-frequency and outgoing throttle policy
+ * specified at the destination. It also keeps track of outbound message
+ * rates of all MessageClient subscriptions across the destination.
+ */
+ class DestinationFrequency
+ {
+ protected final int outboundMaxClientFrequency; // destination specified client limit.
+ protected final MessageFrequency outboundClientFrequency;
+ protected final Policy outboundPolicy; // destination specified policy.
+
+ /**
+ * Default constructor.
+ *
+ * @param outboundMaxClientFrequency The outbound throttling max-client-frequency of the destination.
+ * @param outboundPolicy The outbound throttling policy of the destination.
+ */
+ DestinationFrequency(int outboundMaxClientFrequency, Policy outboundPolicy)
+ {
+ this.outboundMaxClientFrequency = outboundMaxClientFrequency;
+ this.outboundPolicy = outboundPolicy;
+ outboundClientFrequency = new MessageFrequency(outboundMaxClientFrequency);
+ }
+
+ /**
+ * Returns the max-client-frequency for the subscription the message is
+ * intended for (which is simply the max-client-frequency specified at
+ * the destination).
+ *
+ * @param message The message.
+ *
+ * @return The max-frequency for the subscription.
+ */
+ int getMaxFrequency(Message message)
+ {
+ return outboundMaxClientFrequency;
+ }
+
+ /**
+ * Given a subscription the message is intended to, returns the message
+ * rate frequency for that subscription.
+ *
+ * @param message The message.
+ * @return The message frequency for the subscription, if it exists; otherwise null.
+ */
+ MessageFrequency getMessageFrequency(Message message)
+ {
+ return outboundClientFrequency;
+ }
+
+ /**
+ * Utility function to log the maxFrequency being used during subscription.
+ *
+ * @param maxFrequency The maxFrequency to log.
+ */
+ void logMaxFrequencyDuringRegistration(int maxFrequency, SubscriptionInfo si)
+ {
+ if (Log.isDebug())
+ Log.getLogger(ThrottleManager.LOG_CATEGORY).debug("Outbound queue throttle manager for FlexClient '"
+ + processor.getFlexClient().getId() + "' is using '" + maxFrequency
+ + "' as the throttling limit for its subscription: "
+ + StringUtils.NEWLINE + si);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/PollFlushResult.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/PollFlushResult.java b/core/src/flex/messaging/client/PollFlushResult.java
new file mode 100644
index 0000000..dc8e33b
--- /dev/null
+++ b/core/src/flex/messaging/client/PollFlushResult.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * Extends <tt>FlushResult</tt> and adds additional properties for controlling
+ * client polling behavior.
+ */
+public class PollFlushResult extends FlushResult
+{
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Whether handling of this result should try to avoid busy-polling cycles.
+     */
+    private boolean avoidBusyPolling;
+
+    /**
+     * Whether client processing of this result should be suppressed.
+     */
+    private boolean clientProcessingSuppressed;
+
+    //--------------------------------------------------------------------------
+    //
+    // Properties
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Reports whether the handling of this result should attempt to avoid
+     * potential busy-polling cycles.
+     * This will be set to <code>true</code> in the case of two clients that are both
+     * long-polling the server over the same session.
+     *
+     * @return <code>true</code> if the handling of this result should attempt to avoid
+     *         potential busy-polling cycles.
+     */
+    public boolean isAvoidBusyPolling()
+    {
+        return this.avoidBusyPolling;
+    }
+
+    /**
+     * Signals whether handling for this result should attempt to avoid
+     * potential busy-polling cycles.
+     *
+     * @param value <code>true</code> to signal that handling for this result should
+     *        attempt to avoid potential busy-polling cycles.
+     */
+    public void setAvoidBusyPolling(boolean value)
+    {
+        this.avoidBusyPolling = value;
+    }
+
+    /**
+     * Reports whether client processing of this result should be suppressed.
+     * This should be <code>true</code> for results generated for poll requests
+     * that arrive while a long-poll request from the same client is being serviced
+     * to avoid a busy polling cycle.
+     *
+     * @return <code>true</code> if client processing of this result should be suppressed;
+     *         otherwise <code>false</code>.
+     */
+    public boolean isClientProcessingSuppressed()
+    {
+        return this.clientProcessingSuppressed;
+    }
+
+    /**
+     * Controls whether client processing of this result is suppressed.
+     * Default is <code>false</code>.
+     * This should be set to <code>true</code> for results generated for poll requests
+     * that arrive while a long-poll request from the same client is being serviced
+     * to avoid a busy polling cycle.
+     *
+     * @param value <code>true</code> to suppress client processing of the result.
+     */
+    public void setClientProcessingSuppressed(boolean value)
+    {
+        this.clientProcessingSuppressed = value;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/PollWaitListener.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/PollWaitListener.java b/core/src/flex/messaging/client/PollWaitListener.java
new file mode 100644
index 0000000..7978e6f
--- /dev/null
+++ b/core/src/flex/messaging/client/PollWaitListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * Used alongside invocations of <code>FlexClient.pollWithWait()</code> to allow calling code to
+ * maintain a record of the Objects being used to place waited poll requests into a wait
+ * state. This can be used to break the threads out of their wait state separately from the
+ * internal waited poll handling within <code>FlexClient</code>.
+ */
+public interface PollWaitListener
+{
+ /**
+ * Hook method invoked directly before a wait begins.
+ * An implementation can record the notifier here so that calling code is able to
+ * break the waiting thread out of its wait state on its own.
+ *
+ * @param notifier The <tt>Object</tt> being used to <code>wait()/notify()</code>.
+ */
+ void waitStart(Object notifier);
+
+ /**
+ * Hook method invoked directly after a wait completes.
+ * An implementation can drop whatever record of the notifier it kept when the
+ * wait started.
+ *
+ * @param notifier The <tt>Object</tt> being used to <code>wait()/notify()</code>.
+ */
+ void waitEnd(Object notifier);
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/UserAgentSettings.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/UserAgentSettings.java b/core/src/flex/messaging/client/UserAgentSettings.java
new file mode 100644
index 0000000..217f2c7
--- /dev/null
+++ b/core/src/flex/messaging/client/UserAgentSettings.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.client;
+
+/**
+ * A class to hold user agent specific properties. For example, in streaming
+ * endpoints, a certain number of bytes need to be written before the
+ * streaming connection can be used and this value is specific to user agents.
+ * Similarly, the number of simultaneous connections a session can have is user
+ * agent specific.
+ */
+public class UserAgentSettings
+{
+ /**
+ * The prefixes of the version token used by various browsers.
+ */
+ public static final String USER_AGENT_ANDROID = "Android";
+ public static final String USER_AGENT_CHROME = "Chrome";
+ public static final String USER_AGENT_FIREFOX = "Firefox";
+ public static final String USER_AGENT_FIREFOX_1 = "Firefox/1";
+ public static final String USER_AGENT_FIREFOX_2 = "Firefox/2";
+ public static final String USER_AGENT_MSIE = "MSIE";
+ public static final String USER_AGENT_MSIE_5 = "MSIE 5";
+ public static final String USER_AGENT_MSIE_6 = "MSIE 6";
+ public static final String USER_AGENT_MSIE_7 = "MSIE 7";
+ public static final String USER_AGENT_OPERA = "Opera";
+ public static final String USER_AGENT_OPERA_8 = "Opera 8";
+ // Opera 10,11 ship as User Agent Opera/9.8.
+ public static final String USER_AGENT_OPERA_10 = "Opera/9.8";
+ public static final String USER_AGENT_SAFARI = "Safari";
+
+ /**
+ * Bytes needed to kickstart the streaming connections for IE.
+ */
+ public static final int KICKSTART_BYTES_MSIE = 2048;
+ /**
+ * Bytes needed to kickstart the streaming connections for SAFARI.
+ */
+ public static final int KICKSTART_BYTES_SAFARI = 512;
+ /**
+ * Bytes needed to kickstart the streaming connections for Android.
+ */
+ public static final int KICKSTART_BYTES_ANDROID = 4010;
+
+ /**
+ * The default number of persistent connections per session for various browsers.
+ */
+ private static final int MAX_PERSISTENT_CONNECTIONS_LEGACY = 1;
+ public static final int MAX_PERSISTENT_CONNECTIONS_DEFAULT = 5;
+ private static final int MAX_PERSISTENT_CONNECTIONS_OPERA_LEGACY = 3;
+ private static final int MAX_PERSISTENT_CONNECTIONS_CHROME = MAX_PERSISTENT_CONNECTIONS_DEFAULT;
+ private static final int MAX_PERSISTENT_CONNECTIONS_FIREFOX = MAX_PERSISTENT_CONNECTIONS_DEFAULT;
+ private static final int MAX_PERSISTENT_CONNECTIONS_MSIE = MAX_PERSISTENT_CONNECTIONS_DEFAULT;
+ private static final int MAX_PERSISTENT_CONNECTIONS_OPERA = 7;
+ private static final int MAX_PERSISTENT_CONNECTIONS_SAFARI = 3;
+
+ private String matchOn;
+ private int kickstartBytes;
+ private int maxPersistentConnectionsPerSession = MAX_PERSISTENT_CONNECTIONS_DEFAULT;
+
+ /**
+ * Static factory that returns settings pre-initialized for a known user agent.
+ * {@code matchOn} is compared for exact, case-sensitive equality with the
+ * {@code USER_AGENT_*} constants of this class, and the returned instance is
+ * configured as follows:
+ *
+ * match-on="Android" max-persistent-connections-per-session="3" kickstart-bytes="4010"
+ *
+ * match-on="Chrome" max-persistent-connections-per-session="5"
+ *
+ * match-on="Firefox" max-persistent-connections-per-session="5"
+ *
+ * match-on="Firefox/1" or "Firefox/2" max-persistent-connections-per-session="1"
+ *
+ * match-on="MSIE" max-persistent-connections-per-session="5" kickstart-bytes="2048"
+ *
+ * match-on="MSIE 5", "MSIE 6" or "MSIE 7"
+ * max-persistent-connections-per-session="1" kickstart-bytes="2048"
+ *
+ * match-on="Opera" max-persistent-connections-per-session="3"
+ *
+ * match-on="Opera 8" or "Opera/9.8" (sent by Opera 10 and 11)
+ * max-persistent-connections-per-session="7"
+ *
+ * match-on="Safari" max-persistent-connections-per-session="3" kickstart-bytes="512"
+ *
+ * Any other value, including {@code null}, keeps the defaults:
+ * max-persistent-connections-per-session="5" and kickstart-bytes="0".
+ *
+ * @param matchOn String to use to match the agent.
+ * @return A new {@code UserAgentSettings} whose match-on value is {@code matchOn}.
+ */
+ public static UserAgentSettings getAgent(String matchOn)
+ {
+ UserAgentSettings userAgent = new UserAgentSettings();
+ userAgent.setMatchOn(matchOn);
+
+ if (USER_AGENT_ANDROID.equals(matchOn))
+ {
+ userAgent.setKickstartBytes(KICKSTART_BYTES_ANDROID);
+ userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_SAFARI);
+ }
+ else if (USER_AGENT_CHROME.equals(matchOn))
+ {
+ userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_CHROME);
+ }
+ else if (USER_AGENT_FIREFOX.equals(matchOn))
+ {
+ userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_FIREFOX);
+ }
+ else if (USER_AGENT_FIREFOX_1.equals(matchOn))
+ {
+ userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY);
+ }
+ else if (USER_AGENT_FIREFOX_2.equals(matchOn))
+ {
+ userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY);
+ }
+ else if (USER_AGENT_MSIE.equals(matchOn))
+ {
+ userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE);
+ userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_MSIE);
+ }
+ else if (USER_AGENT_MSIE_5.equals(matchOn))
+ {
+ userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE);
+ userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY);
+ }
+ else if (USER_AGENT_MSIE_6.equals(matchOn))
+ {
+ userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE);
+ userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY);
+ }
+ else if (USER_AGENT_MSIE_7.equals(matchOn))
+ {
+ userAgent.setKickstartBytes(KICKSTART_BYTES_MSIE);
+ userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_LEGACY);
+ }
+ else if (USER_AGENT_OPERA.equals(matchOn))
+ {
+ userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_OPERA_LEGACY);
+ }
+ else if (USER_AGENT_OPERA_8.equals(matchOn))
+ {
+ userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_OPERA);
+ }
+ else if (USER_AGENT_OPERA_10.equals(matchOn))
+ {
+ userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_OPERA);
+ }
+ else if (USER_AGENT_SAFARI.equals(matchOn))
+ {
+ userAgent.setKickstartBytes(KICKSTART_BYTES_SAFARI);
+ userAgent.setMaxPersistentConnectionsPerSession(MAX_PERSISTENT_CONNECTIONS_SAFARI);
+ }
+ return userAgent;
+ }
+
+ /**
+ * Returns the String to use to match the agent.
+ *
+ * @return The String to use to match the agent.
+ */
+ public String getMatchOn()
+ {
+ return matchOn;
+ }
+
+ /**
+ * Sets the String to use to match the agent.
+ *
+ * @param matchOn The String to use to match the agent.
+ */
+ public void setMatchOn(String matchOn)
+ {
+ this.matchOn = matchOn;
+ }
+
+ /**
+ * Returns the number of bytes needed to kickstart the streaming connections
+ * for the user agent.
+ *
+ * @return The number of bytes needed to kickstart the streaming connections
+ * for the user agent; never negative.
+ */
+ public int getKickstartBytes()
+ {
+ return kickstartBytes;
+ }
+
+ /**
+ * Sets the number of bytes needed to kickstart the streaming connections
+ * for the user agent. A negative value is stored as zero.
+ *
+ * @param kickstartBytes The number of bytes needed to kickstart the streaming
+ * connections for the user agent.
+ */
+ public void setKickstartBytes(int kickstartBytes)
+ {
+ if (kickstartBytes < 0)
+ kickstartBytes = 0;
+ this.kickstartBytes = kickstartBytes;
+ }
+
+ /**
+ * @deprecated Use {@link UserAgentSettings#getMaxPersistentConnectionsPerSession()} instead.
+ */
+ public int getMaxStreamingConnectionsPerSession()
+ {
+ return getMaxPersistentConnectionsPerSession();
+ }
+
+ /**
+ * @deprecated Use {@link UserAgentSettings#setMaxPersistentConnectionsPerSession(int)} instead.
+ */
+ public void setMaxStreamingConnectionsPerSession(int maxStreamingConnectionsPerSession)
+ {
+ setMaxPersistentConnectionsPerSession(maxStreamingConnectionsPerSession);
+ }
+
+ /**
+ * Returns the number of simultaneous persistent connections per session
+ * the user agent supports.
+ *
+ * @return The number of persistent connections per session the user agent supports.
+ */
+ public int getMaxPersistentConnectionsPerSession()
+ {
+ return maxPersistentConnectionsPerSession;
+ }
+
+ /**
+ * Sets the number of simultaneous persistent connections per session
+ * the user agent supports. The value is stored as given, without validation.
+ *
+ * @param maxStreamingConnectionsPerSession The number of simultaneous
+ * persistent connections per session the user agent supports.
+ */
+ public void setMaxPersistentConnectionsPerSession(int maxStreamingConnectionsPerSession)
+ {
+ this.maxPersistentConnectionsPerSession = maxStreamingConnectionsPerSession;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/client/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/client/package-info.java b/core/src/flex/messaging/client/package-info.java
new file mode 100644
index 0000000..97d5848
--- /dev/null
+++ b/core/src/flex/messaging/client/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package flex.messaging.client;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/cluster/BroadcastHandler.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/cluster/BroadcastHandler.java b/core/src/flex/messaging/cluster/BroadcastHandler.java
new file mode 100644
index 0000000..f036622
--- /dev/null
+++ b/core/src/flex/messaging/cluster/BroadcastHandler.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.cluster;
+
+import java.util.List;
+
+/**
+ * Handler for a message broadcast by a Cluster.
+ *
+ * <p>Clusters broadcast messages across their physical nodes, and when they
+ * receive those messages they locate a BroadcastHandler capable of handling
+ * the broadcast.
+ */
+public interface BroadcastHandler
+{
+ /**
+ * Handle the broadcast message.
+ *
+ * @param sender sender of the original message
+ * @param params any parameters needed to handle the message
+ */
+ void handleBroadcast(Object sender, List<Object> params);
+
+ /**
+ * Determine whether this Handler supports a particular operation by name.
+ *
+ * @param name name of the operation
+ * @return whether or not this handler supports the named operation
+ */
+ boolean isSupportedOperation(String name);
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/cluster/Cluster.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/cluster/Cluster.java b/core/src/flex/messaging/cluster/Cluster.java
new file mode 100644
index 0000000..0339c55
--- /dev/null
+++ b/core/src/flex/messaging/cluster/Cluster.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.cluster;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.w3c.dom.Element;
+
+import flex.messaging.config.ConfigMap;
+import flex.messaging.log.LogCategories;
+
+/**
+ * Base class for cluster implementations. Holds the remove-node listeners
+ * and the settings that every implementation shares.
+ */
+public abstract class Cluster
+{
+ /**
+ * Default log category for clustering.
+ */
+ public static final String LOG_CATEGORY = LogCategories.SERVICE_CLUSTER;
+
+ /**
+ * Listeners to be notified when a node is removed from the cluster.
+ */
+ List removeNodeListeners = Collections.synchronizedList(new ArrayList());
+
+ /**
+ * Cluster properties file.
+ */
+ Element clusterPropertiesFile;
+
+ /**
+ * Specifies whether or not this is the default cluster.
+ */
+ boolean def;
+
+ /**
+ * Specifies if this cluster is enabled for URL load balancing.
+ */
+ boolean urlLoadBalancing;
+
+ /**
+ * Because destinations are the constructs which become clustered, clusters
+ * are identified by a unique name composed in the format
+ * "serviceType:destinationName".
+ *
+ * @param serviceType The name of the service for this destination.
+ * @param destinationName The original name of the destination.
+ * @return The unique name for the clustered destination.
+ */
+ static String getClusterDestinationKey(String serviceType, String destinationName)
+ {
+ // Plain concatenation produces the same "serviceType:destinationName"
+ // text as the former StringBuffer appends (a null argument still renders
+ // as "null") without creating a synchronized buffer that never leaves
+ // this method.
+ return serviceType + ':' + destinationName;
+ }
+
+ /**
+ * Add a listener for remove cluster node notification.
+ *
+ * @param listener the RemoveNodeListener to add
+ */
+ public void addRemoveNodeListener(RemoveNodeListener listener)
+ {
+ removeNodeListeners.add(listener);
+ }
+
+ /**
+ * Notifies every registered remove node listener, while holding the
+ * listener list's lock, that a node has been removed from the cluster.
+ *
+ * @param address The node that was removed from the cluster.
+ */
+ protected void sendRemoveNodeListener(Object address)
+ {
+ synchronized (removeNodeListeners)
+ {
+ for (int i = 0; i < removeNodeListeners.size(); i++)
+ ((RemoveNodeListener)removeNodeListeners.get(i)).removeClusterNode(address);
+ }
+ }
+
+ /**
+ * Initializes the Cluster with id and the map of properties. The default
+ * implementation is no-op.
+ *
+ * @param id The cluster id.
+ * @param properties The map of properties.
+ */
+ public void initialize(String id, ConfigMap properties)
+ {
+ // No-op.
+ }
+
+ /**
+ * Returns the cluster properties file.
+ *
+ * @return The cluster properties file.
+ */
+ public Element clusterPropertiesFile()
+ {
+ return clusterPropertiesFile;
+ }
+
+ /**
+ * Sets the cluster properties file.
+ *
+ * @param value The cluster properties file.
+ */
+ public void setClusterPropertiesFile(Element value)
+ {
+ this.clusterPropertiesFile = value;
+ }
+
+ /**
+ * Returns true if this is the default cluster for any destination that does not
+ * specify a clustered destination.
+ *
+ * @return Returns true if this is the default cluster.
+ */
+ public boolean isDefault()
+ {
+ return def;
+ }
+
+ /**
+ * When true, this is the default cluster for any destination that does not
+ * specify a clustered destination.
+ *
+ * @param d true if this is the default cluster
+ */
+ public void setDefault(boolean d)
+ {
+ this.def = d;
+ }
+
+ /**
+ * When true, this cluster is enabled for URL load balancing.
+ *
+ * @return true if this cluster is enabled for URL load balancing.
+ */
+ public boolean getURLLoadBalancing()
+ {
+ return urlLoadBalancing;
+ }
+
+ /**
+ * When true, the cluster is enabled for URL load balancing.
+ *
+ * @param u the flag to enable the URL load balancing
+ */
+ public void setURLLoadBalancing(boolean u)
+ {
+ urlLoadBalancing = u;
+ }
+
+ /**
+ * Shutdown the cluster.
+ */
+ public abstract void destroy();
+
+ /**
+ * Retrieve a List of Maps, where each Map contains channel id keys
+ * mapped to endpoint URLs for the given service type and destination name.
+ * There is exactly one endpoint URL for each
+ * channel id. This List represents all of the known endpoint URLs
+ * for all of the channels in the Cluster.
+ * @param serviceType the service type
+ * @param destName the destination name
+ * @return List of maps of channel ids to endpoint URLs for each node in
+ * the cluster.
+ */
+ public abstract List getAllEndpoints(String serviceType, String destName);
+
+ /**
+ * Returns a list of all of the nodes of this cluster.
+ * @return a list of member IP addresses in the cluster
+ */
+ public abstract List getMemberAddresses();
+
+ /**
+ * Returns the local cluster node.
+ * @return the local address object
+ */
+ public abstract Object getLocalAddress();
+
+ /**
+ * Broadcast a service-related operation, which usually includes a Message as a method parameter. This method
+ * allows a local service to process a Message and then send the Message to the services on all peer nodes
+ * so that they may perform the same processing.
+ *
+ * @param serviceOperation The operation to broadcast.
+ * @param params Parameters for the operation.
+ */
+ public abstract void broadcastServiceOperation(String serviceOperation, Object[] params);
+
+ /**
+ * Send a service-related operation in point-to-point fashion to one and only one member of the cluster.
+ * This is similar to the broadcastServiceOperation except that this invocation is sent to the first
+ * node among the cluster members that does not have the local node's address.
+ *
+ * @param serviceOperation The operation to send.
+ * @param params Parameters for the operation.
+ * @param targetAddress the target address of a remote node in the cluster
+ */
+ public abstract void sendPointToPointServiceOperation(String serviceOperation, Object[] params, Object targetAddress);
+
+ /**
+ * Add a local endpoint URL for a local channel. After doing so, broadcast the information to
+ * peers so that they will be aware of the URL.
+ *
+ * @param serviceType the service type of the endpoint
+ * @param destName the destination name
+ * @param channelId the Channel ID
+ * @param endpointUrl the endpoint URL
+ * @param endpointPort the endpoint port
+ */
+ public abstract void addLocalEndpointForChannel(String serviceType, String destName,
+ String channelId, String endpointUrl, int endpointPort);
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/cluster/ClusterException.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/cluster/ClusterException.java b/core/src/flex/messaging/cluster/ClusterException.java
new file mode 100644
index 0000000..c0dc41d
--- /dev/null
+++ b/core/src/flex/messaging/cluster/ClusterException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.cluster;
+
+import flex.messaging.MessageException;
+
+/**
+ * Exception type for cluster errors. It adds no members of its own to
+ * {@code MessageException} other than a serialization version id.
+ */
+public class ClusterException extends MessageException
+{
+ /**
+ * Serializable version uid.
+ */
+ static final long serialVersionUID = 1948590697997522770L;
+}