You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flex.apache.org by cd...@apache.org on 2015/12/20 14:13:42 UTC
[02/51] [partial] flex-blazeds git commit: Removed legacy directories
and made the content of the modules directory the new root - Please use the
maven build for now as the Ant build will no longer work untill it is
adjusted to the new directory structur
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/endpoints/AbstractEndpoint.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/endpoints/AbstractEndpoint.java b/core/src/flex/messaging/endpoints/AbstractEndpoint.java
new file mode 100644
index 0000000..c265ea9
--- /dev/null
+++ b/core/src/flex/messaging/endpoints/AbstractEndpoint.java
@@ -0,0 +1,1513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.endpoints;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import flex.management.ManageableComponent;
+import flex.management.runtime.messaging.MessageBrokerControl;
+import flex.management.runtime.messaging.endpoints.EndpointControl;
+import flex.messaging.FlexContext;
+import flex.messaging.FlexSession;
+import flex.messaging.MessageBroker;
+import flex.messaging.MessageException;
+import flex.messaging.Server;
+import flex.messaging.client.FlexClient;
+import flex.messaging.client.FlexClientOutboundQueueProcessor;
+import flex.messaging.client.FlushResult;
+import flex.messaging.client.PollFlushResult;
+import flex.messaging.client.UserAgentSettings;
+import flex.messaging.config.ChannelSettings;
+import flex.messaging.config.ConfigMap;
+import flex.messaging.config.ConfigurationConstants;
+import flex.messaging.config.ConfigurationException;
+import flex.messaging.config.SecurityConstraint;
+import flex.messaging.io.ClassAliasRegistry;
+import flex.messaging.io.SerializationContext;
+import flex.messaging.io.TypeMarshaller;
+import flex.messaging.io.TypeMarshallingContext;
+import flex.messaging.io.amf.translator.ASTranslator;
+import flex.messaging.log.Log;
+import flex.messaging.log.LogCategories;
+import flex.messaging.log.Logger;
+import flex.messaging.messages.AcknowledgeMessage;
+import flex.messaging.messages.AcknowledgeMessageExt;
+import flex.messaging.messages.AsyncMessage;
+import flex.messaging.messages.AsyncMessageExt;
+import flex.messaging.messages.CommandMessage;
+import flex.messaging.messages.CommandMessageExt;
+import flex.messaging.messages.Message;
+import flex.messaging.messages.SmallMessage;
+import flex.messaging.security.SecurityException;
+import flex.messaging.util.ClassUtil;
+import flex.messaging.util.StringUtils;
+import flex.messaging.util.UserAgentManager;
+import flex.messaging.validators.DeserializationValidator;
+
+/**
+ * This is the default implementation of Endpoint, which provides a convenient
+ * base for behavior and associations common to all endpoints.
+ *
+ * These properties that appear in the endpoint configuration are only used by the
+ * client, therefore they have to be set on the appropriate client classes: connect-timeout-seconds set on Channel.
+ *
+ * @see flex.messaging.endpoints.Endpoint
+ */
+public abstract class AbstractEndpoint extends ManageableComponent
+ implements Endpoint2, ConfigurationConstants
+{
+ /** Log category for <code>AbstractEndpoint</code>. */
+ public static final String LOG_CATEGORY = LogCategories.ENDPOINT_GENERAL;
+
+ /**
+ * HTTP header field names.
+ */
+ public static final String HEADER_NAME_CACHE_CONTROL = "Cache-Control";
+ public static final String HEADER_NAME_EXPIRES = "Expires";
+ public static final String HEADER_NAME_PRAGMA = "Pragma";
+
+ // Errors
+ private static final int NONSECURE_PROTOCOL = 10066;
+ private static final int REQUIRES_FLEXCLIENT_SUPPORT = 10030;
+ private static final int ERR_MSG_INVALID_URL_SCHEME = 11100;
+
+ // XML Configuration Properties
+ private static final String SERIALIZATION = "serialization";
+ private static final String CREATE_ASOBJECT_FOR_MISSING_TYPE = "create-asobject-for-missing-type";
+ private static final String CUSTOM_DESERIALIZER = "custom-deserializer";
+ private static final String CUSTOM_SERIALIZER = "custom-serializer";
+ private static final String ENABLE_SMALL_MESSAGES = "enable-small-messages";
+ private static final String TYPE_MARSHALLER = "type-marshaller";
+ private static final String RESTORE_REFERENCES = "restore-references";
+ private static final String INSTANTIATE_TYPES = "instantiate-types";
+ private static final String SUPPORT_REMOTE_CLASS = "support-remote-class";
+ private static final String LEGACY_COLLECTION = "legacy-collection";
+ private static final String LEGACY_DICTIONARY = "legacy-dictionary";
+ private static final String LEGACY_MAP = "legacy-map";
+ private static final String LEGACY_XML = "legacy-xml";
+ private static final String LEGACY_XML_NAMESPACES = "legacy-xml-namespaces";
+ private static final String LEGACY_THROWABLE = "legacy-throwable";
+ private static final String LEGACY_BIG_NUMBERS = "legacy-big-numbers";
+ private static final String LEGACY_EXTERNALIZABLE = "legacy-externalizable";
+ private static final String ALLOW_XML_DOCTYPE_DECLARATION = "allow-xml-doctype-declaration";
+ private static final String ALLOW_XML_EXTERNAL_ENTITY_EXPANSION = "allow-xml-external-entity-expansion";
+
+ private static final String LOG_PROPERTY_ERRORS = "log-property-errors";
+ private static final String IGNORE_PROPERTY_ERRORS = "ignore-property-errors";
+ private static final String INCLUDE_READ_ONLY = "include-read-only";
+ private static final String GLOBAL_INCLUDE_READ_ONLY = "global-include-read-only";
+ private static final String FLEX_CLIENT_OUTBOUND_QUEUE_PROCESSOR = "flex-client-outbound-queue-processor";
+ private static final String SHOW_STACKTRACES = "show-stacktraces";
+ private static final String MAX_OBJECT_NEST_LEVEL = "max-object-nest-level";
+ private static final String MAX_COLLECTION_NEST_LEVEL = "max-collection-nest-level";
+ private static final String PREFER_VECTORS = "prefer-vectors";
+
+ // Endpoint properties
+ protected Set<String> clientLoadBalancingUrls;
+ protected String clientType;
+ protected int connectTimeoutSeconds;
+ protected int requestTimeoutSeconds;
+ protected FlexClientOutboundQueueProcessor flexClientOutboundQueueProcessor;
+ protected SerializationContext serializationContext;
+ protected Class<?> deserializerClass;
+ protected Class<?> serializerClass;
+ protected TypeMarshaller typeMarshaller;
+ protected int port;
+ private SecurityConstraint securityConstraint;
+ protected String url;
+ protected boolean recordMessageSizes;
+ protected boolean recordMessageTimes;
+ protected boolean remote;
+ protected Server server;
+ protected boolean serverOnly;
+
+ // Endpoint internal
+ protected String parsedUrl;
+ // Keeps track of what context path parsedUrl has been parsed for. If it is
+ // null, means parsedUrl has not been parsed already.
+ protected String parsedForContext;
+ protected boolean clientContextParsed;
+ protected String parsedClientUrl;
+ protected Logger log;
+
+ protected Class<?> flexClientOutboundQueueProcessClass;
+ protected ConfigMap flexClientOutboundQueueProcessorConfig;
+
+ // Supported messaging version
+ protected double messagingVersion = 1.0;
+
+ //--------------------------------------------------------------------------
+ //
+ // Constructor
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Constructs an unmanaged <code>AbstractEndpoint</code>.
+ */
+ public AbstractEndpoint()
+ {
+ this(false);
+ }
+
+ /**
+ * Constructs an <code>AbstractEndpoint</code> with the indicated management.
+ *
+ * @param enableManagement <code>true</code> if the <code>AbstractEndpoint</code>
+ * is manageable; <code>false</code> otherwise.
+ */
+ public AbstractEndpoint(boolean enableManagement)
+ {
+ super(enableManagement);
+ this.log = Log.getLogger(getLogCategory());
+ serializationContext = new SerializationContext();
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Initialize, validate, start, and stop methods.
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Initializes the <code>Endpoint</code> with the properties.
+ * If subclasses override this method, they must call <code>super.initialize()</code>.
+ *
+ * @param id The ID of the <code>Endpoint</code>.
+ * @param properties Properties for the <code>Endpoint</code>.
+ */
+ @Override
+ public void initialize(String id, ConfigMap properties)
+ {
+ super.initialize(id, properties);
+
+ if (properties == null || properties.size() == 0)
+ return;
+
+ // Client-targeted <client-load-balancing>
+ initializeClientLoadBalancing(id, properties);
+
+ // Client-targeted <connect-timeout-seconds/>
+ connectTimeoutSeconds = properties.getPropertyAsInt(CONNECT_TIMEOUT_SECONDS_ELEMENT, 0);
+
+ // Client-targeted <request-timeout-seconds/>
+ requestTimeoutSeconds = properties.getPropertyAsInt(REQUEST_TIMEOUT_SECONDS_ELEMENT, 0);
+
+ // Check for a custom FlexClient outbound queue processor.
+ ConfigMap outboundQueueConfig = properties.getPropertyAsMap(FLEX_CLIENT_OUTBOUND_QUEUE_PROCESSOR, null);
+ if (outboundQueueConfig != null)
+ {
+ // Get nested props for the processor.
+ flexClientOutboundQueueProcessorConfig = outboundQueueConfig.getPropertyAsMap(PROPERTIES_ELEMENT, null);
+
+ String pClassName = outboundQueueConfig.getPropertyAsString(CLASS_ATTR, null);
+ if (pClassName != null)
+ {
+ try
+ {
+ flexClientOutboundQueueProcessClass = createClass(pClassName);
+ // And now create an instance and initialize to make sure the properties are valid.
+ setFlexClientOutboundQueueProcessorConfig(flexClientOutboundQueueProcessorConfig);
+ }
+ catch (Throwable t)
+ {
+ if (Log.isWarn())
+ log.warn("Cannot register custom FlexClient outbound queue processor class {0}", new Object[]{pClassName}, t);
+ }
+ }
+ }
+
+ ConfigMap serialization = properties.getPropertyAsMap(SERIALIZATION, null);
+ if (serialization != null)
+ {
+ // Custom deserializers
+ List<?> deserializers = serialization.getPropertyAsList(CUSTOM_DESERIALIZER, null);
+ if (deserializers != null && Log.isWarn())
+ log.warn("Endpoint <custom-deserializer> functionality is no longer available. Please remove this entry from your configuration.");
+
+ // Custom serializers
+ List<?> serializers = serialization.getPropertyAsList(CUSTOM_SERIALIZER, null);
+ if (serializers != null && Log.isWarn())
+ log.warn("Endpoint <custom-serializer> functionality is no longer available. Please remove this entry from your configuration.");
+
+ // Type Marshaller implementation
+ String typeMarshallerClassName = serialization.getPropertyAsString(TYPE_MARSHALLER, null);
+ if (typeMarshallerClassName != null && typeMarshallerClassName.length() > 0)
+ {
+ try
+ {
+ Class<?> tmc = createClass(typeMarshallerClassName);
+ typeMarshaller = (TypeMarshaller)ClassUtil.createDefaultInstance(tmc, TypeMarshaller.class);
+ }
+ catch (Throwable t)
+ {
+ if (Log.isWarn())
+ log.warn("Cannot register custom type marshaller for type {0}", new Object[]{typeMarshallerClassName}, t);
+ }
+ }
+
+ // Boolean Serialization Flags
+ serializationContext.createASObjectForMissingType = serialization.getPropertyAsBoolean(CREATE_ASOBJECT_FOR_MISSING_TYPE, false);
+ serializationContext.enableSmallMessages = serialization.getPropertyAsBoolean(ENABLE_SMALL_MESSAGES, true);
+ serializationContext.instantiateTypes = serialization.getPropertyAsBoolean(INSTANTIATE_TYPES, true);
+ serializationContext.supportRemoteClass = serialization.getPropertyAsBoolean(SUPPORT_REMOTE_CLASS, false);
+ serializationContext.legacyCollection = serialization.getPropertyAsBoolean(LEGACY_COLLECTION, false);
+ serializationContext.legacyDictionary = serialization.getPropertyAsBoolean(LEGACY_DICTIONARY, false);
+ serializationContext.legacyMap = serialization.getPropertyAsBoolean(LEGACY_MAP, false);
+ serializationContext.legacyXMLDocument = serialization.getPropertyAsBoolean(LEGACY_XML, false);
+ serializationContext.legacyXMLNamespaces = serialization.getPropertyAsBoolean(LEGACY_XML_NAMESPACES, false);
+ serializationContext.legacyThrowable = serialization.getPropertyAsBoolean(LEGACY_THROWABLE, false);
+ serializationContext.legacyBigNumbers = serialization.getPropertyAsBoolean(LEGACY_BIG_NUMBERS, false);
+ serializationContext.legacyExternalizable = serialization.getPropertyAsBoolean(LEGACY_EXTERNALIZABLE, false);
+ serializationContext.allowXmlDoctypeDeclaration = serialization.getPropertyAsBoolean(ALLOW_XML_DOCTYPE_DECLARATION, false);
+ serializationContext.allowXmlExternalEntityExpansion = serialization.getPropertyAsBoolean(ALLOW_XML_EXTERNAL_ENTITY_EXPANSION, false);
+ serializationContext.maxObjectNestLevel = (int)serialization.getPropertyAsLong(MAX_OBJECT_NEST_LEVEL, 512);
+ serializationContext.maxCollectionNestLevel = (int)serialization.getPropertyAsLong(MAX_COLLECTION_NEST_LEVEL, 15);
+ serializationContext.preferVectors = serialization.getPropertyAsBoolean(PREFER_VECTORS, false);
+
+ boolean showStacktraces = serialization.getPropertyAsBoolean(SHOW_STACKTRACES, false);
+ if (showStacktraces && Log.isWarn())
+ log.warn("The " + SHOW_STACKTRACES + " configuration option is deprecated and non-functional. Please remove this from your configuration file.");
+ serializationContext.restoreReferences = serialization.getPropertyAsBoolean(RESTORE_REFERENCES, false);
+ serializationContext.logPropertyErrors = serialization.getPropertyAsBoolean(LOG_PROPERTY_ERRORS, false);
+ serializationContext.ignorePropertyErrors = serialization.getPropertyAsBoolean(IGNORE_PROPERTY_ERRORS, true);
+ serializationContext.includeReadOnly = serialization.getPropertyAsBoolean(INCLUDE_READ_ONLY, false);
+ }
+
+ recordMessageSizes = properties.getPropertyAsBoolean(ConfigurationConstants.RECORD_MESSAGE_SIZES_ELEMENT, false);
+
+ if (recordMessageSizes && Log.isWarn())
+ log.warn("Setting <record-message-sizes> to true affects application performance and should only be used for debugging");
+
+ recordMessageTimes = properties.getPropertyAsBoolean(ConfigurationConstants.RECORD_MESSAGE_TIMES_ELEMENT, false);
+ }
+
+ /**
+ * Starts the endpoint if its associated <code>MessageBroker</code> is started,
+ * and if the endpoint is not already running. If subclasses override this method,
+ * they must call <code>super.start()</code>.
+ */
+ @Override
+ public void start()
+ {
+ if (isStarted())
+ return;
+
+ // Check if the MessageBroker is started
+ MessageBroker broker = getMessageBroker();
+ if (!broker.isStarted())
+ {
+ if (Log.isWarn())
+ {
+ Log.getLogger(getLogCategory()).warn("Endpoint with id '{0}' cannot be started" +
+ " when the MessageBroker is not started.",
+ new Object[]{getId()});
+ }
+ return;
+ }
+
+ // Set up management
+ if (isManaged() && broker.isManaged())
+ {
+ setupEndpointControl(broker);
+ MessageBrokerControl controller = (MessageBrokerControl)broker.getControl();
+ if (getControl() != null)
+ controller.addEndpoint(this);
+ }
+
+ // Setup Deserializer and Serializer for the SerializationContext
+ if (deserializerClass == null)
+ deserializerClass = createClass(getDeserializerClassName());
+
+ if (serializerClass == null)
+ serializerClass = createClass(getSerializerClassName());
+
+ serializationContext.setDeserializerClass(deserializerClass);
+ serializationContext.setSerializerClass(serializerClass);
+
+ // Setup endpoint features
+ ClassAliasRegistry registry = ClassAliasRegistry.getRegistry();
+ registry.registerAlias(AsyncMessageExt.CLASS_ALIAS, AsyncMessageExt.class.getName());
+ registry.registerAlias(AcknowledgeMessageExt.CLASS_ALIAS, AcknowledgeMessageExt.class.getName());
+ registry.registerAlias(CommandMessageExt.CLASS_ALIAS, CommandMessageExt.class.getName());
+ super.start();
+ }
+
+ /**
+ * Stops the endpoint if it is running. If subclasses override this method, they must
+ * call <code>super.stop()</code>.
+ */
+ @Override
+ public void stop()
+ {
+ if (!isStarted())
+ return;
+
+ super.stop();
+
+ // Remove management
+ if (isManaged() && getMessageBroker().isManaged())
+ {
+ if (getControl() != null)
+ {
+ getControl().unregister();
+ setControl(null);
+ }
+ setManaged(false);
+ }
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Public Getters and Setters for AbstractEndpoint properties
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Adds a client-load-balancing URL.
+ *
+ * @param url A client-load-balancing URL.
+ * @return <code>false</code> if the set already contains the URL, <code>true</code> otherwise.
+ *
+ */
+ public boolean addClientLoadBalancingUrl(String url)
+ {
+ if (clientLoadBalancingUrls == null)
+ clientLoadBalancingUrls = new HashSet<String>();
+
+ if (url == null || url.length() == 0)
+ {
+ // Invalid {0} configuration for endpoint ''{1}''; cannot add empty url.
+ ConfigurationException ce = new ConfigurationException();
+ ce.setMessage(ERR_MSG_EMTPY_CLIENT_LOAD_BALACNING_URL, new Object[]{CLIENT_LOAD_BALANCING_ELEMENT, getId()});
+ throw ce;
+ }
+
+ return clientLoadBalancingUrls.add(url);
+ }
+
+ /**
+ * Removes the client-load-balancing URL.
+ *
+ * @param url The URL to remove.
+ * @return <code>true</code> if the set contained the URL, <code>false</code> otherwise.
+ */
+ public boolean removeClientLoadBalancingUrl(String url)
+ {
+ if (clientLoadBalancingUrls != null)
+ return clientLoadBalancingUrls.remove(url);
+ return false;
+ }
+
+ /**
+ * Retrieves a snapshot of the current client-load-balancing URLs.
+ *
+ * @return A snapshot of the current client-load-balancing URLs, or <code>null</code> if
+ * no URL exists.
+ */
+ public Set<String> getClientLoadBalancingUrls()
+ {
+ return clientLoadBalancingUrls == null? null : new HashSet<String>(clientLoadBalancingUrls);
+ }
+
+ /**
+ * Retrieves the corresponding client channel type for the endpoint.
+ *
+ * @return The corresponding client channel type for the endpoint.
+ */
+ public String getClientType()
+ {
+ return clientType;
+ }
+
+ /**
+ * Sets the corresponding client channel type for the endpoint.
+ *
+ * @param type The corresponding client channel type for the endpoint.
+ */
+ public void setClientType(String type)
+ {
+ this.clientType = type;
+ }
+
+ /**
+ * Retrieves the <code>FlexClientOutboundQueueProcessorClass</code> of the endpoint.
+ *
+ * @return The <code>FlexClientOutboundQueueProcessorClass</code> of the endpoint.
+ */
+ public Class<?> getFlexClientOutboundQueueProcessorClass()
+ {
+ return flexClientOutboundQueueProcessClass;
+ }
+
+ /**
+ * Sets the the <code>FlexClientOutboundQueueProcessor</code> of the endpoint.
+ *
+ * @param flexClientOutboundQueueProcessorClass the Class of the Flex client outbound queue processor.
+ */
+ public void setFlexClientOutboundQueueProcessorClass(Class<?> flexClientOutboundQueueProcessorClass)
+ {
+ this.flexClientOutboundQueueProcessClass = flexClientOutboundQueueProcessorClass;
+ if (flexClientOutboundQueueProcessClass != null && flexClientOutboundQueueProcessorConfig != null)
+ {
+ FlexClientOutboundQueueProcessor processor = (FlexClientOutboundQueueProcessor)ClassUtil.createDefaultInstance(flexClientOutboundQueueProcessClass, null);
+ processor.initialize(flexClientOutboundQueueProcessorConfig);
+ }
+ }
+
+ /**
+ * Retrieves the properties for the <code>FlexClientOutboundQueueProcessor</code> of the endpoint.
+ *
+ * @return The properties for the <code>FlexClientOutboundQueueProcessor</code> of the endpoint.
+ */
+ public ConfigMap getFlexClientOutboundQueueProcessorConfig()
+ {
+ return flexClientOutboundQueueProcessorConfig;
+ }
+
+ /**
+ * Sets the properties for the <code>FlexClientOutboundQueueProcessor</code> of the endpoint.
+ *
+ * @param flexClientOutboundQueueProcessorConfig The configuration map.
+ */
+ public void setFlexClientOutboundQueueProcessorConfig(ConfigMap flexClientOutboundQueueProcessorConfig)
+ {
+ this.flexClientOutboundQueueProcessorConfig = flexClientOutboundQueueProcessorConfig;
+ if (flexClientOutboundQueueProcessorConfig != null && flexClientOutboundQueueProcessClass != null)
+ {
+ FlexClientOutboundQueueProcessor processor = (FlexClientOutboundQueueProcessor)ClassUtil.createDefaultInstance(flexClientOutboundQueueProcessClass, null);
+ processor.initialize(flexClientOutboundQueueProcessorConfig);
+ }
+ }
+
+ /**
+ * Sets the ID of the <code>AbstractEndpoint</code>. If the <code>AbstractEndpoint</code>
+ * has a <code>MessageBroker</code> assigned, it also updates the ID in the
+ * <code>MessageBroker</code>.
+ *
+ * @param id The endpoint ID.
+ */
+ @Override
+ public void setId(String id)
+ {
+ String oldId = getId();
+
+ if (oldId != null && oldId.equals(id))
+ return;
+
+ super.setId(id);
+
+ // Update the endpoint id in the broker
+ MessageBroker broker = getMessageBroker();
+ if (broker != null)
+ {
+ // broker must have the endpoint then
+ broker.removeEndpoint(oldId);
+ broker.addEndpoint(this);
+ }
+ }
+
+ /**
+ * Retrieves the <code>MessageBroker</code> of the <code>AbstractEndpoint</code>.
+ *
+ * @return The <code>MessageBroker</code> of the <code>AbstractEndpoint</code>.
+ */
+ public MessageBroker getMessageBroker()
+ {
+ return (MessageBroker)getParent();
+ }
+
+ /**
+ * Sets the <code>MessageBroker</code> of the <code>AbstractEndpoint</code>.
+ * Removes the <code>AbstractEndpoint</code> from the old broker
+ * (if there was one) and adds to the list of endpoints in the new broker.
+ *
+ * @param broker The <code>MessageBroker</code> of the <code>AbstractEndpoint</code>.
+ */
+ public void setMessageBroker(MessageBroker broker)
+ {
+ MessageBroker oldBroker = getMessageBroker();
+
+ setParent(broker);
+
+ if (oldBroker != null)
+ oldBroker.removeEndpoint(getId());
+
+ // Add endpoint to the new broker if needed
+ if (broker.getEndpoint(getId()) != this)
+ broker.addEndpoint(this);
+ }
+
+ /**
+ * Return the highest messaging version currently available via this
+ * endpoint.
+ * @return double the messaging version
+ */
+ public double getMessagingVersion()
+ {
+ return messagingVersion;
+ }
+
+ /**
+ * Retrieves the port of the URL of the endpoint.
+ * A return value of 0 denotes no port in the channel URL.
+ *
+ * @return The port of the URL of the endpoint, or 0 if the URL does not contain
+ * a port number.
+ */
+ public int getPort()
+ {
+ return port;
+ }
+
+ /**
+ * Determines whether the endpoint is secure.
+ *
+ * @return <code>false</code> by default.
+ */
+ public boolean isSecure()
+ {
+ return false;
+ }
+
+ /**
+ * Determines if the endpoint clients connect to directly is mirrored and running
+ * on a remote host, in which case this local instance is not started and will service no direct
+ * client connections.
+ *
+ * @return <code>true</code> if this endpoint will not process direct client connections and is just
+ * a local representation of a symmetric endpoint on a remote host that will, <code>false</code> otherwise.
+ */
+ public boolean isRemote()
+ {
+ return remote;
+ }
+
+ /**
+ * Sets the remote status for this endpoint.
+ *
+ * @param value <code>true</code> if this endpoint will not process direct client connections and is just
+ * a local representation of a symmetric endpoint on a remote host that will, <code>false</code> otherwise.
+ */
+ public void setRemote(boolean value)
+ {
+ remote = value;
+ }
+
+ /**
+ * Retrieves the <tt>Server</tt> that the endpoint is using, or <code>null</code> if
+ * no server has been assigned.
+ * @return Server The Server object the endpoint is using.
+ */
+ public Server getServer()
+ {
+ return server;
+ }
+
+ /**
+ * Sets the <tt>Server</tt> that the endpoint will use.
+ * @param server The Server object.
+ */
+ public void setServer(Server server)
+ {
+ this.server = server;
+ }
+
+ /**
+ * Determines whether the endpoint is server only.
+ *
+ * @return <code>true</code> if the endpoint is server only, <code>false</code> otherwise.
+ */
+ public boolean getServerOnly()
+ {
+ return serverOnly;
+ }
+
+ /**
+ * Sets whether the endpoint is server only.
+ *
+ * @param serverOnly <code>true</code> if the endpoint is server only, <code>false</code> otherwise.
+ */
+ public void setServerOnly(boolean serverOnly)
+ {
+ this.serverOnly = serverOnly;
+ }
+
+ /**
+ * Retrieves the <code>SecurityConstraint</code> of the <code>Endpoint</code>.
+ *
+ * @return The <code>SecurityConstraint</code> of the <code>Endpoint</code>.
+ */
+ public SecurityConstraint getSecurityConstraint()
+ {
+ return securityConstraint;
+ }
+
+ /**
+ * Sets the <code>SecurityConstraint</code> of the <code>Endpoint</code>.
+ *
+ * @param securityConstraint The SecurityContraint object.
+ */
+ public void setSecurityConstraint(SecurityConstraint securityConstraint)
+ {
+ this.securityConstraint = securityConstraint;
+ }
+
+ /**
+ * Retrieves the <code>SerializationContext</code> of the endpoint.
+ *
+ * @return The <code>SerializationContext</code> of the endpoint.
+ */
+ public SerializationContext getSerializationContext()
+ {
+ return serializationContext;
+ }
+
+ /**
+ * Sets the <code>SerializationContext</code> of the endpoint.
+ *
+ * @param serializationContext The SerializationContext object.
+ */
+ public void setSerializationContext(SerializationContext serializationContext)
+ {
+ this.serializationContext = serializationContext;
+ }
+
+ /**
+ * Retrieves the <code>TypeMarshaller</code> of the endpoint.
+ *
+ * @return The <code>TypeMarshaller</code> of the endpoint.
+ */
+ public TypeMarshaller getTypeMarshaller()
+ {
+ if (typeMarshaller == null)
+ typeMarshaller = new ASTranslator();
+
+ return typeMarshaller;
+ }
+
+ /**
+ * Sets the <code>TypeMarshaller</code> of the endpoint.
+ *
+ * @param typeMarshaller The TypeMarshaller object.
+ */
+ public void setTypeMarshaller(TypeMarshaller typeMarshaller)
+ {
+ this.typeMarshaller = typeMarshaller;
+ }
+
+ /**
+ * Retrieves the URL of the endpoint.
+ *
+ * @return The URL of the endpoint.
+ */
+ public String getUrl()
+ {
+ return url;
+ }
+
+ /**
+ * Sets the URL of the endpoint.
+ *
+ * @param url The URL of the endpoint.
+ */
+ public void setUrl(String url)
+ {
+ this.url = url;
+ port = internalParsePort(url);
+ parsedForContext = null;
+ clientContextParsed = false;
+ }
+
+ /**
+ *
+ * Returns the url of the endpoint parsed for the client.
+ *
+ * @return The url of the endpoint parsed for the client.
+ */
+ public String getUrlForClient()
+ {
+ if (!clientContextParsed)
+ {
+ HttpServletRequest req = FlexContext.getHttpRequest();
+ if (req != null)
+ {
+ String contextPath = req.getContextPath();
+ parseClientUrl(contextPath);
+ }
+ else
+ {
+ return url;
+ }
+ }
+ return parsedClientUrl;
+ }
+
+ /**
+ *
+ * Returns the total throughput for the endpoint.
+ *
+ * @return The total throughput for the endpoint.
+ */
+ public long getThroughput()
+ {
+ EndpointControl control = (EndpointControl)getControl();
+
+ return control.getBytesDeserialized().longValue() + control.getBytesSerialized().longValue();
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Other Public APIs
+ //
+ //--------------------------------------------------------------------------
+
+
+ public static void addNoCacheHeaders(HttpServletRequest req, HttpServletResponse res)
+ {
+ String userAgent = req.getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
+
+ // For MSIE over HTTPS, set additional Cache-Control values.
+ if (req.isSecure() && userAgent != null && userAgent.indexOf(UserAgentSettings.USER_AGENT_MSIE) != -1)
+ res.addHeader(HEADER_NAME_CACHE_CONTROL, "no-store, no-cache, must-revalidate, post-check=0, pre-check=0, no-transform, private");
+ else // For the rest, set no-cache header value only.
+ res.addHeader(HEADER_NAME_CACHE_CONTROL, "no-cache");
+
+ // Set an expiration date in the past as well.
+ res.setDateHeader(HEADER_NAME_EXPIRES, 946080000000L); //Approx Jan 1, 2000
+
+ // Set Pragma no-cache header if we're not MSIE over HTTPS
+ if (!(req.isSecure() && userAgent != null && userAgent.indexOf(UserAgentSettings.USER_AGENT_MSIE) != -1))
+ res.setHeader(HEADER_NAME_PRAGMA, "no-cache");
+ }
+
+ /**
+ *
+ */
+ public Message convertToSmallMessage(Message message)
+ {
+ if (message instanceof SmallMessage)
+ {
+ Message smallMessage = ((SmallMessage)message).getSmallMessage();
+ if (smallMessage != null)
+ message = smallMessage;
+ }
+
+ return message;
+ }
+
+ /**
+ * Retrieves a <code>ConfigMap</code> of the endpoint properties the client
+ * needs. Subclasses should add additional properties to <code>super.describeDestination</code>,
+ * or return <code>null</code> if they must not send their properties to the client.
+ *
+ * @return ConfigMap The ConfigMap object.
+ */
+ public ConfigMap describeEndpoint()
+ {
+ ConfigMap channelConfig = new ConfigMap();
+
+ if (serverOnly) // Client does not need server only endpoints.
+ return channelConfig;
+
+ channelConfig.addProperty(ID_ATTR, getId());
+ channelConfig.addProperty(TYPE_ATTR, getClientType());
+
+ ConfigMap properties = new ConfigMap();
+
+ boolean containsClientLoadBalancing = clientLoadBalancingUrls != null && !clientLoadBalancingUrls.isEmpty();
+ if (containsClientLoadBalancing)
+ {
+ ConfigMap clientLoadBalancing = new ConfigMap();
+ for (Iterator<String> iterator = clientLoadBalancingUrls.iterator(); iterator.hasNext();)
+ {
+ ConfigMap url = new ConfigMap();
+ // Adding as a value rather than attribute to the parent.
+ url.addProperty(EMPTY_STRING, iterator.next());
+ clientLoadBalancing.addProperty(URL_ATTR, url);
+ }
+ properties.addProperty(CLIENT_LOAD_BALANCING_ELEMENT, clientLoadBalancing);
+ }
+
+ // Add endpoint uri only if no client-load-balancing urls are defined.
+ if (!containsClientLoadBalancing)
+ {
+ ConfigMap endpointConfig = new ConfigMap();
+ endpointConfig.addProperty(URI_ATTR, getUrlForClient());
+ channelConfig.addProperty(ENDPOINT_ELEMENT, endpointConfig);
+ }
+
+ if (connectTimeoutSeconds > 0)
+ {
+ ConfigMap connectTimeoutConfig = new ConfigMap();
+ connectTimeoutConfig.addProperty(EMPTY_STRING, String.valueOf(connectTimeoutSeconds));
+ properties.addProperty(CONNECT_TIMEOUT_SECONDS_ELEMENT, connectTimeoutConfig);
+ }
+
+ if (requestTimeoutSeconds > 0)
+ {
+ ConfigMap requestTimeoutSeconds = new ConfigMap();
+ requestTimeoutSeconds.addProperty(EMPTY_STRING, String.valueOf(requestTimeoutSeconds));
+ properties.addProperty(REQUEST_TIMEOUT_SECONDS_ELEMENT, requestTimeoutSeconds);
+ }
+
+ if (recordMessageTimes)
+ {
+ ConfigMap recordMessageTimesMap = new ConfigMap();
+ // Adding as a value rather than attribute to the parent
+ recordMessageTimesMap.addProperty(EMPTY_STRING, TRUE_STRING);
+ properties.addProperty(RECORD_MESSAGE_TIMES_ELEMENT, recordMessageTimesMap);
+ }
+
+ if (recordMessageSizes)
+ {
+ ConfigMap recordMessageSizesMap = new ConfigMap();
+ // Adding as a value rather than attribute to the parent
+ recordMessageSizesMap.addProperty(EMPTY_STRING, TRUE_STRING);
+ properties.addProperty(RECORD_MESSAGE_SIZES_ELEMENT, recordMessageSizesMap);
+ }
+
+ ConfigMap serialization = new ConfigMap();
+ serialization.addProperty(ENABLE_SMALL_MESSAGES_ELEMENT, Boolean.toString(serializationContext.enableSmallMessages));
+ properties.addProperty(SERIALIZATION_ELEMENT, serialization);
+
+ if (properties.size() > 0)
+ channelConfig.addProperty(PROPERTIES_ELEMENT, properties);
+
+ return channelConfig;
+ }
+
+ /**
+ *
+ * Make sure this matches with ChannelSettings.getParsedUri.
+ */
+ public String getParsedUrl(String contextPath)
+ {
+ parseUrl(contextPath);
+ return parsedUrl;
+ }
+
+ /**
+ *
+ */
+ public void handleClientMessagingVersion(Number version)
+ {
+ if (version != null)
+ {
+ boolean clientSupportsSmallMessages = version.doubleValue() >= messagingVersion;
+ if (clientSupportsSmallMessages && getSerializationContext().enableSmallMessages)
+ {
+ FlexSession session = FlexContext.getFlexSession();
+ if (session != null)
+ session.setUseSmallMessages(true);
+ }
+ }
+ }
+
+ /**
+ * Default implementation of the Endpoint <code>service</code> method.
+ * Subclasses should call <code>super.service</code> before their custom
+ * code.
+ *
+ * @param req The HttpServletRequest object.
+ * @param res The HttpServletResponse object.
+ */
+ public void service(HttpServletRequest req, HttpServletResponse res)
+ {
+ validateRequestProtocol(req);
+ }
+
+ /**
+ * Typically invoked by subclasses, this method transforms decoded message data
+ * into the appropriate Message object and routes the Message to the endpoint's broker.
+ *
+ * @param message The decoded message data.
+ * @return Message The transformed message.
+ */
+ public Message serviceMessage(Message message)
+ {
+ if (isManaged())
+ {
+ ((EndpointControl) getControl()).incrementServiceMessageCount();
+ }
+
+ try
+ {
+ FlexContext.setThreadLocalEndpoint(this);
+ Message ack = null;
+
+ // Make sure this message is timestamped.
+ if (message.getTimestamp() == 0)
+ {
+ message.setTimestamp(System.currentTimeMillis());
+ }
+
+ // Reset the endpoint header for inbound messages to the id for this endpoint
+ // to guarantee that it's correct. Don't allow clients to spoof this.
+ // However, if the endpoint id is passed as null we need to tag the message to
+ // skip channel/endpoint validation at the destination level (MessageBroker.inspectChannel()).
+ if (message.getHeader(Message.ENDPOINT_HEADER) != null)
+ message.setHeader(Message.VALIDATE_ENDPOINT_HEADER, Boolean.TRUE);
+ message.setHeader(Message.ENDPOINT_HEADER, getId());
+
+ if (message instanceof CommandMessage)
+ {
+ CommandMessage command = (CommandMessage)message;
+
+ // Apply channel endpoint level constraint; always allow login commands through.
+ int operation = command.getOperation();
+ if (operation != CommandMessage.LOGIN_OPERATION)
+ checkSecurityConstraint(message);
+
+ // Handle general (not Consumer specific) poll requests here.
+ // We need to fetch all outbound messages for client subscriptions over this endpoint.
+ // We identify these general poll messages by their operation and a null clientId.
+ if (operation == CommandMessage.POLL_OPERATION && message.getClientId() == null)
+ {
+ verifyFlexClientSupport(command);
+
+
+ FlexClient flexClient = FlexContext.getFlexClient();
+ ack = handleFlexClientPollCommand(flexClient, command);
+ }
+ else if (operation == CommandMessage.DISCONNECT_OPERATION)
+ {
+ ack = handleChannelDisconnect(command);
+ }
+ else if (operation == CommandMessage.TRIGGER_CONNECT_OPERATION)
+ {
+ ack = new AcknowledgeMessage();
+ ((AcknowledgeMessage)ack).setCorrelationId(message.getMessageId());
+
+ boolean needsConfig = false;
+ if (command.getHeader(CommandMessage.NEEDS_CONFIG_HEADER) != null)
+ needsConfig = ((Boolean)(command.getHeader(CommandMessage.NEEDS_CONFIG_HEADER)));
+
+ // Send configuration information only if the client requested.
+ if (needsConfig)
+ {
+ ConfigMap serverConfig = getMessageBroker().describeServices(this);
+ if (serverConfig.size() > 0)
+ ack.setBody(serverConfig);
+ }
+ }
+ else
+ {
+ // Block a subset of commands for legacy clients that need to be recompiled to
+ // interop with a 2.5+ server.
+ if (operation == CommandMessage.SUBSCRIBE_OPERATION || operation == CommandMessage.POLL_OPERATION)
+ verifyFlexClientSupport(command);
+
+ ack = getMessageBroker().routeCommandToService((CommandMessage) message, this);
+
+ // Look for client advertised features on initial connect.
+ if (operation == CommandMessage.CLIENT_PING_OPERATION || operation == CommandMessage.LOGIN_OPERATION)
+ {
+ Number clientVersion = (Number)command.getHeader(CommandMessage.MESSAGING_VERSION);
+ handleClientMessagingVersion(clientVersion);
+
+ // Also respond by advertising the messaging version on the
+ // acknowledgement.
+ ack.setHeader(CommandMessage.MESSAGING_VERSION, new Double(messagingVersion));
+ }
+ }
+ }
+ else
+ {
+ // Block any AsyncMessages from a legacy client.
+ if (message instanceof AsyncMessage)
+ verifyFlexClientSupport(message);
+
+ // Apply channel endpoint level constraint.
+ checkSecurityConstraint(message);
+
+ ack = getMessageBroker().routeMessageToService(message, this);
+ }
+
+ return ack;
+ }
+ finally
+ {
+ FlexContext.setThreadLocalEndpoint(null);
+ }
+ }
+
+ /**
+ * Utility method that endpoint implementations (or associated classes)
+ * should invoke when they receive an incoming message from a client but before
+ * servicing it. This method looks up or creates the proper FlexClient instance
+ * based upon the client the message came from and places it in the FlexContext.
+ *
+ * @param message The incoming message to process.
+ *
+ * @return The FlexClient, or <code>null</code> if the message did not contain a FlexClient ID value.
+ */
+ public FlexClient setupFlexClient(Message message)
+ {
+ FlexClient flexClient = null;
+ if (message.getHeaders().containsKey(Message.FLEX_CLIENT_ID_HEADER))
+ {
+ String id = (String)message.getHeaders().get(Message.FLEX_CLIENT_ID_HEADER);
+ // If the id is null, reset to the special token value that let's us differentiate
+ // between legacy clients and 2.5+ clients.
+ if (id == null)
+ id = FlexClient.NULL_FLEXCLIENT_ID;
+ flexClient = setupFlexClient(id);
+ }
+ return flexClient;
+ }
+
+ /**
+ * Utility method that endpoint implementations (or associated classes)
+ * should invoke when they receive an incoming message from a client but before
+ * servicing it. This method looks up or creates the proper FlexClient instance
+ * based upon the FlexClient ID value received from the client.
+ * It also associates this FlexClient instance with the current FlexSession.
+ *
+ * @param id The FlexClient ID value from the client.
+ *
+ * @return The FlexClient or null if the provided ID was <code>null</code>.
+ */
+ public FlexClient setupFlexClient(String id)
+ {
+ FlexClient flexClient = null;
+ if (id != null)
+ {
+ // This indicates that we're dealing with a non-legacy client that hasn't been
+ // assigned a FlexClient Id yet. Reset to null to generate a fresh Id.
+ if (id.equals(FlexClient.NULL_FLEXCLIENT_ID))
+ id = null;
+
+ flexClient = getMessageBroker().getFlexClientManager().getFlexClient(id);
+ // Make sure the FlexClient and FlexSession are associated.
+ FlexSession session = FlexContext.getFlexSession();
+ flexClient.registerFlexSession(session);
+ // And place the FlexClient in FlexContext for this request.
+ FlexContext.setThreadLocalFlexClient(flexClient);
+ }
+ return flexClient;
+ }
+
+ /**
+ *
+ * Performance metrics gathering property
+ */
+ public boolean isRecordMessageSizes()
+ {
+ return recordMessageSizes;
+ }
+
+ /**
+ *
+ * Performance metrics gathering property
+ */
+ public boolean isRecordMessageTimes()
+ {
+ return recordMessageTimes;
+ }
+
+ /**
+ *
+ */
+ public void setThreadLocals()
+ {
+ if (serializationContext != null)
+ {
+ SerializationContext context = (SerializationContext)serializationContext.clone();
+ // Get the latest deserialization validator from the broker.
+ MessageBroker broker = getMessageBroker();
+ DeserializationValidator validator = broker == null? null : broker.getDeserializationValidator();
+ context.setDeserializationValidator(validator);
+ SerializationContext.setSerializationContext(context);
+ }
+
+ TypeMarshallingContext.setTypeMarshaller(getTypeMarshaller());
+ }
+
+ /**
+ *
+ */
+ public void clearThreadLocals()
+ {
+ SerializationContext.clearThreadLocalObjects();
+ TypeMarshallingContext.clearThreadLocalObjects();
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Protected/private methods.
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Returns the log category of the <code>AbstractEndpoint</code>. Subclasses
+ * can override to provide a more specific logging category.
+ *
+ * @return The log category.
+ */
+ @Override
+ protected String getLogCategory()
+ {
+ return LOG_CATEGORY;
+ }
+
+ /**
+ * Hook method invoked when a disconnect command is received from a client channel.
+ * The response returned by this method is not guaranteed to get to the client, which
+ * is free to terminate its physical connection at any point.
+ *
+ * @param disconnectCommand The disconnect command.
+ * @return The response; by default an empty <tt>AcknowledgeMessage</tt>.
+ */
+ protected Message handleChannelDisconnect(CommandMessage disconnectCommand)
+ {
+ return new AcknowledgeMessage();
+ }
+
+ /**
+ * Hook method for varying poll reply strategies for synchronous endpoints.
+ * The default behavior performs a non-waited, synchronous poll for the FlexClient
+ * and if any messages are currently queued they are returned immediately. If no
+ * messages are queued an empty response is returned immediately.
+ *
+ * @param flexClient The FlexClient that issued the poll request.
+ * @param pollCommand The poll command from the client.
+ * @return The FlushResult response.
+ */
+ protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand)
+ {
+ return flexClient.poll(getId());
+ }
+
+ /**
+ * Handles a general poll request from a FlexClient to this endpoint.
+ * Subclasses may override to implement different poll handling strategies.
+ *
+ * @param flexClient The FlexClient that issued the poll request.
+ * @param pollCommand The poll command from the client.
+ * @return The poll response message; either for success or fault.
+ */
+ protected Message handleFlexClientPollCommand(FlexClient flexClient, CommandMessage pollCommand)
+ {
+ if (Log.isDebug())
+ Log.getLogger(getMessageBroker().getLogCategory(pollCommand)).debug(
+ "Before handling general client poll request. " + StringUtils.NEWLINE +
+ " incomingMessage: " + pollCommand + StringUtils.NEWLINE);
+
+ FlushResult flushResult = handleFlexClientPoll(flexClient, pollCommand);
+ Message pollResponse = null;
+
+ // Generate a no-op poll response if necessary; prevents a single client from busy polling when the server
+ // is doing wait()-based long-polls.
+ if ((flushResult instanceof PollFlushResult) && ((PollFlushResult)flushResult).isClientProcessingSuppressed())
+ {
+ pollResponse = new CommandMessage(CommandMessage.CLIENT_SYNC_OPERATION);
+ pollResponse.setHeader(CommandMessage.NO_OP_POLL_HEADER, Boolean.TRUE);
+ }
+
+ if (pollResponse == null)
+ {
+ List<Message> messagesToReturn = (flushResult != null) ? flushResult.getMessages() : null;
+ if (messagesToReturn != null && !messagesToReturn.isEmpty())
+ {
+ pollResponse = new CommandMessage(CommandMessage.CLIENT_SYNC_OPERATION);
+ pollResponse.setBody(messagesToReturn.toArray());
+ }
+ else
+ {
+ pollResponse = new AcknowledgeMessage();
+ }
+ }
+
+ // Set the adaptive poll wait time if necessary.
+ if (flushResult != null)
+ {
+ int nextFlushWaitTime = flushResult.getNextFlushWaitTimeMillis();
+ if (nextFlushWaitTime > 0)
+ pollResponse.setHeader(CommandMessage.POLL_WAIT_HEADER, new Integer(nextFlushWaitTime));
+ }
+
+ if (Log.isDebug())
+ {
+ String debugPollResult = Log.getPrettyPrinter().prettify(pollResponse);
+ Log.getLogger(getMessageBroker().getLogCategory(pollCommand)).debug(
+ "After handling general client poll request. " + StringUtils.NEWLINE +
+ " reply: " + debugPollResult + StringUtils.NEWLINE);
+ }
+
+ return pollResponse;
+ }
+
+ /**
+ * Initializes the <code>Endpoint</code> with the client-load-balancing urls.
+ *
+ * @param id Id of the <code>Endpoint</code>.
+ * @param properties Properties for the <code>Endpoint</code>.
+ */
+
+ protected void initializeClientLoadBalancing(String id, ConfigMap properties)
+ {
+ if (!properties.containsKey(CLIENT_LOAD_BALANCING_ELEMENT))
+ return;
+
+ ConfigMap clientLoadBalancing = properties.getPropertyAsMap(CLIENT_LOAD_BALANCING_ELEMENT, null);
+ if (clientLoadBalancing == null)
+ {
+ // Invalid {0} configuration for endpoint ''{1}''; no urls defined.
+ ConfigurationException ce = new ConfigurationException();
+ ce.setMessage(ERR_MSG_EMPTY_CLIENT_LOAD_BALANCING_ELEMENT, new Object[]{CLIENT_LOAD_BALANCING_ELEMENT, getId()});
+ throw ce;
+ }
+
+ @SuppressWarnings("unchecked")
+ List<String> urls = clientLoadBalancing.getPropertyAsList(URL_ATTR, null);
+ if (urls == null || urls.isEmpty())
+ {
+ // Invalid {0} configuration for endpoint ''{1}''; no urls defined.
+ ConfigurationException ce = new ConfigurationException();
+ ce.setMessage(ERR_MSG_EMPTY_CLIENT_LOAD_BALANCING_ELEMENT, new Object[]{CLIENT_LOAD_BALANCING_ELEMENT, getId()});
+ throw ce;
+ }
+
+ for (Iterator<String> iterator = urls.iterator(); iterator.hasNext();)
+ {
+ String url = iterator.next();
+ if (!addClientLoadBalancingUrl(url) && Log.isWarn())
+ log.warn("Endpoint '{0}' is ignoring the url '{1}' as it's already in the set of client-load-balancing urls.", new Object[]{id, url});
+ }
+ }
+
+ protected void checkSecurityConstraint(Message message)
+ {
+ if (securityConstraint != null)
+ {
+ getMessageBroker().getLoginManager().checkConstraint(securityConstraint);
+ }
+ }
+
+ /**
+ * Returns the deserializer class name used by the endpoint.
+ *
+ * @return The deserializer class name used by the endpoint.
+ */
+ protected abstract String getDeserializerClassName();
+
+ /**
+ * Returns the serializer class name used by the endpoint.
+ *
+ * @return The serializer class name used by the endpoint.
+ */
+ protected abstract String getSerializerClassName();
+
+ /**
+ * Returns the secure protocol scheme for the endpoint.
+ *
+ * @return The secure protocol scheme for the endpoint.
+ */
+ protected abstract String getSecureProtocolScheme();
+
+ /**
+ * Returns the insecure protocol scheme for the endpoint.
+ *
+ * @return The insecure protocol scheme for the endpoint.
+ */
+ protected abstract String getInsecureProtocolScheme();
+
+ /**
+ * Invoked automatically to allow the <code>AbstractEndpoint</code> to setup
+ * its corresponding MBean control. Subclasses should override to setup and
+ * register their MBean control. Manageable subclasses should override this
+ * template method.
+ *
+ * @param broker The <code>MessageBroker</code> that manages this
+ * <code>AbstractEndpoint</code>.
+ */
+ protected abstract void setupEndpointControl(MessageBroker broker);
+
+ /**
+ * Validates the endpoint url scheme.
+ */
+ protected void validateEndpointProtocol()
+ {
+ String scheme = isSecure()? getSecureProtocolScheme() : getInsecureProtocolScheme();
+ if (!url.startsWith(scheme))
+ {
+ ConfigurationException ce = new ConfigurationException();
+ ce.setMessage(ERR_MSG_INVALID_URL_SCHEME, new Object[] {url, scheme});
+ throw ce;
+ }
+ }
+
+ protected void validateRequestProtocol(HttpServletRequest req)
+ {
+ // Secure url can talk to secure or non-secure endpoint.
+ // Non-secure url can only talk to non-secure endpoint.
+ boolean secure = req.isSecure();
+ if (!secure && isSecure())
+ {
+ // Secure endpoints must be contacted via a secure protocol.
+ String endpointPath = req.getServletPath() + req.getPathInfo();
+ SecurityException se = new SecurityException();
+ se.setMessage(NONSECURE_PROTOCOL, new Object[]{endpointPath});
+ throw se;
+ }
+ }
+
+ /**
+ *
+ * Verifies that the remote client supports the FlexClient API.
+ * Legacy clients that do not support this receive a message fault for any messages they send.
+ *
+ * @param message The message to verify.
+ */
+ protected void verifyFlexClientSupport(Message message)
+ {
+ if (FlexContext.getFlexClient() == null)
+ {
+ MessageException me = new MessageException();
+ me.setMessage(REQUIRES_FLEXCLIENT_SUPPORT, new Object[] {message.getDestination()});
+ throw me;
+ }
+ }
+
+ /**
+ *
+ */
+ protected Class<?> createClass(String className)
+ {
+ return ClassUtil.createClass(className, FlexContext.getMessageBroker() == null ? null :
+ FlexContext.getMessageBroker().getClassLoader());
+ }
+
+ // This should match with ChannelSetting.parseClientUri
+ private void parseClientUrl(String contextPath)
+ {
+ if (!clientContextParsed)
+ {
+ String channelEndpoint = url.trim();
+
+ // either {context-root} or {context.root} is legal
+ channelEndpoint = StringUtils.substitute(channelEndpoint, "{context-root}", ConfigurationConstants.CONTEXT_PATH_TOKEN);
+
+ if ((contextPath == null) && (channelEndpoint.indexOf(ConfigurationConstants.CONTEXT_PATH_TOKEN) != -1))
+ {
+ // context root must be specified before it is used
+ ConfigurationException e = new ConfigurationException();
+ e.setMessage(ConfigurationConstants.UNDEFINED_CONTEXT_ROOT, new Object[]{getId()});
+ throw e;
+ }
+
+ // simplify the number of combinations to test by ensuring our
+ // context path always starts with a slash
+ if (contextPath != null && !contextPath.startsWith("/"))
+ {
+ contextPath = "/" + contextPath;
+ }
+
+ // avoid double-slashes from context root by replacing /{context.root}
+ // in a single replacement step
+ if (channelEndpoint.indexOf(ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN) != -1)
+ {
+ // but avoid double-slash for /{context.root}/etc when we have
+ // the default context root
+ if ("/".equals(contextPath) && !ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN.equals(channelEndpoint))
+ contextPath = "";
+
+ channelEndpoint = StringUtils.substitute(channelEndpoint, ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN, contextPath);
+ }
+ // otherwise we have something like {server.name}:{server.port}{context.root}...
+ else
+ {
+ // but avoid double-slash for {context.root}/etc when we have
+ // the default context root
+ if ("/".equals(contextPath) && !ConfigurationConstants.CONTEXT_PATH_TOKEN.equals(channelEndpoint))
+ contextPath = "";
+
+ channelEndpoint = StringUtils.substitute(channelEndpoint, ConfigurationConstants.CONTEXT_PATH_TOKEN, contextPath);
+ }
+
+ parsedClientUrl = channelEndpoint;
+ clientContextParsed = true;
+ }
+ }
+
+ private int internalParsePort(String url)
+ {
+ int port = ChannelSettings.parsePort(url);
+ // If there is no specified port, log an info message as urls without ports are supported
+ if (port == 0 && Log.isInfo())
+ log.info("No port specified in channel URL: {0}", new Object[]{url});
+
+ return port == -1? 0 : port; // Replace -1 with 0.
+ }
+
+ private void parseUrl(String contextPath)
+ {
+ // Parse again only if never parsed before or parsed for a different contextPath.
+ if (parsedForContext == null || !parsedForContext.equals(contextPath))
+ {
+ String channelEndpoint = url.toLowerCase().trim();
+
+ // Remove protocol and host info
+ String insecureProtocol = getInsecureProtocolScheme() + "://";
+ String secureProtocol = getSecureProtocolScheme() + "://";
+ if (channelEndpoint.startsWith(secureProtocol) || channelEndpoint.startsWith(insecureProtocol))
+ {
+ int nextSlash = channelEndpoint.indexOf('/', 8);
+ if (nextSlash > 0)
+ {
+ channelEndpoint = channelEndpoint.substring(nextSlash);
+ }
+ }
+
+ // either {context-root} or {context.root} is legal
+ channelEndpoint = StringUtils.substitute(channelEndpoint, "{context-root}", ConfigurationConstants.CONTEXT_PATH_TOKEN);
+
+ // Remove context path info
+ if (channelEndpoint.startsWith(ConfigurationConstants.CONTEXT_PATH_TOKEN))
+ {
+ channelEndpoint = channelEndpoint.substring(ConfigurationConstants.CONTEXT_PATH_TOKEN.length());
+ }
+ else if (channelEndpoint.startsWith(ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN))
+ {
+ channelEndpoint = channelEndpoint.substring(ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN.length());
+ }
+ else if (contextPath.length() > 0)
+ {
+ if (channelEndpoint.startsWith(contextPath.toLowerCase()))
+ {
+ channelEndpoint = channelEndpoint.substring(contextPath.length());
+ }
+ }
+
+ // We also don't match on trailing slashes
+ if (channelEndpoint.endsWith("/"))
+ {
+ channelEndpoint = channelEndpoint.substring(0, channelEndpoint.length() - 1);
+ }
+
+ parsedUrl = channelEndpoint;
+ parsedForContext = contextPath;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/endpoints/BaseHTTPEndpoint.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/endpoints/BaseHTTPEndpoint.java b/core/src/flex/messaging/endpoints/BaseHTTPEndpoint.java
new file mode 100644
index 0000000..bb31b39
--- /dev/null
+++ b/core/src/flex/messaging/endpoints/BaseHTTPEndpoint.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.endpoints;
+
+import flex.management.runtime.messaging.endpoints.EndpointControl;
+import flex.messaging.FlexContext;
+import flex.messaging.FlexSession;
+import flex.messaging.HttpFlexSession;
+import flex.messaging.MessageClient;
+import flex.messaging.client.FlexClient;
+import flex.messaging.config.ConfigMap;
+import flex.messaging.config.ConfigurationConstants;
+import flex.messaging.endpoints.amf.AMFFilter;
+import flex.messaging.io.MessageIOConstants;
+import flex.messaging.io.amf.ActionContext;
+import flex.messaging.log.HTTPRequestLog;
+import flex.messaging.messages.CommandMessage;
+import flex.messaging.messages.Message;
+import flex.messaging.util.SettingsReplaceUtil;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract base class for all the HTTP-based endpoints.
+ */
+public abstract class BaseHTTPEndpoint extends AbstractEndpoint
+{
+ //--------------------------------------------------------------------------
+ //
+ // Public Static Constants
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * The secure and insecure URL schemes for the HTTP endpoint.
+ */
+ public static final String HTTP_PROTOCOL_SCHEME = "http";
+ public static final String HTTPS_PROTOCOL_SCHEME = "https";
+
+ //--------------------------------------------------------------------------
+ //
+ // Private Static Constants
+ //
+ //--------------------------------------------------------------------------
+
+ private static final String ADD_NO_CACHE_HEADERS = "add-no-cache-headers";
+ private static final String REDIRECT_URL = "redirect-url";
+ private static final String INVALIDATE_SESSION_ON_DISCONNECT = "invalidate-session-on-disconnect";
+ private static final String HTTP_RESPONSE_HEADERS = "http-response-headers";
+ private static final String HEADER_ATTR = "header";
+
+ private static final String HEADER_NAME_ORIGIN = "Origin";
+ private static final String ACCESS_CONTROL = "Access-Control-";
+ private static final String SESSION_REWRITING_ENABLED = "session-rewriting-enabled";
+
+ private static final int ERR_MSG_DUPLICATE_SESSIONS_DETECTED = 10035;
+ private static final String REQUEST_ATTR_DUPLICATE_SESSION_FLAG = "flex.messaging.request.DuplicateSessionDetected";
+
+ //--------------------------------------------------------------------------
+ //
+ // Constructor
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Constructs an unmanaged <code>BaseHTTPEndpoint</code>.
+ */
+ public BaseHTTPEndpoint()
+ {
+ this(false);
+ }
+
+ /**
+ * Constructs a <code>BaseHTTPEndpoint</code> with the specified management setting.
+ *
+ * @param enableManagement <code>true</code> if the <code>BaseHTTPEndpoint</code>
+ * is manageable; otherwise <code>false</code>.
+ */
+ public BaseHTTPEndpoint(boolean enableManagement)
+ {
+ super(enableManagement);
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Initialize, validate, start, and stop methods.
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Initializes the <code>Endpoint</code> with the properties.
+ * If subclasses override this method, they must call <code>super.initialize()</code>.
+ *
+ * @param id The ID of the <code>Endpoint</code>.
+ * @param properties Properties for the <code>Endpoint</code>.
+ */
+ @Override public void initialize(String id, ConfigMap properties)
+ {
+ super.initialize(id, properties);
+
+ if (properties == null || properties.size() == 0)
+ return;
+
+ // General HTTP props.
+ addNoCacheHeaders = properties.getPropertyAsBoolean(ADD_NO_CACHE_HEADERS, true);
+ redirectURL = properties.getPropertyAsString(REDIRECT_URL, null);
+ invalidateSessionOnDisconnect = properties.getPropertyAsBoolean(INVALIDATE_SESSION_ON_DISCONNECT, false);
+ loginAfterDisconnect = properties.getPropertyAsBoolean(ConfigurationConstants.LOGIN_AFTER_DISCONNECT_ELEMENT, false);
+ sessionRewritingEnabled = properties.getPropertyAsBoolean(SESSION_REWRITING_ENABLED, true);
+ initializeHttpResponseHeaders(properties);
+ validateEndpointProtocol();
+ }
+
+ /**
+ * Starts the <code>Endpoint</code> by creating a filter chain and setting
+ * up serializers and deserializers.
+ */
+ @Override public void start()
+ {
+ if (isStarted())
+ return;
+
+ super.start();
+
+ filterChain = createFilterChain();
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Variables
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Controller used to manage this endpoint.
+ */
+ protected EndpointControl controller;
+
+ /**
+ * AMF processing filter chain used by this endpoint.
+ */
+ protected AMFFilter filterChain;
+
+ /**
+ * Headers to add to the HTTP response.
+ */
+ protected List<HttpHeader> httpResponseHeaders;
+
+ //--------------------------------------------------------------------------
+ //
+ // Properties
+ //
+ //--------------------------------------------------------------------------
+
+ //----------------------------------
+ // addNoCacheHeaders
+ //----------------------------------
+
+ protected boolean addNoCacheHeaders = true;
+
+ /**
+ * Retrieves the <code>add-no-cache-headers</code> property.
+ *
+ * @return <code>true</code> if <code>add-no-cache-headers</code> is enabled;
+ * <code>false</code> otherwise.
+ */
+ public boolean isAddNoCacheHeaders()
+ {
+ return addNoCacheHeaders;
+ }
+
+ /**
+ * Sets the <code>add-no-cache-headers</code> property.
+ *
+ * @param addNoCacheHeaders The <code>add-no-cache-headers</code> property.
+ */
+ public void setAddNoCacheHeaders(boolean addNoCacheHeaders)
+ {
+ this.addNoCacheHeaders = addNoCacheHeaders;
+ }
+
+ //----------------------------------
+ // loginAfterDisconnect
+ //----------------------------------
+
+ /**
+ *
+ * This is a property used on the client.
+ */
+ protected boolean loginAfterDisconnect;
+
+ //----------------------------------
+ // invalidateSessionOnDisconnect
+ //----------------------------------
+
+ protected boolean invalidateSessionOnDisconnect;
+
+ /**
+ * Indicates whether the server session will be invalidated
+ * when a client channel disconnects.
+ * The default is <code>false</code>.
+ *
+ * @return <code>true</code> if the server session will be invalidated
+ * when a client channel disconnects, <code>false</code> otherwise.
+ */
+ public boolean isInvalidateSessionOnDisconnect()
+ {
+ return invalidateSessionOnDisconnect;
+ }
+
+ /**
+ * Determines whether to invalidate the server session for a client
+ * that disconnects its channel.
+ * The default is <code>false</code>.
+ *
+ * @param value <code>true</code> to invalidate the server session for a client
+ * that disconnects its channel, <code>false</code> otherwise.
+ */
+ public void setInvalidateSessionOnDisconnect(boolean value)
+ {
+ invalidateSessionOnDisconnect = value;
+ }
+
+ //----------------------------------
+ // redirectURL
+ //----------------------------------
+
+ protected String redirectURL;
+
+ /**
+ * Retrieves the <code>redirect-url</code> property.
+ *
+ * @return The <code>redirect-url</code> property.
+ */
+ public String getRedirectURL()
+ {
+ return redirectURL;
+ }
+
+ /**
+ * Sets the <code>redirect-url</code> property.
+ *
+ * @param redirectURL The <code>redirect-url</code> property.
+ */
+ public void setRedirectURL(String redirectURL)
+ {
+ this.redirectURL = redirectURL;
+ }
+
+ //----------------------------------
+ // sessionRewritingEnabled
+ //----------------------------------
+
+ protected boolean sessionRewritingEnabled = true;
+
+ /**
+ * Indicates whether the server will fall back on rewriting URLs to include
+ * session identifiers in the URL when HTTP session cookies are not allowed
+ * on the client. The default is <code>true</code>.
+ *
+ * @return <code>true</code> if the session rewriting is enabled.
+ */
+ public boolean isSessionRewritingEnabled()
+ {
+ return sessionRewritingEnabled;
+ }
+
+ /**
+ * Sets whether the session rewriting is enabled.
+ *
+ * @param value The session writing enabled value.
+ */
+ public void setSessionRewritingEnabled(boolean value)
+ {
+ sessionRewritingEnabled = value;
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Public Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Handle AMF/AMFX encoded messages sent over HTTP.
+ *
+ * @param req The original servlet request.
+ * @param res The active servlet response.
+ */
+ @Override
+ public void service(HttpServletRequest req, HttpServletResponse res)
+ {
+ super.service(req, res);
+
+ try
+ {
+ // Setup serialization and type marshalling contexts
+ setThreadLocals();
+
+ // Create a context for this request
+ ActionContext context = new ActionContext();
+
+ // Pass endpoint's mpi settings to the context so that it knows what level of
+ // performance metrics should be gathered during serialization/deserialization
+ context.setRecordMessageSizes(isRecordMessageSizes());
+ context.setRecordMessageTimes(isRecordMessageTimes());
+
+ // Send invocation through filter chain, which ends at the MessageBroker
+ filterChain.invoke(context);
+
+ // After serialization completes, increment endpoint byte counters,
+ // if the endpoint is managed
+ if (isManaged())
+ {
+ controller.addToBytesDeserialized(context.getDeserializedBytes());
+ controller.addToBytesSerialized(context.getSerializedBytes());
+ }
+
+ if (context.getStatus() != MessageIOConstants.STATUS_NOTAMF)
+ {
+ if (addNoCacheHeaders)
+ addNoCacheHeaders(req, res);
+
+ addHeadersToResponse(req, res);
+
+ ByteArrayOutputStream outBuffer = context.getResponseOutput();
+
+ res.setContentType(getResponseContentType());
+
+ res.setContentLength(outBuffer.size());
+ outBuffer.writeTo(res.getOutputStream());
+ res.flushBuffer();
+ }
+ else
+ {
+ // Not an AMF request, probably viewed in a browser
+ if (redirectURL != null)
+ {
+ try
+ {
+ //Check for redirect URL context-root token
+ redirectURL = SettingsReplaceUtil.replaceContextPath(redirectURL, req.getContextPath());
+ res.sendRedirect(redirectURL);
+ }
+ catch (IllegalStateException alreadyFlushed)
+ {
+ // ignore
+ }
+ }
+ }
+ }
+ catch (IOException ioe)
+ {
+ // This happens when client closes the connection, log it at info level
+ log.info(ioe.getMessage());
+ // Store exception information for latter logging
+ req.setAttribute(HTTPRequestLog.HTTP_ERROR_INFO, ioe.toString());
+ }
+ catch (Throwable t)
+ {
+ log.error(t.getMessage(), t);
+ // Store exception information for latter logging
+ req.setAttribute(HTTPRequestLog.HTTP_ERROR_INFO, t.toString());
+ }
+ finally
+ {
+ clearThreadLocals();
+ }
+ }
+
+
+ /**
+ *
+ * Returns a <code>ConfigMap</code> of endpoint properties that the client
+ * needs. This includes properties from <code>super.describeEndpoint</code>
+ * and additional <code>BaseHTTPEndpoint</code> specific properties under
+ * "properties" key.
+ */
+ @Override
+ public ConfigMap describeEndpoint()
+ {
+ ConfigMap endpointConfig = super.describeEndpoint();
+
+ if (loginAfterDisconnect)
+ {
+ ConfigMap loginAfterDisconnect = new ConfigMap();
+ // Adding as a value rather than attribute to the parent
+ loginAfterDisconnect.addProperty(EMPTY_STRING, TRUE_STRING);
+
+ ConfigMap properties = endpointConfig.getPropertyAsMap(PROPERTIES_ELEMENT, null);
+ if (properties == null)
+ {
+ properties = new ConfigMap();
+ endpointConfig.addProperty(PROPERTIES_ELEMENT, properties);
+ }
+ properties.addProperty(ConfigurationConstants.LOGIN_AFTER_DISCONNECT_ELEMENT, loginAfterDisconnect);
+ }
+
+ return endpointConfig;
+ }
+
+ /**
+ * Overrides to guard against duplicate HTTP-based sessions for the same FlexClient
+ * which will occur if the remote host has disabled session cookies.
+ *
+ * @see AbstractEndpoint#setupFlexClient(String)
+ */
+ @Override
+ public FlexClient setupFlexClient(String id)
+ {
+ FlexClient flexClient = super.setupFlexClient(id);
+
+ // Scan for duplicate HTTP-sessions and if found, invalidate them and throw a MessageException.
+ // A request attribute is used to deal with batched AMF messages that arrive in a single request by trigger multiple passes through this method.
+ boolean duplicateSessionDetected = (FlexContext.getHttpRequest().getAttribute(REQUEST_ATTR_DUPLICATE_SESSION_FLAG) != null);
+
+ List<FlexSession> sessions = null;
+ if (!duplicateSessionDetected)
+ {
+ sessions = flexClient.getFlexSessions();
+ int n = sessions.size();
+ if (n > 1)
+ {
+ List<HttpFlexSession> httpFlexSessions = new ArrayList<HttpFlexSession>();
+ for (int i = 0; i < n; i++)
+ {
+ FlexSession currentSession = sessions.get(i);
+ if (currentSession instanceof HttpFlexSession)
+ httpFlexSessions.add((HttpFlexSession)currentSession);
+ if (httpFlexSessions.size() > 1)
+ {
+ FlexContext.getHttpRequest().setAttribute(REQUEST_ATTR_DUPLICATE_SESSION_FLAG, httpFlexSessions);
+ duplicateSessionDetected = true;
+ break;
+ }
+ }
+ }
+ }
+
+ // If more than one was found, remote host isn't using session cookies. Kill all duplicate sessions and return an error.
+ // Simplest to just re-scan the list given that it will be very short, but use an iterator for concurrent modification.
+ if (duplicateSessionDetected)
+ {
+ Object attributeValue = FlexContext.getHttpRequest().getAttribute(REQUEST_ATTR_DUPLICATE_SESSION_FLAG);
+ String newSessionId = null;
+ String oldSessionId = null;
+ if (attributeValue != null)
+ {
+ @SuppressWarnings("unchecked")
+ List<HttpFlexSession> httpFlexSessions = (List<HttpFlexSession>)attributeValue;
+ oldSessionId = httpFlexSessions.get(0).getId();
+ newSessionId = httpFlexSessions.get(1).getId();
+ }
+
+ if (sessions != null)
+ {
+ for (FlexSession session : sessions)
+ {
+ if (session instanceof HttpFlexSession)
+ {
+ session.invalidate();
+ }
+ }
+ }
+
+ // Return an error to the client.
+
+ DuplicateSessionException e = new DuplicateSessionException();
+ // Duplicate HTTP-based FlexSession error: A request for FlexClient ''{0}'' arrived over a new FlexSession ''{1}'', but FlexClient is already associated with FlexSession ''{2}'', therefore it cannot be associated with the new session.
+ e.setMessage(ERR_MSG_DUPLICATE_SESSIONS_DETECTED, new Object[]{flexClient.getId(), newSessionId, oldSessionId});
+ throw e;
+ }
+
+ return flexClient;
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Protected Methods
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Adds custom headers specified in the config to the HTTP response. The only
+ * exception is that access control headers (Access-Control-*) are sent only
+ * if there is an Origin header in the request.
+ *
+ * @param request The HTTP request.
+ * @param response The HTTP response.
+ */
+ protected void addHeadersToResponse(HttpServletRequest request, HttpServletResponse response)
+ {
+ if (httpResponseHeaders == null || httpResponseHeaders.isEmpty())
+ return;
+
+ String origin = request.getHeader(HEADER_NAME_ORIGIN);
+ boolean originHeaderExists = origin != null && origin.length() != 0;
+
+ for (HttpHeader header : httpResponseHeaders)
+ {
+ if (header.name.startsWith(ACCESS_CONTROL) && !originHeaderExists)
+ continue;
+
+ response.addHeader(header.name, header.value);
+ }
+ }
+
+ /**
+ * Create the gateway filters that transform action requests
+ * and responses.
+ */
+ protected abstract AMFFilter createFilterChain();
+
+ /**
+ * Returns the content type used by the connection handler to set on the
+ * HTTP response. Subclasses should either return MessageIOConstants.AMF_CONTENT_TYPE
+ * or MessageIOConstants.XML_CONTENT_TYPE.
+ */
+ protected abstract String getResponseContentType();
+
+ /**
+ * Returns https which is the secure protocol scheme for the endpoint.
+ *
+ * @return https.
+ */
+ @Override protected String getSecureProtocolScheme()
+ {
+ return HTTPS_PROTOCOL_SCHEME;
+ }
+
+ /**
+ * Returns http which is the insecure protocol scheme for the endpoint.
+ *
+ * @return http.
+ */
+ @Override protected String getInsecureProtocolScheme()
+ {
+ return HTTP_PROTOCOL_SCHEME;
+ }
+
+ /**
+ * @see flex.messaging.endpoints.AbstractEndpoint#handleChannelDisconnect(CommandMessage)
+ */
+ @Override protected Message handleChannelDisconnect(CommandMessage disconnectCommand)
+ {
+ HttpFlexSession session = (HttpFlexSession)FlexContext.getFlexSession();
+ FlexClient flexClient = FlexContext.getFlexClient();
+
+ // Shut down any subscriptions established over this channel/endpoint
+ // for this specific FlexClient.
+ if (flexClient.isValid())
+ {
+ String endpointId = getId();
+ List<MessageClient> messageClients = flexClient.getMessageClients();
+ for (MessageClient messageClient : messageClients)
+ {
+ if (messageClient.getEndpointId().equals(endpointId))
+ {
+ messageClient.setClientChannelDisconnected(true);
+ messageClient.invalidate();
+ }
+ }
+ }
+
+ // And optionally invalidate the session.
+ if (session.isValid() && isInvalidateSessionOnDisconnect())
+ session.invalidate(false /* don't recreate */);
+
+ return super.handleChannelDisconnect(disconnectCommand);
+ }
+
+ protected void initializeHttpResponseHeaders(ConfigMap properties)
+ {
+ if (!properties.containsKey(HTTP_RESPONSE_HEADERS))
+ return;
+
+ ConfigMap httpResponseHeaders = properties.getPropertyAsMap(HTTP_RESPONSE_HEADERS, null);
+ if (httpResponseHeaders == null)
+ return;
+
+ @SuppressWarnings("unchecked")
+ List<String> headers = httpResponseHeaders.getPropertyAsList(HEADER_ATTR, null);
+ if (headers == null || headers.isEmpty())
+ return;
+
+ if (this.httpResponseHeaders == null)
+ this.httpResponseHeaders = new ArrayList<HttpHeader>();
+
+ for (String header : headers)
+ {
+ int colonIndex = header.indexOf(":");
+ String name = header.substring(0, colonIndex).trim();
+ String value = header.substring(colonIndex + 1).trim();
+ this.httpResponseHeaders.add(new HttpHeader(name, value));
+ }
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // Nested Classes
+ //
+ //--------------------------------------------------------------------------
+
+ /**
+ * Helper class used for headers in the HTTP request/response.
+ */
+ static class HttpHeader
+ {
+ public HttpHeader(String name, String value)
+ {
+ this.name = name;
+ this.value = value;
+ }
+ public final String name;
+ public final String value;
+ }
+}
\ No newline at end of file