You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flex.apache.org by ah...@apache.org on 2014/04/25 07:34:04 UTC
[06/51] [partial] BlazeDS Donation from Adobe Systems Inc
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/MessageBrokerServlet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/MessageBrokerServlet.java b/modules/core/src/flex/messaging/MessageBrokerServlet.java
new file mode 100755
index 0000000..aaa60d8
--- /dev/null
+++ b/modules/core/src/flex/messaging/MessageBrokerServlet.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging;
+
+import flex.management.MBeanLifecycleManager;
+import flex.management.MBeanServerLocatorFactory;
+import flex.messaging.config.ConfigurationManager;
+import flex.messaging.config.FlexConfigurationManager;
+import flex.messaging.config.MessagingConfiguration;
+import flex.messaging.endpoints.Endpoint;
+import flex.messaging.io.SerializationContext;
+import flex.messaging.io.TypeMarshallingContext;
+import flex.messaging.log.HTTPRequestLog;
+import flex.messaging.log.Log;
+import flex.messaging.log.LogCategories;
+import flex.messaging.log.Logger;
+import flex.messaging.log.LoggingHttpServletRequestWrapper;
+import flex.messaging.log.ServletLogTarget;
+import flex.messaging.services.AuthenticationService;
+import flex.messaging.util.ClassUtil;
+import flex.messaging.util.ExceptionUtil;
+import flex.messaging.util.Trace;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.Principal;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The MessageBrokerServlet bootstraps the MessageBroker,
+ * adds endpoints to it, and starts the broker. The servlet
+ * also acts as a facade for all http-based endpoints, in that
+ * the servlet receives the http request and then delegates to
+ * an endpoint that can handle the request's content type. This
+ * does not occur for non-http endpoints, such as the rtmp endpoint.
+ *
+ * @author sneville
+ * @see flex.messaging.MessageBroker
+ * @exclude
+ */
+public class MessageBrokerServlet extends HttpServlet
+{
+ static final long serialVersionUID = -5293855229461612246L;
+
+ public static final String LOG_CATEGORY_STARTUP_BROKER = LogCategories.STARTUP_MESSAGEBROKER;
+ private static final String STRING_UNDEFINED_APPLICATION = "undefined";
+
+ private MessageBroker broker;
+ private HttpFlexSessionProvider httpFlexSessionProvider;
+ private static String FLEXDIR = "/WEB-INF/flex/";
+ private boolean log_errors = false;
+
+ /**
+ * Initializes the servlet in its web container, then creates
+ * the MessageBroker and adds Endpoints and Services to that broker.
+ * This servlet may keep a reference to an endpoint if it needs to
+ * delegate to it in the <code>service</code> method.
+ *
+ * If any step of the startup fails, the thread-local FlexContext references
+ * set by this method are cleared, the broker (if one was created) is destroyed,
+ * the failure is written to <code>System.err</code>, and the cause is rethrown
+ * wrapped in a ServletException.
+ *
+ * @param servletConfig The servlet configuration supplied by the container.
+ * @throws ServletException If the servlet or the MessageBroker fails to initialize.
+ */
+ public void init(ServletConfig servletConfig) throws ServletException
+ {
+     super.init(servletConfig);
+
+     // allocate thread local variables
+     createThreadLocals();
+
+     // Set the servlet config as thread local
+     FlexContext.setThreadLocalObjects(null, null, null, null, null, servletConfig);
+
+     ServletLogTarget.setServletContext(servletConfig.getServletContext());
+
+     ClassLoader loader = getClassLoader();
+
+     if ("true".equals(servletConfig.getInitParameter("useContextClassLoader")))
+     {
+         loader = Thread.currentThread().getContextClassLoader();
+     }
+
+     // Should we wrap http request for later error logging?
+     log_errors = HTTPRequestLog.init(getServletContext());
+
+     // Start the broker
+     try
+     {
+         // Get the configuration manager
+         ConfigurationManager configManager = loadMessagingConfiguration(servletConfig);
+
+         // Load configuration
+         MessagingConfiguration config = configManager.getMessagingConfiguration(servletConfig);
+
+         // Set up logging system ahead of everything else.
+         config.createLogAndTargets();
+
+         // Create broker.
+         broker = config.createBroker(servletConfig.getInitParameter("messageBrokerId"), loader);
+
+         // Now that the broker exists, make it visible through the thread locals as well
+         FlexContext.setThreadLocalObjects(null, null, broker, null, null, servletConfig);
+
+         setupPathResolvers();
+
+         // Set initial servlet context on broker
+         broker.setServletContext(servletConfig.getServletContext());
+
+         Logger logger = Log.getLogger(ConfigurationManager.LOG_CATEGORY);
+         if (Log.isInfo())
+         {
+             logger.info(VersionInfo.buildMessage());
+         }
+
+         // Create endpoints, services, security, and logger on the broker based on configuration
+         config.configureBroker(broker);
+
+         long timeBeforeStartup = 0;
+         if (Log.isDebug())
+         {
+             timeBeforeStartup = System.currentTimeMillis();
+             Log.getLogger(LOG_CATEGORY_STARTUP_BROKER).debug("MessageBroker with id '{0}' is starting.",
+                     new Object[]{broker.getId()});
+         }
+
+         //initialize the httpSessionToFlexSessionMap
+         synchronized(HttpFlexSession.mapLock)
+         {
+             if (servletConfig.getServletContext().getAttribute(HttpFlexSession.SESSION_MAP) == null)
+                 servletConfig.getServletContext().setAttribute(HttpFlexSession.SESSION_MAP, new ConcurrentHashMap());
+         }
+
+         broker.start();
+
+         if (Log.isDebug())
+         {
+             long timeAfterStartup = System.currentTimeMillis();
+             Long diffMillis = timeAfterStartup - timeBeforeStartup;
+             Log.getLogger(LOG_CATEGORY_STARTUP_BROKER).debug("MessageBroker with id '{0}' is ready (startup time: '{1}' ms)",
+                     new Object[]{broker.getId(), diffMillis});
+         }
+
+         // Report replaced tokens
+         configManager.reportTokens();
+
+         // Report any unused properties.
+         config.reportUnusedProperties();
+
+         // Setup provider for FlexSessions that wrap underlying J2EE HttpSessions.
+         httpFlexSessionProvider = new HttpFlexSessionProvider();
+         broker.getFlexSessionManager().registerFlexSessionProvider(HttpFlexSession.class, httpFlexSessionProvider);
+
+         // clear the broker and servlet config as this thread is done
+         FlexContext.clearThreadLocalObjects();
+     }
+     catch (Throwable t)
+     {
+         // On any unhandled exception destroy the broker, log it and rethrow.
+         String applicationName = servletConfig.getServletContext().getServletContextName();
+         if (applicationName == null)
+             applicationName = STRING_UNDEFINED_APPLICATION;
+
+         System.err.println("**** MessageBrokerServlet in application '" + applicationName
+                 + "' failed to initialize due to runtime exception: "
+                 + ExceptionUtil.exceptionFollowedByRootCausesToString(t));
+
+         // The success path clears the thread locals at the end of the try block; do the
+         // same here so the container thread does not keep the servlet config / broker
+         // references. This runs before destroy() because destroy() releases the
+         // thread-local holders themselves (and only when a broker was created).
+         FlexContext.clearThreadLocalObjects();
+
+         destroy();
+         // We used to throw UnavailableException, but Weblogic didn't mark the webapp as failed. See bug FBR-237
+         throw new ServletException(t);
+     }
+ }
+
+ /**
+ * Installs both path resolvers on the broker: first the external one,
+ * then the internal one.
+ */
+ private void setupPathResolvers()
+ {
+     setupExternalPathResolver();
+     setupInternalPathResolver();
+ }
+
+ /**
+ * Installs a resolver on the broker that opens the given filename
+ * directly from the file system.
+ */
+ private void setupExternalPathResolver()
+ {
+     MessageBroker.PathResolver fileSystemResolver = new MessageBroker.PathResolver()
+     {
+         public InputStream resolve(String filename) throws FileNotFoundException
+         {
+             File target = new File(filename);
+             return new FileInputStream(target);
+         }
+     };
+
+     broker.setExternalPathResolver(fileSystemResolver);
+ }
+
+ /**
+ * Installs a resolver on the broker that looks the given filename up as a
+ * servlet context resource underneath the FLEXDIR directory.
+ */
+ private void setupInternalPathResolver()
+ {
+     MessageBroker.InternalPathResolver contextResolver = new MessageBroker.InternalPathResolver()
+     {
+         public InputStream resolve(String filename)
+         {
+             String resourcePath = FLEXDIR + filename;
+             return getServletContext().getResourceAsStream(resourcePath);
+         }
+     };
+
+     broker.setInternalPathResolver(contextResolver);
+ }
+
+ /**
+ * Creates the ConfigurationManager used to load the messaging configuration.
+ * If the servlet defines the <code>services.configuration.manager</code> init
+ * parameter, the named class is instantiated; when that parameter is absent,
+ * or the class cannot be loaded or instantiated, a FlexConfigurationManager
+ * is used instead.
+ *
+ * @param servletConfig The servlet configuration; may be null.
+ * @return The configuration manager to use; never null.
+ */
+ private static ConfigurationManager loadMessagingConfiguration(ServletConfig servletConfig)
+ {
+     ConfigurationManager manager = null;
+
+     // Check for Custom Configuration Manager Specification
+     String configured = null;
+     if (servletConfig != null)
+         configured = servletConfig.getInitParameter("services.configuration.manager");
+
+     if (configured != null)
+     {
+         String className = configured.trim();
+         try
+         {
+             Class<?> managerClass = ClassUtil.createClass(className);
+             manager = (ConfigurationManager)managerClass.newInstance();
+         }
+         catch (Throwable t)
+         {
+             // Best effort: fall back to the default manager below, but report why the
+             // custom one was rejected instead of discarding the cause.
+             if (Trace.config) // Log is not initialized yet.
+                 Trace.trace("Could not load configuration manager as: " + className + " due to: " + t);
+         }
+     }
+
+     if (manager == null)
+     {
+         manager = new FlexConfigurationManager();
+     }
+
+     return manager;
+ }
+
+ /**
+ * Stops all endpoints in the MessageBroker, giving them a chance
+ * to perform any endpoint-specific clean up. If the broker is managed,
+ * its runtime MBeans are unregistered as well. Does nothing when no
+ * broker was created.
+ */
+ public void destroy()
+ {
+     if (broker != null)
+     {
+         try
+         {
+             broker.stop();
+             if (broker.isManaged())
+             {
+                 MBeanLifecycleManager.unregisterRuntimeMBeans(broker);
+             }
+         }
+         finally
+         {
+             // release static thread locals even if stopping the broker
+             // or unregistering its MBeans fails
+             destroyThreadLocals();
+         }
+     }
+ }
+
+ /**
+ * Handles an incoming request and delegates it to the endpoint that the
+ * broker resolves for the request's servlet path (plus path info, if any)
+ * and context path.
+ *
+ * Responds with 404 when no endpoint matches the path, 405 when the endpoint
+ * reports an unsupported operation, and 500 for any other unexpected failure.
+ * An error status is only sent while the response is still uncommitted.
+ * The thread-local FlexContext objects are always cleared before returning.
+ *
+ * @param req The http request.
+ * @param res The http response.
+ */
+ public void service(HttpServletRequest req, HttpServletResponse res)
+ {
+     if (log_errors)
+     {
+         // Create a wrapper for the request object so we can save the body content
+         LoggingHttpServletRequestWrapper wrapper = new LoggingHttpServletRequestWrapper(req);
+         req = wrapper;
+
+         try
+         {
+             // Read the body content
+             wrapper.doReadBody();
+         }
+         catch (IOException ignore)
+         {
+             // ignore, the wrapper will preserve what content we were able to read.
+         }
+     }
+
+     try
+     {
+         // Update thread locals
+         broker.initThreadLocals();
+         // Set this first so it is in place for the session creation event. The
+         // current session is set by the FlexSession stuff right when it is available.
+         // The threadlocal FlexClient is set up during message deserialization in the
+         // MessageBrokerFilter.
+         FlexContext.setThreadLocalObjects(null, null, broker, req, res, getServletConfig());
+
+         HttpFlexSession fs = httpFlexSessionProvider.getOrCreateSession(req);
+         Principal principal;
+         if (FlexContext.isPerClientAuthentication())
+         {
+             principal = FlexContext.getUserPrincipal();
+         }
+         else
+         {
+             principal = fs.getUserPrincipal();
+         }
+
+         // Not authenticated yet: try credentials supplied in the request header, if any.
+         if (principal == null)
+             loginWithBasicAuthorization(req.getHeader("Authorization"));
+
+         String contextPath = req.getContextPath();
+         String pathInfo = req.getPathInfo();
+         String endpointPath = req.getServletPath();
+         if (pathInfo != null)
+             endpointPath = endpointPath + pathInfo;
+
+         Endpoint endpoint;
+         try
+         {
+             endpoint = broker.getEndpoint(endpointPath, contextPath);
+         }
+         catch (MessageException me)
+         {
+             if (Log.isInfo())
+                 Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Received invalid request for endpoint path '{0}'.", new Object[] {endpointPath});
+
+             sendErrorIfUncommitted(res, HttpServletResponse.SC_NOT_FOUND);
+             return;
+         }
+
+         try
+         {
+             if (Log.isInfo())
+             {
+                 Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Channel endpoint {0} received request.",
+                         new Object[] {endpoint.getId()});
+             }
+             endpoint.service(req, res);
+         }
+         catch (UnsupportedOperationException ue)
+         {
+             if (Log.isInfo())
+             {
+                 Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Channel endpoint {0} received request for an unsupported operation.",
+                         new Object[] {endpoint.getId()},
+                         ue);
+             }
+
+             sendErrorIfUncommitted(res, HttpServletResponse.SC_METHOD_NOT_ALLOWED);
+         }
+     }
+     catch (Throwable t)
+     {
+         // Final resort catch block as recommended by Fortify as a potential System info leak
+         Log.getLogger(LogCategories.ENDPOINT_GENERAL).error("Unexpected error encountered in Message Broker servlet", t);
+         // sendError() throws IllegalStateException on a committed response, which would
+         // escape this handler; only report the status while that is still possible.
+         sendErrorIfUncommitted(res, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+     }
+     finally
+     {
+         if (log_errors)
+         {
+             String info = (String) req.getAttribute(HTTPRequestLog.HTTP_ERROR_INFO);
+             if (info != null)
+             {
+                 // Log the HttpRequest data
+                 System.out.println("Exception occurred while processing HTTP request: " + info + ", request details logged in " + HTTPRequestLog.getFileName());
+                 HTTPRequestLog.outputRequest(info, req);
+             }
+         }
+
+         FlexContext.clearThreadLocalObjects();
+     }
+ }
+
+ /**
+ * Attempts a login with the credentials of an http Basic Authorization header.
+ * Headers that are absent, use another scheme, or carry no credentials are ignored.
+ * A failed login is logged and otherwise ignored so the request can proceed.
+ *
+ * @param authorization The raw value of the Authorization header; may be null.
+ */
+ private void loginWithBasicAuthorization(String authorization)
+ {
+     if (authorization == null)
+         return;
+
+     final String scheme = "Basic ";
+     String header = authorization.trim();
+     // The scheme must be the (case-insensitive) prefix of the header. Searching for
+     // "Basic" anywhere and skipping six characters would fail on a bare "Basic"
+     // header and would mis-parse other schemes that merely contain the word.
+     if (!header.regionMatches(true, 0, scheme, 0, scheme.length()))
+         return;
+
+     String encoded = header.substring(scheme.length()).trim();
+     if (encoded.length() == 0)
+         return;
+
+     try
+     {
+         ((AuthenticationService)broker.getService(AuthenticationService.ID)).decodeAndLogin(encoded, broker.getLoginManager());
+     }
+     catch (Exception e)
+     {
+         // NOTE(review): guarded by isDebug() but emitted at info level, as before.
+         if (Log.isDebug())
+             Log.getLogger(LogCategories.SECURITY).info("Authentication service could not decode and login: " + e.getMessage());
+     }
+ }
+
+ /**
+ * Sends the given error status unless the response has already been committed.
+ * An IOException raised while sending is ignored; nothing more can be reported
+ * to the client at that point.
+ *
+ * @param res The http response.
+ * @param status The http status code to send.
+ */
+ private static void sendErrorIfUncommitted(HttpServletResponse res, int status)
+ {
+     if (res.isCommitted())
+         return;
+
+     try
+     {
+         res.sendError(status);
+     }
+     catch (IOException ignore)
+     {
+         // ignore
+     }
+ }
+
+ /**
+ * Hook for subclasses to override the class loader to use for loading user defined classes.
+ *
+ * @return the class loader for this class
+ */
+ protected ClassLoader getClassLoader()
+ {
+ return this.getClass().getClassLoader();
+ }
+
+ /**
+ * Allocates the static thread local objects of FlexContext,
+ * SerializationContext and TypeMarshallingContext, in that order.
+ *
+ * @exclude
+ */
+ // Call ONLY on servlet startup
+ public static void createThreadLocals()
+ {
+ // allocate static thread local objects
+ FlexContext.createThreadLocalObjects();
+ SerializationContext.createThreadLocalObjects();
+ TypeMarshallingContext.createThreadLocalObjects();
+ }
+
+ /**
+ * Clears the static state held by Log and MBeanServerLocatorFactory, then
+ * releases the static thread local objects of FlexContext,
+ * SerializationContext and TypeMarshallingContext.
+ *
+ * @exclude
+ */
+ // Call ONLY on servlet shutdown
+ protected static void destroyThreadLocals()
+ {
+ // clear static member variables
+ Log.clear();
+ MBeanServerLocatorFactory.clear();
+
+ // Destroy static thread local objects
+ FlexContext.releaseThreadLocalObjects();
+ SerializationContext.releaseThreadLocalObjects();
+ TypeMarshallingContext.releaseThreadLocalObjects();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/MessageClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/MessageClient.java b/modules/core/src/flex/messaging/MessageClient.java
new file mode 100755
index 0000000..e9c8ccc
--- /dev/null
+++ b/modules/core/src/flex/messaging/MessageClient.java
@@ -0,0 +1,1144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import flex.messaging.client.FlexClient;
+import flex.messaging.client.FlexClientOutboundQueueProcessor;
+import flex.messaging.client.OutboundQueueThrottleManager;
+import flex.messaging.config.ThrottleSettings;
+import flex.messaging.config.ThrottleSettings.Policy;
+import flex.messaging.log.LogCategories;
+import flex.messaging.log.Log;
+import flex.messaging.messages.AsyncMessage;
+import flex.messaging.messages.CommandMessage;
+import flex.messaging.messages.Message;
+import flex.messaging.services.MessageService;
+import flex.messaging.services.messaging.Subtopic;
+import flex.messaging.services.messaging.selector.JMSSelector;
+import flex.messaging.services.messaging.selector.JMSSelectorException;
+import flex.messaging.util.ExceptionUtil;
+import flex.messaging.util.TimeoutAbstractObject;
+import flex.messaging.util.StringUtils;
+
+/**
+ * Represents a client-side MessageAgent instance.
+ * Currently a server-side MessageClient is only created if its client-side counterpart has subscribed
+ * to a destination for pushed data (e.g. Consumer). Client-side Producers do not result in the creation of
+ * corresponding server-side MessageClient instances.
+ *
+ * Client-side MessageAgents communicate with the server over a Channel that corresponds to a FlexSession.
+ * Server-side MessageClient instances are always created in the context of a FlexSession and when the FlexSession
+ * is invalidated any associated MessageClients are invalidated as well.
+ *
+ * MessageClients may also be timed out on a per-destination basis and this is based on subscription inactivity.
+ * If no messages are pushed to the MessageClient within the destination's subscription timeout period the
+ * MessageClient will be shutdown even if the associated FlexSession is still active and connected.
+ * Per-destination subscription timeout is an optional configuration setting, and should only be used when inactive
+ * subscriptions should be shut down opportunistically to preserve server resources.
+ */
+public class MessageClient extends TimeoutAbstractObject implements Serializable
+{
+ //--------------------------------------------------------------------------
+ //
+ // Public Static Variables
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Log category for MessageClient related messages.
+ */
+ public static final String MESSAGE_CLIENT_LOG_CATEGORY = LogCategories.CLIENT_MESSAGECLIENT;
+
+ //--------------------------------------------------------------------------
+ //
+ // Static Constants
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Serializable to support broadcasting subscription state across the cluster for
+ * optimized message routing.
+ */
+ static final long serialVersionUID = 3730240451524954453L;
+
+ //--------------------------------------------------------------------------
+ //
+ // Static Variables
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * The list of MessageClient created listeners.
+ */
+ private static final CopyOnWriteArrayList<MessageClientListener> createdListeners = new CopyOnWriteArrayList<MessageClientListener>();
+
+ //--------------------------------------------------------------------------
+ //
+ // Static Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Registers a listener in the static list of MessageClient created listeners.
+ * A null listener is ignored, and a listener that is already in the list
+ * is not added a second time.
+ *
+ * @see flex.messaging.MessageClientListener
+ *
+ * @param listener The listener to add.
+ */
+ public static void addMessageClientCreatedListener(MessageClientListener listener)
+ {
+     if (listener == null)
+         return;
+
+     createdListeners.addIfAbsent(listener);
+ }
+
+ /**
+ * Unregisters a listener from the static list of MessageClient created listeners.
+ * A null listener is ignored.
+ *
+ * @see flex.messaging.MessageClientListener
+ *
+ * @param listener The listener to remove.
+ */
+ public static void removeMessageClientCreatedListener(MessageClientListener listener)
+ {
+     if (listener == null)
+         return;
+
+     createdListeners.remove(listener);
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Private Static Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Null-safe string equality.
+ *
+ * @param a The first string; may be null.
+ * @param b The second string; may be null.
+ * @return true if both are null or both hold equal content; otherwise false.
+ */
+ private static boolean equalStrings(String a, String b)
+ {
+     if (a == null)
+         return b == null;
+
+     return a.equals(b);
+ }
+
+ /**
+ * Null-safe string comparison in which null sorts before any non-null string.
+ *
+ * @param a The first string; may be null.
+ * @param b The second string; may be null.
+ * @return 0 if both are the same reference or both null, -1 if only a is null,
+ * 1 if only b is null, otherwise the result of <code>a.compareTo(b)</code>.
+ */
+ static int compareStrings(String a, String b)
+ {
+     if (a == b)
+         return 0;
+
+     if (a == null)
+         return -1;
+
+     if (b == null)
+         return 1;
+
+     return a.compareTo(b);
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Constructor
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * @exclude
+ * Constructs a new MessageClient for local use.
+ * Delegates to the four-argument constructor with <code>useSession</code> set to true.
+ *
+ * @param clientId The clientId for the MessageClient.
+ * @param destination The destination the MessageClient is subscribed to.
+ * @param endpointId The Id of the endpoint this MessageClient subscription was created over.
+ */
+ public MessageClient(Object clientId, Destination destination, String endpointId)
+ {
+ this(clientId, destination, endpointId, true);
+ }
+
+ /**
+ * @exclude
+ * Constructs a new MessageClient, marks it valid and initializes its last use
+ * timestamp. When <code>useSession</code> is true the new instance registers itself
+ * with the current thread's FlexSession and FlexClient, as obtained from FlexContext.
+ *
+ * @param clientId The clientId for the MessageClient.
+ * @param destination The destination the MessageClient is subscribed to.
+ * @param endpointId The Id of the endpoint this MessageClient subscription was created over.
+ * @param useSession RemoteMessageClient instances should not be associated with a FlexSession (pass false).
+ */
+ public MessageClient(Object clientId, Destination destination, String endpointId, boolean useSession)
+ {
+     valid = true;
+     this.clientId = clientId;
+     this.destination = destination;
+     this.endpointId = endpointId;
+     destinationId = destination.getId();
+     // The last use timestamp starts out at construction time.
+     updateLastUse();
+
+     if (!useSession)
+     {
+         // This instance is for a remote server: no session or client association,
+         // and created listeners are not notified.
+         flexClient = null;
+         flexSession = null;
+         // Use an instance level lock.
+         lock = new Object();
+     }
+     else
+     {
+         flexSession = FlexContext.getFlexSession();
+         flexSession.registerMessageClient(this);
+
+         flexClient = FlexContext.getFlexClient();
+         flexClient.registerMessageClient(this);
+
+         // The created listeners are not notified here; SubscriptionManager does
+         // that once the subscription state is set up completely.
+     }
+
+     if (Log.isDebug())
+     {
+         String message = "MessageClient created with clientId '" + this.clientId + "' for destination '" + destinationId + "'.";
+         Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug(message);
+     }
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Variables
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * This flag is set to true when the client channel that this subscription was
+ * established over is disconnected.
+ * It supports cleaning up per-endpoint outbound queues maintained by the FlexClient.
+ * If the client notifies the server that its channel is disconnecting, the FlexClient
+ * does not need to maintain an outbound queue containing a subscription invalidation
+ * message for this MessageClient to send to the client.
+ */
+ private volatile boolean clientChannelDisconnected;
+
+ /**
+ * The clientId for the MessageClient.
+ * This value is specified by the client directly or is autogenerated on the client.
+ */
+ protected final Object clientId;
+
+ /**
+ * Internal reference to the associated Destination; don't expose this in the public API.
+ */
+ protected final Destination destination;
+
+ /**
+ * The destination the MessageClient is subscribed to.
+ */
+ protected final String destinationId;
+
+ /**
+ * The list of MessageClient destroyed listeners to notify when this MessageClient is destroyed.
+ */
+ private transient volatile CopyOnWriteArrayList destroyedListeners;
+
+ /**
+ * The Id for the endpoint this MessageClient subscription was created over.
+ */
+ private String endpointId;
+
+ /**
+ * The FlexClient associated with the MessageClient.
+ */
+ private final transient FlexClient flexClient;
+
+ /**
+ * The FlexSession associated with the MessageClient.
+ * Not final because this needs to be reset if the subscription fails over to a new endpoint.
+ */
+ private transient FlexSession flexSession;
+
+ /**
+ * Flag used to break cycles during invalidation.
+ */
+ private boolean invalidating;
+
+ /**
+ * The lock to use to guard all state changes for the MessageClient.
+ */
+ protected Object lock = new Object();
+
+ /**
+ * Flag indicating whether the MessageClient is attempting to notify the remote client of
+ * its invalidation.
+ */
+ private volatile boolean attemptingInvalidationClientNotification;
+
+ /**
+ * A counter used to control invalidation for a MessageClient that has multiple
+ * subscriptions to its destination.
+ * Unsubscribing from one will not invalidate the MessageClient as long as other
+ * subscriptions remain active.
+ */
+ private transient int numReferences;
+
+ /**
+ * A set of all of the subscriptions managed by this message client.
+ */
+ protected final Set<SubscriptionInfo> subscriptions = new CopyOnWriteArraySet<SubscriptionInfo>();
+
+ /**
+ * Flag indicating whether this client is valid or not.
+ */
+ protected boolean valid;
+
+ /**
+ * Flag that indicates whether the MessageClient has a per-destination subscription timeout.
+ * If false, the MessageClient will remain valid until its associated FlexSession is invalidated.
+ */
+ private volatile boolean willTimeout;
+
+ /**
+ * Has anyone explicitly registered this message client. This indicates that
+ * there is a reference to this MessageClient which is not an explicit subscription.
+ * This is a hook for FDMS and other adapters which want to use pushMessageToClients
+ * with clientIds but that do not want the subscription manager to manage subscriptions
+ * for them.
+ */
+ private volatile boolean registered = false;
+
+ //--------------------------------------------------------------------------
+ //
+ // Public Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Returns the clientId for the MessageClient.
+ * The value is the one passed to the constructor and does not change afterwards.
+ *
+ * @return The clientId for the MessageClient.
+ */
+ public Object getClientId()
+ {
+ return clientId; // Field is final; no need to sync.
+ }
+
+ /**
+ * Returns the destination the MessageClient is subscribed to.
+ * The value is the one passed to the constructor and does not change afterwards.
+ *
+ * @return The destination the MessageClient is subscribed to.
+ */
+ public Destination getDestination()
+ {
+ return destination; // Field is final; no need to sync.
+ }
+
+ /**
+ * Returns the id of the destination the MessageClient is subscribed to.
+ * The id is read from the destination once, in the constructor.
+ *
+ * @return The id of the destination the MessageClient is subscribed to.
+ */
+ public String getDestinationId()
+ {
+ return destinationId; // Field is final; no need to sync.
+ }
+
+ /**
+ * Returns the Id for the endpoint the MessageClient subscription was created over.
+ *
+ * @return The Id for the endpoint the MessageClient subscription was created over.
+ */
+ public String getEndpointId()
+ {
+ return endpointId; // NOTE(review): endpointId is not declared final, yet it is read here without holding the lock - confirm this is intended.
+ }
+
+ /**
+ * Returns the FlexClient associated with this MessageClient.
+ * This is null for an instance constructed with <code>useSession</code> set to false.
+ *
+ * @return The FlexClient associated with this MessageClient.
+ */
+ public FlexClient getFlexClient()
+ {
+ return flexClient; // Field is final; no need to sync.
+ }
+
+ /**
+ * Returns the FlexSession associated with this MessageClient.
+ * The field is read while holding the MessageClient's lock.
+ *
+ * @return The FlexSession associated with this MessageClient.
+ */
+ public FlexSession getFlexSession()
+ {
+     final FlexSession current;
+     synchronized (lock)
+     {
+         current = flexSession;
+     }
+     return current;
+ }
+
+ /**
+ * Returns the number of subscriptions associated with this MessageClient.
+ * The subscription set is read while holding the MessageClient's lock.
+ *
+ * @return The number of subscriptions associated with this MessageClient,
+ * or 0 if there is no subscription set.
+ */
+ public int getSubscriptionCount()
+ {
+     synchronized (lock)
+     {
+         if (subscriptions == null)
+             return 0;
+
+         return subscriptions.size();
+     }
+ }
+
+ /**
+ * @exclude
+ * This is used for FlexClient outbound queue management. When a MessageClient is invalidated
+ * if it is attempting to notify the client, then we must leave the outbound queue containing
+ * the notification in place. Otherwise, any messages queued for the subscription may be
+ * removed from the queue and possibly shut down immediately.
+ *
+ * @return The current value of the attemptingInvalidationClientNotification flag.
+ */
+ public boolean isAttemptingInvalidationClientNotification()
+ {
+ return attemptingInvalidationClientNotification;
+ }
+
+ /**
+ * @exclude
+ * This is set to true when the MessageClient is invalidated due to the client
+ * channel the subscription was established over disconnecting.
+ * It allows the FlexClient class to cleanup the outbound queue for the channel's
+ * corresponding server endpoint for the remote client, because we know that no
+ * currently queued messages need to be retained for delivery.
+ *
+ * @param value The new value for the clientChannelDisconnected flag.
+ */
+ public void setClientChannelDisconnected(boolean value)
+ {
+ clientChannelDisconnected = value;
+ }
+
+ /**
+ * @exclude
+ * Returns the flag stored by <code>setClientChannelDisconnected</code>.
+ *
+ * @return The current value of the clientChannelDisconnected flag.
+ */
+ public boolean isClientChannelDisconnected()
+ {
+ return clientChannelDisconnected;
+ }
+
+ /**
+ * @exclude
+ * This is true when some code other than the SubscriptionManager
+ * is maintaining subscriptions for this message client. It ensures
+ * that we have this MessageClient kept around until the session
+ * expires.
+ *
+ * @param reg The new value for the registered flag.
+ */
+ public void setRegistered(boolean reg)
+ {
+ registered = reg;
+ }
+
+ /**
+ * @exclude
+ * Returns the flag stored by <code>setRegistered</code>.
+ *
+ * @return The current value of the registered flag.
+ */
+ public boolean isRegistered()
+ {
+ return registered;
+ }
+
+ /**
+ * Registers a listener to be notified when this MessageClient is destroyed.
+ * A null listener is ignored. For a non-null listener the MessageClient's
+ * validity is checked first; the listener list is created on first use, and
+ * a listener that is already in the list is not added a second time.
+ *
+ * @see flex.messaging.MessageClientListener
+ *
+ * @param listener The listener to add.
+ */
+ public void addMessageClientDestroyedListener(MessageClientListener listener)
+ {
+     if (listener == null)
+         return;
+
+     checkValid();
+
+     // Create the list lazily; the second null check under the lock keeps
+     // two racing callers from each installing their own list.
+     if (destroyedListeners == null)
+     {
+         synchronized (lock)
+         {
+             if (destroyedListeners == null)
+                 destroyedListeners = new CopyOnWriteArrayList();
+         }
+     }
+
+     destroyedListeners.addIfAbsent(listener);
+ }
+
+ /**
+ * Unregisters a MessageClient destroyed listener. Validity is deliberately
+ * not checked because removing a listener is always ok. Nothing happens for
+ * a null listener or when no listener list has been created yet.
+ *
+ * @see flex.messaging.MessageClientListener
+ *
+ * @param listener The listener to remove.
+ */
+ public void removeMessageClientDestroyedListener(MessageClientListener listener)
+ {
+     if (listener == null || destroyedListeners == null)
+         return;
+
+     destroyedListeners.remove(listener);
+ }
+
+ /**
+ * @exclude
+ * Adds a subscription to the subscription set for this MessageClient.
+ * While holding the lock this checks validity, increments the reference
+ * count, stores the new subscription and registers it with the throttle manager.
+ *
+ * @param selector The selector expression used for the subscription.
+ * @param subtopic The subtopic used for the subscription.
+ * @param maxFrequency The maximum number of messages the client wants to
+ * receive per second (0 disables this limit).
+ */
+ public void addSubscription(String selector, String subtopic, int maxFrequency)
+ {
+     synchronized (lock)
+     {
+         checkValid();
+
+         incrementReferences();
+
+         // Record the new subscription in the subscriptions set ...
+         final SubscriptionInfo subscription = new SubscriptionInfo(selector, subtopic, maxFrequency);
+         subscriptions.add(subscription);
+
+         // ... and make it known to outbound throttling.
+         registerSubscriptionWithThrottleManager(subscription);
+     }
+ }
+
+ /**
+ * @exclude
+ * Registers the subscription with the outbound queue processor's throttle
+ * manager, if one exists. Before that, the destination itself is registered for
+ * client level outbound throttling when its outbound policy is not NONE and a
+ * limit is specified either at the destination or by the subscription. A
+ * subscription that asks for a maxFrequency on a destination without such
+ * throttling gets a warning logged instead.
+ *
+ * @param si The subscription info object.
+ */
+ public void registerSubscriptionWithThrottleManager(SubscriptionInfo si)
+ {
+     ThrottleSettings settings = destination.getNetworkSettings().getThrottleSettings();
+
+     if (settings.getOutboundPolicy() != Policy.NONE
+             && (settings.isOutboundClientThrottleEnabled() || si.maxFrequency > 0))
+     {
+         // Set up client level outbound throttling for the destination.
+         OutboundQueueThrottleManager destinationThrottler = getThrottleManager(true);
+         if (destinationThrottler != null)
+         {
+             destinationThrottler.registerDestination(destinationId,
+                     settings.getOutgoingClientFrequency(), settings.getOutboundPolicy());
+         }
+     }
+     else if (si.maxFrequency > 0 && Log.isWarn())
+     {
+         // The requested maxFrequency cannot be honored; say so in the log.
+         Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).warn("MessageClient with clientId '"
+                 + clientId + "' for destination '" + destinationId
+                 + "' specified a maxFrequency value of '" + si.maxFrequency
+                 + "' but the destination does not define a throttling policy. This value will be ignored.");
+     }
+
+     // Finally register the subscription itself.
+     OutboundQueueThrottleManager subscriptionThrottler = getThrottleManager(false);
+     if (subscriptionThrottler != null)
+         subscriptionThrottler.registerSubscription(destinationId, si);
+ }
+
+ /**
+ * @exclude
+ * Removes a subscription from the subscription set for this MessageClient.
+ *
+ * @param selector The selector expression for the subscription.
+ * @param subtopic The subtopic for the subscription.
+ * @return true if no subscriptions remain for this MessageClient; otherwise false.
+ */
+ public boolean removeSubscription(String selector, String subtopic)
+ {
+ synchronized (lock)
+ {
+ SubscriptionInfo si = new SubscriptionInfo(selector, subtopic);
+ if (subscriptions.remove(si))
+ {
+ unregisterSubscriptionWithThrottleManager(si);
+ return decrementReferences();
+ }
+ else if (Log.isError())
+ {
+ Log.getLogger(MessageService.LOG_CATEGORY).error("Error - unable to find subscription to remove for MessageClient: "
+ + clientId + " selector: " + selector + " subtopic: " + subtopic);
+ }
+ return numReferences == 0;
+ }
+ }
+
+ /**
+ * @exclude
+ * We use the same MessageClient for more than one subscription with different
+ * selection criteria. This tracks the number of subscriptions that are active
+ * so that we know when we are finished.
+ */
+ public void incrementReferences()
+ {
+ // Guarded by the same lock that decrementReferences() and the subscription methods use.
+ synchronized (lock)
+ {
+ numReferences++;
+ }
+ }
+
+ /**
+ * @exclude
+ * Decrements the numReferences variable and returns true if this was the last reference.
+ */
+ public boolean decrementReferences()
+ {
+     synchronized (lock)
+     {
+         numReferences -= 1;
+         if (numReferences != 0)
+             return false;
+
+         // That was the last reference: stop the timeout and drop any throttle
+         // bookkeeping the destination keeps for this client.
+         cancelTimeout();
+         if (destination instanceof MessageDestination)
+         {
+             MessageDestination messageDestination = (MessageDestination)destination;
+             if (messageDestination.getThrottleManager() != null)
+                 messageDestination.getThrottleManager().removeClientThrottleMark(clientId);
+         }
+         return true;
+     }
+ }
+
+ /**
+ * @exclude
+ * Invoked by SubscriptionManager once the subscription state is setup completely
+ * for the MessageClient.
+ */
+ public void notifyCreatedListeners()
+ {
+     if (createdListeners.isEmpty())
+         return;
+
+     // The listener list is a CopyOnWriteArrayList, so iterating it cannot raise a
+     // ConcurrentModificationException even if listeners are added or removed meanwhile.
+     for (Object listener : createdListeners)
+         ((MessageClientListener)listener).messageClientCreated(this);
+ }
+
+ /**
+ * @exclude
+ * Invoked by SubscriptionManager while handling a subscribe request.
+ * If the request is updating an existing subscription the 'push' state in the associated FlexClient
+ * may need to be updated to ensure that the correct endpoint is used for this subscription.
+ *
+ * @param newEndpointId The id for the new endpoint that the subscription may have failed over to.
+ */
+ public void resetEndpoint(String newEndpointId)
+ {
+ String oldEndpointId = null;
+ FlexSession oldSession = null;
+ FlexSession newSession = FlexContext.getFlexSession();
+ synchronized (lock)
+ {
+ // If anything is null, or nothing has changed, no need for a reset.
+ if (endpointId == null || newEndpointId == null || flexSession == null || newSession == null || (endpointId.equals(newEndpointId) && flexSession.equals(newSession)))
+ return;
+
+ oldEndpointId = endpointId;
+ endpointId = newEndpointId;
+
+ oldSession = flexSession;
+ flexSession = newSession;
+ }
+
+ // Unregister in order to reset the proper push settings in the re-registration below once the session association has been patched.
+ if (flexClient != null)
+ flexClient.unregisterMessageClient(this);
+
+ // Clear out any reference to this subscription that the previously associated session has.
+ if (oldSession != null)
+ oldSession.unregisterMessageClient(this);
+
+ // Associate the current session with this subscription.
+ if (flexSession != null)
+ flexSession.registerMessageClient(this);
+
+ // Reset proper push settings.
+ if (flexClient != null)
+ flexClient.registerMessageClient(this);
+
+ if (Log.isDebug())
+ {
+ String msg = "MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been reset as a result of a resubscribe.";
+ if (oldEndpointId != null && !oldEndpointId.equals(newEndpointId))
+ msg += " Endpoint change [" + oldEndpointId + " -> " + newEndpointId + "]";
+ if ((oldSession != null) && (newSession != null) && (oldSession != newSession)) // Test identity.
+ msg += " FlexSession change [" + oldSession.getClass().getName() + ":" + oldSession.getId() + " -> " + newSession.getClass().getName() + ":" + newSession.getId() + "]";
+
+ Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug(msg);
+ }
+ }
+
+ /**
+ * @exclude
+ * Used to test whether this client should receive this message
+ * based on the list of subscriptions we have recorded for it.
+ * It must match both the subtopic and the selector expression.
+ * Usually this is done by the subscription manager - this logic is
+ * only here to maintain api compatibility with one of the variants
+ * of the pushMessageToClients which has subscriberIds and an evalSelector
+ * property.
+ *
+ * @param message The message to test.
+ */
+ public boolean testMessage(Message message, MessageDestination destination)
+ {
+ String subtopic = (String) message.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME);
+ String subtopicSeparator = destination.getServerSettings().getSubtopicSeparator();
+ synchronized (lock)
+ {
+ for (SubscriptionInfo si : subscriptions)
+ {
+ if (si.matches(message, subtopic, subtopicSeparator))
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns true if the MessageClient is valid; false if it has been invalidated.
+ *
+ * @return true if the MessageClient is valid; otherwise false.
+ */
+ public boolean isValid()
+ {
+ // Read under the lock because invalidate(boolean) writes 'valid' while holding it.
+ synchronized (lock)
+ {
+ return valid;
+ }
+ }
+
+ /**
+ * Invalidates the MessageClient.
+ */
+ public void invalidate()
+ {
+ // Silent variant: delegates to invalidate(boolean) without pushing anything to the client.
+ invalidate(false /* don't attempt to notify the client */);
+ }
+
+ /**
+ * Invalidates the MessageClient, and optionally attempts to notify the client that
+ * this subscription has been invalidated.
+ * This overload is used when a subscription is timed out while the client is still
+ * actively connected to the server but should also be used by any custom code on the server
+ * that invalidates MessageClients but wishes to notify the client cleanly.
+ *
+ * @param notifyClient <code>true</code> to notify the client that its subscription has been
+ * invalidated.
+ */
+ public void invalidate(boolean notifyClient)
+ {
+ // Shutdown sequence: claim the shutdown, optionally push an invalidation command to the
+ // client, notify destroyed listeners, issue one unsubscribe command per remaining
+ // subscription, release this client from the SubscriptionManager and finally flip 'valid'.
+ // NOTE(review): there is no try/finally around the sequence, so a RuntimeException other
+ // than MessageException (e.g. from a listener) would leave 'invalidating' set to true and
+ // 'valid' never cleared - confirm whether that is acceptable.
+ synchronized (lock)
+ {
+ if (!valid || invalidating)
+ return; // Already shutting down.
+
+ invalidating = true; // This thread gets to shut the MessageClient down.
+ cancelTimeout();
+ }
+
+ // Record whether we're attempting to notify the client or not.
+ attemptingInvalidationClientNotification = notifyClient;
+
+ // Build a subscription invalidation message and push to the client if it is still valid.
+ if (notifyClient && flexClient != null && flexClient.isValid())
+ {
+ CommandMessage msg = new CommandMessage();
+ msg.setDestination(destination.getId());
+ msg.setClientId(clientId);
+ msg.setOperation(CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION);
+ // The push is addressed to this client only.
+ Set subscriberIds = new TreeSet();
+ subscriberIds.add(clientId);
+ try
+ {
+ if (destination instanceof MessageDestination)
+ {
+ MessageDestination msgDestination = (MessageDestination)destination;
+ ((MessageService)msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, msg, false /* don't eval selector */);
+ }
+ }
+ // Best effort: a failed push must not stop the invalidation.
+ catch (MessageException ignore) {}
+ }
+
+ // Notify messageClientDestroyed listeners that we're being invalidated.
+ if (destroyedListeners != null && !destroyedListeners.isEmpty())
+ {
+ for (Iterator iter = destroyedListeners.iterator(); iter.hasNext();)
+ {
+ ((MessageClientListener)iter.next()).messageClientDestroyed(this);
+ }
+ // Listeners are dropped once they have been notified.
+ destroyedListeners.clear();
+ }
+
+ // And generate unsubscribe messages for all of the MessageClient's subscriptions and
+ // route them to the destination this MessageClient is subscribed to.
+ // The reason we send a message to the service rather than just going straight to the SubscriptionManager
+ // is that some adapters manage their own subscription state (i.e. JMS) in addition to us keeping track of
+ // things with our SubscriptionManager.
+ ArrayList<CommandMessage> unsubMessages = new ArrayList<CommandMessage>();
+ // The messages are built under the lock (snapshot of 'subscriptions') and sent after it is released.
+ synchronized (lock)
+ {
+ for (SubscriptionInfo subInfo : subscriptions)
+ {
+ CommandMessage unsubMessage = new CommandMessage();
+ unsubMessage.setDestination(destination.getId());
+ unsubMessage.setClientId(clientId);
+ unsubMessage.setOperation(CommandMessage.UNSUBSCRIBE_OPERATION);
+ unsubMessage.setHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER, Boolean.TRUE);
+ unsubMessage.setHeader(CommandMessage.SELECTOR_HEADER, subInfo.selector);
+ unsubMessage.setHeader(AsyncMessage.SUBTOPIC_HEADER_NAME, subInfo.subtopic);
+ unsubMessages.add(unsubMessage);
+ }
+ }
+ // Release the lock and send the unsub messages.
+ for (CommandMessage unsubMessage : unsubMessages)
+ {
+ try
+ {
+ destination.getService().serviceCommand(unsubMessage);
+ }
+ catch (MessageException me)
+ {
+ // A rejected unsubscribe is logged at debug level only; the remaining messages are still sent.
+ if (Log.isDebug())
+ Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient: " + getClientId() + " issued an unsubscribe message during invalidation that was not processed but will continue with invalidation. Reason: " + ExceptionUtil.toString(me));
+ }
+ }
+
+ synchronized (lock)
+ {
+ // If we didn't clean up all subscriptions log an error and continue with shutdown.
+ int remainingSubscriptionCount = subscriptions.size();
+ if (remainingSubscriptionCount > 0 && Log.isError())
+ Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).error("MessageClient: " + getClientId() + " failed to remove " + remainingSubscriptionCount + " subscription(s) during invalidation");
+ }
+
+ // If someone registered this message client, invalidating it will free
+ // their reference which will typically also remove this message client.
+ if (registered && destination instanceof MessageDestination)
+ ((MessageDestination)destination).getSubscriptionManager().releaseMessageClient(this);
+
+ // Only now is the client marked invalid and the 'invalidating' claim released.
+ synchronized (lock)
+ {
+ valid = false;
+ invalidating = false;
+ }
+
+ if (Log.isDebug())
+ Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been invalidated.");
+ }
+
+ /**
+ * Pushes the supplied message and then invalidates the MessageClient.
+ *
+ * @param message The message to push to the client before invalidating.
+ * When message is null, MessageClient is invalidated silently.
+ */
+ public void invalidate(Message message)
+ {
+     // Without a message there is nothing to push: invalidate silently.
+     if (message == null)
+     {
+         invalidate();
+         return;
+     }
+
+     // Address the message to this client on its destination.
+     message.setDestination(destination.getId());
+     message.setClientId(clientId);
+
+     Set subscriberIds = new TreeSet();
+     subscriberIds.add(clientId);
+     try
+     {
+         if (destination instanceof MessageDestination)
+         {
+             MessageDestination target = (MessageDestination)destination;
+             MessageService service = (MessageService)target.getService();
+             service.pushMessageToClients(target, subscriberIds, message, false /* don't eval selector */);
+         }
+     }
+     catch (MessageException ignore)
+     {
+         // A failed push does not prevent the invalidation below.
+     }
+
+     invalidate(true /* attempt to notify remote client */);
+ }
+
+ /**
+ * @exclude
+ * Compares this MessageClient to the specified object. The result is true if
+ * the argument is not null and is a MessageClient instance with a matching
+ * clientId value.
+ *
+ * @param o The object to compare this MessageClient to.
+ * @return true if the MessageClient is equal; otherwise false.
+ */
+ @Override
+ public boolean equals(Object o)
+ {
+     // Identity shortcut; also keeps equals() reflexive whatever the clientId is.
+     if (this == o)
+         return true;
+
+     // instanceof is false for null, so no separate null check is needed afterwards.
+     if (!(o instanceof MessageClient))
+         return false;
+
+     // Two MessageClients are equal when their clientIds are equal. A missing clientId on
+     // the other instance yields false instead of a NullPointerException.
+     Object otherId = ((MessageClient)o).getClientId();
+     return otherId != null && otherId.equals(clientId);
+ }
+
+ /**
+ * @exclude
+ * Returns the hash code for this MessageClient. The returned value is
+ * the hash code for the MessageClient's clientId property.
+ *
+ * @return The hash code value for this MessageClient.
+ */
+ @Override
+ public int hashCode()
+ {
+ // Hash on the clientId so that the result agrees with equals(), which compares clientIds.
+ return getClientId().hashCode();
+ }
+
+ /**
+ * @exclude
+ * The String representation of this MessageClient is returned (its clientId value).
+ *
+ * @return The clientId value for this MessageClient.
+ */
+ @Override
+ public String toString()
+ {
+ // String.valueOf() yields "null" instead of throwing when clientId is null.
+ return String.valueOf(clientId);
+ }
+
+ //----------------------------------
+ // TimeoutAbstractObject overrides
+ //----------------------------------
+
+ /**
+ * @exclude
+ * Implements TimeoutCapable.
+ * This method returns the timeout value configured for the MessageClient's destination.
+ */
+ @Override
+ public long getTimeoutPeriod()
+ {
+     // Only message destinations carry a subscription timeout; anything else reports 0.
+     if (!(destination instanceof MessageDestination))
+         return 0;
+
+     MessageDestination messageDestination = (MessageDestination)destination;
+     return messageDestination.getSubscriptionManager().getSubscriptionTimeoutMillis();
+ }
+
+ /**
+ * @exclude
+ * Implements TimeoutCapable.
+ * This method is invoked when the MessageClient has timed out and it
+ * invalidates the MessageClient.
+ */
+ public void timeout()
+ {
+ // A timed-out subscription is invalidated with client notification enabled.
+ invalidate(true /* notify client */);
+ }
+
+ /**
+ * @exclude
+ * Returns true if a timeout task is running for this MessageClient.
+ */
+ public boolean isTimingOut()
+ {
+ // NOTE(review): willTimeout is read without holding 'lock' - confirm how the field is published across threads.
+ return willTimeout;
+ }
+
+ /**
+ * @exclude
+ * Records whether a timeout task is running for this MessageClient.
+ */
+ public void setTimingOut(boolean value)
+ {
+ // NOTE(review): willTimeout is written without holding 'lock' - confirm how the field is published across threads.
+ willTimeout = value;
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Private Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Utility method that tests validity and throws an exception if the instance
+ * has been invalidated.
+ */
+ private void checkValid()
+ {
+     synchronized (lock)
+     {
+         // Nothing to do while the instance is still valid.
+         if (valid)
+             return;
+
+         throw new RuntimeException("MessageClient has been invalidated."); // TODO - localize
+     }
+ }
+
+ private OutboundQueueThrottleManager getThrottleManager(boolean create)
+ {
+     // The throttle manager hangs off the FlexClient's outbound queue processor for this
+     // endpoint; without either of those there is nothing to return.
+     if (flexClient == null)
+         return null;
+
+     FlexClientOutboundQueueProcessor queueProcessor = flexClient.getOutboundQueueProcessor(endpointId);
+     if (queueProcessor == null)
+         return null;
+
+     // 'create' decides whether a missing manager is created or simply reported as absent.
+     if (create)
+         return queueProcessor.getOrCreateOutboundQueueThrottleManager();
+
+     return queueProcessor.getOutboundQueueThrottleManager();
+ }
+
+ private void unregisterSubscriptionWithThrottleManager(SubscriptionInfo si)
+ {
+     // Look up an existing manager only; unregistering never creates one.
+     OutboundQueueThrottleManager manager = getThrottleManager(false);
+     if (manager == null)
+         return;
+
+     manager.unregisterSubscription(destinationId, si);
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Nested Classes
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Represents a MessageClient's subscription to a destination.
+ * It captures the optional selector expression and subtopic for the
+ * subscription.
+ */
+ public static class SubscriptionInfo implements Comparable
+ {
+ public String selector, subtopic;
+ public int maxFrequency; // maxFrequency per subscription. Not used in BlazeDS.
+
+ public SubscriptionInfo(String sel, String sub)
+ {
+ this(sel, sub, 0);
+ }
+
+ public SubscriptionInfo(String sel, String sub, int maxFrequency)
+ {
+ this.selector = sel;
+ this.subtopic = sub;
+ this.maxFrequency = maxFrequency;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o instanceof SubscriptionInfo)
+ {
+ SubscriptionInfo other = (SubscriptionInfo) o;
+ return equalStrings(other.selector, selector) &&
+ equalStrings(other.subtopic, subtopic);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return (selector == null ? 0 : selector.hashCode()) +
+ (subtopic == null ? 1 : subtopic.hashCode());
+ }
+
+ /**
+ * Compares the two subscription infos (being careful to
+ * ensure we compare in a consistent way if the arguments
+ * are switched).
+ * @param o the object to compare
+ * @return int the compare result
+ */
+ public int compareTo(Object o)
+ {
+ SubscriptionInfo other = (SubscriptionInfo) o;
+ int result;
+
+ if ((result = compareStrings(other.selector, selector)) != 0)
+ return result;
+ else if ((result = compareStrings(other.subtopic, subtopic)) != 0)
+ return result;
+
+ return 0;
+ }
+
+ /**
+ * Check whether the message matches with selected subtopic.
+ * @param message current message
+ * @param subtopicToMatch subtopc string
+ * @param subtopicSeparator suptopic separator
+ * @return true if the message matches the subtopic
+ */
+ public boolean matches(Message message, String subtopicToMatch, String subtopicSeparator)
+ {
+ if ((subtopicToMatch == null && subtopic != null) || (subtopicToMatch != null && subtopic == null))
+ return false; // If either defines a subtopic, they both must define one.
+
+ // If both define a subtopic, they must match.
+ if (subtopicToMatch != null && subtopic != null)
+ {
+ Subtopic consumerSubtopic = new Subtopic(subtopic, subtopicSeparator);
+ Subtopic messageSubtopic = new Subtopic(subtopicToMatch, subtopicSeparator);
+ if (!consumerSubtopic.matches(messageSubtopic))
+ return false; // Not a match.
+ }
+
+ if (selector == null)
+ return true;
+
+ JMSSelector jmsSelector = new JMSSelector(selector);
+ try
+ {
+ if (jmsSelector.match(message))
+ return true;
+ }
+ catch (JMSSelectorException jmse)
+ {
+ // Log a warning for this client's selector and continue
+ if (Log.isWarn())
+ {
+ Log.getLogger(JMSSelector.LOG_CATEGORY).warn("Error processing message selector: " +
+ jmse.toString() + StringUtils.NEWLINE +
+ " incomingMessage: " + message + StringUtils.NEWLINE +
+ " selector: " + selector + StringUtils.NEWLINE);
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns a String representation of the subscription info.
+ * @return String the string representation of the subscription info
+ */
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+ sb.append("Subtopic: " + subtopic + StringUtils.NEWLINE);
+ sb.append("Selector: " + selector + StringUtils.NEWLINE);
+ if (maxFrequency > 0)
+ sb.append("maxFrequency: " + maxFrequency);
+ return sb.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/MessageClientListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/MessageClientListener.java b/modules/core/src/flex/messaging/MessageClientListener.java
new file mode 100755
index 0000000..e77b31d
--- /dev/null
+++ b/modules/core/src/flex/messaging/MessageClientListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging;
+
+/**
+ * Interface to be notified when a MessageClient is created or destroyed. Implementations of this interface
+ * may add themselves as listeners statically via <code>MessageClient.addMessageClientCreatedListener()</code>.
+ * To listen for MessageClient destruction, the implementation class instance must add itself as a listener to
+ * a specific MessageClient instance via the <code>addMessageClientDestroyedListener()</code> method.
+ */
+public interface MessageClientListener
+{
+    /**
+     * Called after a MessageClient has been created.
+     *
+     * @param messageClient The newly created MessageClient.
+     */
+    void messageClientCreated(MessageClient messageClient);
+
+    /**
+     * Called when a MessageClient is about to be destroyed.
+     *
+     * @param messageClient The MessageClient that is going away.
+     */
+    void messageClientDestroyed(MessageClient messageClient);
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/7a58369c/modules/core/src/flex/messaging/MessageDestination.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/MessageDestination.java b/modules/core/src/flex/messaging/MessageDestination.java
new file mode 100755
index 0000000..5552b7a
--- /dev/null
+++ b/modules/core/src/flex/messaging/MessageDestination.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging;
+
+import flex.management.runtime.messaging.MessageDestinationControl;
+import flex.management.runtime.messaging.services.messaging.SubscriptionManagerControl;
+import flex.management.runtime.messaging.services.messaging.ThrottleManagerControl;
+import flex.messaging.config.ConfigurationConstants;
+import flex.messaging.config.ConfigurationException;
+import flex.messaging.config.DestinationSettings;
+import flex.messaging.config.ThrottleSettings;
+import flex.messaging.config.ConfigMap;
+import flex.messaging.config.NetworkSettings;
+import flex.messaging.config.ServerSettings;
+import flex.messaging.config.ThrottleSettings.Policy;
+import flex.messaging.log.LogCategories;
+import flex.messaging.services.MessageService;
+import flex.messaging.services.Service;
+import flex.messaging.services.messaging.SubscriptionManager;
+import flex.messaging.services.messaging.RemoteSubscriptionManager;
+import flex.messaging.services.messaging.ThrottleManager;
+import flex.messaging.services.messaging.MessagingConstants;
+import flex.messaging.util.ClassUtil;
+
+/**
+ * A logical reference to a MessageDestination.
+ */
+public class MessageDestination extends FactoryDestination
+{
+ static final long serialVersionUID = -2016911808141319012L;
+
+ /** Log category for <code>MessageDestination</code>.*/
+ public static final String LOG_CATEGORY = LogCategories.SERVICE_MESSAGE;
+
+ // Errors
+ private static final int UNSUPPORTED_POLICY = 10124;
+
+ // Destination properties
+ private transient ServerSettings serverSettings;
+
+ // Destination internal
+ private transient SubscriptionManager subscriptionManager;
+ private transient RemoteSubscriptionManager remoteSubscriptionManager;
+ private transient ThrottleManager throttleManager;
+
+ private transient MessageDestinationControl controller;
+
+ //--------------------------------------------------------------------------
+ //
+ // Constructor
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Constructs an unmanaged <code>MessageDestination</code> instance.
+ */
+ public MessageDestination()
+ {
+ // Delegates to the main constructor with management disabled.
+ this(false);
+ }
+
+ /**
+ * Constructs a <code>MessageDestination</code> with the indicated management.
+ *
+ * @param enableManagement <code>true</code> if the <code>MessageDestination</code>
+ * is manageable; otherwise <code>false</code>.
+ */
+ public MessageDestination(boolean enableManagement)
+ {
+     super(enableManagement);
+
+     // Start out with default server settings.
+     this.serverSettings = new ServerSettings();
+
+     // Both subscription managers are bound to this destination for its whole lifetime.
+     this.subscriptionManager = new SubscriptionManager(this);
+     this.remoteSubscriptionManager = new RemoteSubscriptionManager(this);
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Initialize, validate, start, and stop methods.
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Initializes the <code>MessageDestination</code> with the properties.
+ * If subclasses override, they must call <code>super.initialize()</code>.
+ *
+ * @param id The id of the destination.
+ * @param properties Properties for the <code>MessageDestination</code>.
+ */
+ @Override
+ public void initialize(String id, ConfigMap properties)
+ {
+ super.initialize(id, properties);
+
+ if (properties == null || properties.size() == 0)
+ return;
+
+ // Network properties
+ network(properties);
+
+ // Server properties
+ server(properties);
+ }
+
+ /**
+ * Sets up the throttle manager before it starts.
+ */
+ @Override
+ public void start()
+ {
+ // Create the throttle manager, only if needed.
+ if (networkSettings.getThrottleSettings() != null)
+ {
+ ThrottleSettings settings = networkSettings.getThrottleSettings();
+ if (settings.isClientThrottleEnabled() || settings.isDestinationThrottleEnabled())
+ {
+ settings.setDestinationName(getId());
+ throttleManager = createThrottleManager();
+ throttleManager.setThrottleSettings(settings);
+ throttleManager.start();
+ }
+ }
+ super.start();
+ }
+
+ /**
+ * Stops the subscription, remote subscription, and throttle managers and
+ * then calls super class's stop.
+ */
+ @Override
+ public void stop()
+ {
+ // The managers are stopped only if this destination was started; super.stop() runs either way.
+ if (isStarted())
+ {
+ subscriptionManager.stop();
+ remoteSubscriptionManager.stop();
+ // The throttle manager exists only when start() created one.
+ if (throttleManager != null)
+ throttleManager.stop();
+ }
+ super.stop();
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Public Getters and Setters for Destination properties
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Sets the <code>NetworkSettings</code> of the <code>MessageDestination</code>.
+ *
+ * @param networkSettings The <code>NetworkSettings</code> of the <code>MessageDestination</code>
+ */
+ @Override
+ public void setNetworkSettings(NetworkSettings networkSettings)
+ {
+     super.setNetworkSettings(networkSettings);
+
+     // A timeout of zero or less leaves the subscription manager's current value alone.
+     long timeoutMinutes = networkSettings.getSubscriptionTimeoutMinutes();
+     if (timeoutMinutes <= 0)
+         return;
+
+     // The subscription manager works in milliseconds.
+     subscriptionManager.setSubscriptionTimeoutMillis(timeoutMinutes * 60L * 1000L);
+ }
+
+ /**
+ * Returns the <code>ServerSettings</code> of the <code>MessageDestination</code>.
+ *
+ * @return The <code>ServerSettings</code> of the <code>MessageDestination</code>.
+ */
+ public ServerSettings getServerSettings()
+ {
+ // Returns the live settings object, not a copy.
+ return serverSettings;
+ }
+
+ /**
+ * Sets the <code>ServerSettings</code> of the <code>MessageDestination</code>.
+ *
+ * @param serverSettings The <code>ServerSettings</code> of the <code>MessageDestination</code>
+ */
+ public void setServerSettings(ServerSettings serverSettings)
+ {
+ // Stored as-is; no null check is performed here.
+ this.serverSettings = serverSettings;
+ }
+
+ /**
+ * Casts the <code>Service</code> into <code>MessageService</code>
+ * and calls super.setService.
+ *
+ * @param service The <code>Service</code> managing this <code>Destination</code>.
+ */
+ @Override
+ public void setService(Service service)
+ {
+     // The cast rejects any service that is not a MessageService with a ClassCastException.
+     super.setService((MessageService)service);
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Other Public Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * @exclude
+ * Returns a <tt>ConfigMap</tt> of destination properties that the client
+ * needs. This includes properties from <code>super.{@link #describeDestination(boolean)}</code>
+ * and it also includes outbound throttling policy that the edge server might need.
+ *
+ * @param onlyReliable Determines whether only reliable destination configuration should be returned.
+ * @return A <tt>ConfigMap</tt> of destination properties that the client needs.
+ */
+ @Override
+ public ConfigMap describeDestination(boolean onlyReliable)
+ {
+     ConfigMap destinationConfig = super.describeDestination(onlyReliable);
+     if (destinationConfig == null)
+         return null;
+
+     // Without a throttle manager there is no outbound policy to advertise.
+     if (throttleManager == null)
+         return destinationConfig;
+
+     // Read the policy once; the validated value is the one that gets published below.
+     Policy outboundPolicy = throttleManager.getOutboundPolicy();
+     if (outboundPolicy == null || outboundPolicy == Policy.NONE)
+         return destinationConfig;
+
+     // Add the outbound throttle policy to network properties section as appropriate,
+     // creating the intermediate properties/network maps when they are missing.
+     ConfigMap properties = destinationConfig.getPropertyAsMap(ConfigurationConstants.PROPERTIES_ELEMENT, null);
+     if (properties == null)
+     {
+         properties = new ConfigMap();
+         destinationConfig.addProperty(ConfigurationConstants.PROPERTIES_ELEMENT, properties);
+     }
+
+     ConfigMap network = properties.getPropertyAsMap(NetworkSettings.NETWORK_ELEMENT, null);
+     if (network == null)
+     {
+         network = new ConfigMap();
+         properties.addProperty(NetworkSettings.NETWORK_ELEMENT, network);
+     }
+
+     ConfigMap throttleOutbound = new ConfigMap();
+     throttleOutbound.addProperty(ThrottleSettings.ELEMENT_POLICY, outboundPolicy.toString());
+     network.addProperty(ThrottleSettings.ELEMENT_OUTBOUND, throttleOutbound);
+
+     return destinationConfig;
+ }
+
+ /** @exclude */
+ public SubscriptionManager getSubscriptionManager()
+ {
+ // Created in the constructor.
+ return subscriptionManager;
+ }
+
+ /** @exclude */
+ public RemoteSubscriptionManager getRemoteSubscriptionManager()
+ {
+ // Created in the constructor.
+ return remoteSubscriptionManager;
+ }
+
+ /** @exclude */
+ public ThrottleManager getThrottleManager()
+ {
+ // May be null: start() creates the manager only when client or destination throttling is enabled.
+ return throttleManager;
+ }
+
+ /** @exclude **/
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o instanceof Destination)
+ {
+ Destination d = (Destination)o;
+ String serviceType1 = d.getServiceType();
+ String serviceType2 = getServiceType();
+ if ((serviceType1 == null && serviceType2 == null) || (serviceType1 != null && serviceType1.equals(serviceType2)))
+ {
+ String id1 = d.getId();
+ String id2 = getId();
+ if ((id1 == null && id2 == null) || (id1 != null && id1.equals(id2)))
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /** @exclude **/
+ @Override
+ public int hashCode()
+ {
+ return (getServiceType() == null ? 0 : getServiceType().hashCode()) * 100003 +
+ (getId() == null ? 0 : getId().hashCode());
+ }
+
+ /** @exclude **/
+ @Override
+ public String toString()
+ {
+ // Format: <serviceType>#<id>
+ return getServiceType() + "#" + getId();
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Protected/Private Methods
+ //
+ //--------------------------------------------------------------------------
+
+ protected ThrottleManager createThrottleManager()
+ {
+     Service owner = getService();
+     if (owner != null && owner.getMessageBroker() != null)
+     {
+         // Try the throttle manager class configured on the message broker; any failure
+         // or an instance of the wrong type falls through to the default below.
+         try
+         {
+             Class<? extends ThrottleManager> configuredClass = owner.getMessageBroker().getThrottleManagerClass();
+             Object candidate = ClassUtil.createDefaultInstance(configuredClass, null);
+             if (candidate instanceof ThrottleManager)
+                 return (ThrottleManager)candidate;
+         }
+         catch (Throwable t)
+         {
+             // NOWARN
+         }
+     }
+
+     return new ThrottleManager(); // Return the default.
+ }
+
+ protected void network(ConfigMap properties)
+ {
+ ConfigMap network = properties.getPropertyAsMap(NetworkSettings.NETWORK_ELEMENT, null);
+ if (network == null)
+ return;
+
+ // Get implementation specific network settings, including subclasses!
+ NetworkSettings ns = getNetworkSettings();
+
+ // Subscriber timeout; first check for subscription-timeout-minutes and fallback to legacy session-timeout.
+ int useLegacyPropertyToken = -999999;
+ int subscriptionTimeoutMinutes = network.getPropertyAsInt(NetworkSettings.SUBSCRIPTION_TIMEOUT_MINUTES, useLegacyPropertyToken);
+ if (subscriptionTimeoutMinutes == useLegacyPropertyToken)
+ subscriptionTimeoutMinutes = network.getPropertyAsInt(NetworkSettings.SESSION_TIMEOUT, NetworkSettings.DEFAULT_TIMEOUT);
+ ns.setSubscriptionTimeoutMinutes(subscriptionTimeoutMinutes);
+
+ // Throttle Settings
+ ThrottleSettings ts = ns.getThrottleSettings();
+ ts.setDestinationName(getId());
+ throttle(ts, network);
+
+ setNetworkSettings(ns);
+ }
+
+ /**
+  * Populates the given throttle settings from the inbound and outbound
+  * throttle elements of the network configuration. A direction whose element
+  * is missing is left untouched; frequencies that are not specified are
+  * read as 0.
+  *
+  * @param ts The throttle settings to populate.
+  * @param network The network configuration to read the throttle elements from.
+  */
+ protected void throttle(ThrottleSettings ts, ConfigMap network)
+ {
+     // Inbound direction.
+     ConfigMap inboundProps = network.getPropertyAsMap(ThrottleSettings.ELEMENT_INBOUND, null);
+     if (inboundProps != null)
+     {
+         ThrottleSettings.Policy inboundPolicy = getPolicyFromThrottleSettings(inboundProps);
+         ts.setInboundPolicy(inboundPolicy);
+
+         int inboundDestFreq = inboundProps.getPropertyAsInt(ThrottleSettings.ELEMENT_DEST_FREQ, 0);
+         ts.setIncomingDestinationFrequency(inboundDestFreq);
+
+         int inboundClientFreq = inboundProps.getPropertyAsInt(ThrottleSettings.ELEMENT_CLIENT_FREQ, 0);
+         ts.setIncomingClientFrequency(inboundClientFreq);
+     }
+
+     // Outbound direction.
+     ConfigMap outboundProps = network.getPropertyAsMap(ThrottleSettings.ELEMENT_OUTBOUND, null);
+     if (outboundProps != null)
+     {
+         ThrottleSettings.Policy outboundPolicy = getPolicyFromThrottleSettings(outboundProps);
+         ts.setOutboundPolicy(outboundPolicy);
+
+         int outboundDestFreq = outboundProps.getPropertyAsInt(ThrottleSettings.ELEMENT_DEST_FREQ, 0);
+         ts.setOutgoingDestinationFrequency(outboundDestFreq);
+
+         int outboundClientFreq = outboundProps.getPropertyAsInt(ThrottleSettings.ELEMENT_CLIENT_FREQ, 0);
+         ts.setOutgoingClientFrequency(outboundClientFreq);
+     }
+ }
+
+ /**
+  * Resolves the throttle policy configured in the given inbound or outbound
+  * throttle settings.
+  *
+  * @param settings The throttle configuration to read the policy element from.
+  * @return The parsed policy, or <code>ThrottleSettings.Policy.NONE</code>
+  * when no policy element is present.
+  * @throws ConfigurationException if the policy string cannot be parsed; the
+  * message names this destination's id and the offending policy string, and
+  * the parser's original exception is attached as the cause.
+  */
+ private ThrottleSettings.Policy getPolicyFromThrottleSettings(ConfigMap settings)
+ {
+     String policyString = settings.getPropertyAsString(ThrottleSettings.ELEMENT_POLICY, null);
+     if (policyString == null)
+         return ThrottleSettings.Policy.NONE;
+
+     try
+     {
+         return ThrottleSettings.parsePolicy(policyString);
+     }
+     catch (ConfigurationException exception)
+     {
+         ConfigurationException ce = new ConfigurationException();
+         ce.setMessage(UNSUPPORTED_POLICY, new Object[] {getId(), policyString});
+         // Keep the parser's exception in the chain so its stack trace is not lost.
+         ce.initCause(exception);
+         throw ce;
+     }
+ }
+
+ /**
+  * Applies the server section of the given properties to this destination's
+  * server settings. Does nothing when the properties contain no server
+  * element.
+  *
+  * @param properties The configuration properties that may contain a server element.
+  */
+ protected void server(ConfigMap properties)
+ {
+     ConfigMap serverProps = properties.getPropertyAsMap(DestinationSettings.SERVER_ELEMENT, null);
+     if (serverProps != null)
+     {
+         long messageTTL = serverProps.getPropertyAsLong(MessagingConstants.TIME_TO_LIVE_ELEMENT, -1);
+         serverSettings.setMessageTTL(messageTTL);
+
+         boolean isDurable = serverProps.getPropertyAsBoolean(MessagingConstants.IS_DURABLE_ELEMENT, false);
+         serverSettings.setDurable(isDurable);
+
+         boolean subtopicsAllowed = serverProps.getPropertyAsBoolean(MessagingConstants.ALLOW_SUBTOPICS_ELEMENT, false);
+         serverSettings.setAllowSubtopics(subtopicsAllowed);
+
+         boolean wildcardsDisallowed = serverProps.getPropertyAsBoolean(MessagingConstants.DISALLOW_WILDCARD_SUBTOPICS_ELEMENT, false);
+         serverSettings.setDisallowWildcardSubtopics(wildcardsDisallowed);
+
+         // -1 means no priority was configured; the existing priority is then left alone.
+         int messagePriority = serverProps.getPropertyAsInt(MessagingConstants.MESSAGE_PRIORITY, -1);
+         if (messagePriority != -1)
+             serverSettings.setPriority(messagePriority);
+
+         String separator = serverProps.getPropertyAsString(MessagingConstants.SUBTOPIC_SEPARATOR_ELEMENT, MessagingConstants.DEFAULT_SUBTOPIC_SEPARATOR);
+         serverSettings.setSubtopicSeparator(separator);
+
+         String clusterRouting = serverProps.getPropertyAsString(MessagingConstants.CLUSTER_MESSAGE_ROUTING, "server-to-server");
+         serverSettings.setBroadcastRoutingMode(clusterRouting);
+     }
+ }
+
+ /**
+  * Supplies the log category used by the <code>MessageDestination</code>.
+  *
+  * @return The value of <code>LOG_CATEGORY</code>.
+  */
+ @Override
+ protected String getLogCategory()
+ {
+     return LOG_CATEGORY;
+ }
+
+ /**
+  * Invoked automatically so the <code>MessageDestination</code> can create and
+  * register its MBean control, followed by the controls for its throttle
+  * manager and its subscription manager.
+  *
+  * @param service The <code>Service</code> that manages this <code>MessageDestination</code>;
+  * its control is passed to the new destination control.
+  */
+ @Override
+ protected void setupDestinationControl(Service service)
+ {
+     MessageDestinationControl destinationControl = new MessageDestinationControl(this, service.getControl());
+
+     // Store the control on this destination before registering it.
+     controller = destinationControl;
+     destinationControl.register();
+     setControl(destinationControl);
+
+     setupThrottleManagerControl(destinationControl);
+     setupSubscriptionManagerControl(destinationControl);
+ }
+
+ /**
+  * Creates and registers the MBean control for the throttle manager, marks
+  * the throttle manager as managed, and records the control's object name
+  * on the destination control. Does nothing when there is no throttle manager.
+  *
+  * @param destinationControl The destination control the throttle manager control is created with.
+  */
+ protected void setupThrottleManagerControl(MessageDestinationControl destinationControl)
+ {
+     if (throttleManager == null)
+         return;
+
+     ThrottleManagerControl managerControl = new ThrottleManagerControl(throttleManager, destinationControl);
+     managerControl.register();
+
+     throttleManager.setControl(managerControl);
+     throttleManager.setManaged(true);
+
+     destinationControl.setThrottleManager(managerControl.getObjectName());
+ }
+
+ /**
+  * Creates and registers the MBean control for the subscription manager,
+  * marks the subscription manager as managed, and records the control's
+  * object name on the destination control.
+  *
+  * @param destinationControl The destination control the subscription manager control is created with.
+  */
+ private void setupSubscriptionManagerControl(MessageDestinationControl destinationControl)
+ {
+     SubscriptionManager manager = getSubscriptionManager();
+
+     SubscriptionManagerControl managerControl = new SubscriptionManagerControl(manager, destinationControl);
+     managerControl.register();
+
+     manager.setControl(managerControl);
+     manager.setManaged(true);
+
+     destinationControl.setSubscriptionManager(managerControl.getObjectName());
+ }
+}