You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flex.apache.org by cd...@apache.org on 2015/12/20 14:13:42 UTC

[02/51] [partial] flex-blazeds git commit: Removed legacy directories and made the content of the modules directory the new root - Please use the Maven build for now, as the Ant build will no longer work until it is adjusted to the new directory structure

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/endpoints/AbstractEndpoint.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/endpoints/AbstractEndpoint.java b/core/src/flex/messaging/endpoints/AbstractEndpoint.java
new file mode 100644
index 0000000..c265ea9
--- /dev/null
+++ b/core/src/flex/messaging/endpoints/AbstractEndpoint.java
@@ -0,0 +1,1513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.endpoints;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import flex.management.ManageableComponent;
+import flex.management.runtime.messaging.MessageBrokerControl;
+import flex.management.runtime.messaging.endpoints.EndpointControl;
+import flex.messaging.FlexContext;
+import flex.messaging.FlexSession;
+import flex.messaging.MessageBroker;
+import flex.messaging.MessageException;
+import flex.messaging.Server;
+import flex.messaging.client.FlexClient;
+import flex.messaging.client.FlexClientOutboundQueueProcessor;
+import flex.messaging.client.FlushResult;
+import flex.messaging.client.PollFlushResult;
+import flex.messaging.client.UserAgentSettings;
+import flex.messaging.config.ChannelSettings;
+import flex.messaging.config.ConfigMap;
+import flex.messaging.config.ConfigurationConstants;
+import flex.messaging.config.ConfigurationException;
+import flex.messaging.config.SecurityConstraint;
+import flex.messaging.io.ClassAliasRegistry;
+import flex.messaging.io.SerializationContext;
+import flex.messaging.io.TypeMarshaller;
+import flex.messaging.io.TypeMarshallingContext;
+import flex.messaging.io.amf.translator.ASTranslator;
+import flex.messaging.log.Log;
+import flex.messaging.log.LogCategories;
+import flex.messaging.log.Logger;
+import flex.messaging.messages.AcknowledgeMessage;
+import flex.messaging.messages.AcknowledgeMessageExt;
+import flex.messaging.messages.AsyncMessage;
+import flex.messaging.messages.AsyncMessageExt;
+import flex.messaging.messages.CommandMessage;
+import flex.messaging.messages.CommandMessageExt;
+import flex.messaging.messages.Message;
+import flex.messaging.messages.SmallMessage;
+import flex.messaging.security.SecurityException;
+import flex.messaging.util.ClassUtil;
+import flex.messaging.util.StringUtils;
+import flex.messaging.util.UserAgentManager;
+import flex.messaging.validators.DeserializationValidator;
+
+/**
+ * This is the default implementation of Endpoint, which provides a convenient
+ * base for behavior and associations common to all endpoints.
+ *
+ * Some properties that appear in the endpoint configuration are used only by the
+ * client, so they must be set on the corresponding client classes; for example, connect-timeout-seconds is set on Channel.
+ *
+ * @see flex.messaging.endpoints.Endpoint
+ */
+public abstract class AbstractEndpoint extends ManageableComponent
+        implements Endpoint2, ConfigurationConstants
+{
+    /** Log category for <code>AbstractEndpoint</code>. */
+    public static final String LOG_CATEGORY = LogCategories.ENDPOINT_GENERAL;
+
+    /**
+     * HTTP header field names used by <code>addNoCacheHeaders</code> to mark responses as non-cacheable.
+     */
+    public static final String HEADER_NAME_CACHE_CONTROL = "Cache-Control";
+    public static final String HEADER_NAME_EXPIRES = "Expires";
+    public static final String HEADER_NAME_PRAGMA = "Pragma";
+
+    // Error codes (presumably localized message numbers; verify against their usages)
+    private static final int NONSECURE_PROTOCOL = 10066;
+    private static final int REQUIRES_FLEXCLIENT_SUPPORT = 10030;
+    private static final int ERR_MSG_INVALID_URL_SCHEME = 11100;
+
+    // Names of XML configuration properties; those under <serialization> are read in initialize()
+    private static final String SERIALIZATION = "serialization";
+    private static final String CREATE_ASOBJECT_FOR_MISSING_TYPE = "create-asobject-for-missing-type";
+    private static final String CUSTOM_DESERIALIZER = "custom-deserializer";
+    private static final String CUSTOM_SERIALIZER = "custom-serializer";
+    private static final String ENABLE_SMALL_MESSAGES = "enable-small-messages";
+    private static final String TYPE_MARSHALLER = "type-marshaller";
+    private static final String RESTORE_REFERENCES = "restore-references";
+    private static final String INSTANTIATE_TYPES = "instantiate-types";
+    private static final String SUPPORT_REMOTE_CLASS = "support-remote-class";
+    private static final String LEGACY_COLLECTION = "legacy-collection";
+    private static final String LEGACY_DICTIONARY = "legacy-dictionary";
+    private static final String LEGACY_MAP = "legacy-map";
+    private static final String LEGACY_XML = "legacy-xml";
+    private static final String LEGACY_XML_NAMESPACES = "legacy-xml-namespaces";
+    private static final String LEGACY_THROWABLE = "legacy-throwable";
+    private static final String LEGACY_BIG_NUMBERS = "legacy-big-numbers";
+    private static final String LEGACY_EXTERNALIZABLE = "legacy-externalizable";
+    private static final String ALLOW_XML_DOCTYPE_DECLARATION = "allow-xml-doctype-declaration";
+    private static final String ALLOW_XML_EXTERNAL_ENTITY_EXPANSION = "allow-xml-external-entity-expansion";
+
+    private static final String LOG_PROPERTY_ERRORS = "log-property-errors";
+    private static final String IGNORE_PROPERTY_ERRORS = "ignore-property-errors";
+    private static final String INCLUDE_READ_ONLY = "include-read-only";
+    private static final String GLOBAL_INCLUDE_READ_ONLY = "global-include-read-only";
+    private static final String FLEX_CLIENT_OUTBOUND_QUEUE_PROCESSOR = "flex-client-outbound-queue-processor";
+    private static final String SHOW_STACKTRACES = "show-stacktraces";
+    private static final String MAX_OBJECT_NEST_LEVEL = "max-object-nest-level";
+    private static final String MAX_COLLECTION_NEST_LEVEL = "max-collection-nest-level";
+    private static final String PREFER_VECTORS = "prefer-vectors";
+
+    // Endpoint properties (configured through initialize() and the public setters)
+    protected Set<String> clientLoadBalancingUrls; // created lazily by addClientLoadBalancingUrl()
+    protected String clientType;
+    protected int connectTimeoutSeconds;
+    protected int requestTimeoutSeconds;
+    protected FlexClientOutboundQueueProcessor flexClientOutboundQueueProcessor;
+    protected SerializationContext serializationContext; // created in the constructor
+    protected Class<?> deserializerClass;
+    protected Class<?> serializerClass;
+    protected TypeMarshaller typeMarshaller; // getTypeMarshaller() substitutes an ASTranslator when null
+    protected int port; // recomputed from the URL by setUrl()
+    private SecurityConstraint securityConstraint;
+    protected String url;
+    protected boolean recordMessageSizes;
+    protected boolean recordMessageTimes;
+    protected boolean remote;
+    protected Server server;
+    protected boolean serverOnly;
+
+    // Endpoint internal state
+    protected String parsedUrl;
+    // Context path that parsedUrl was last parsed for; null means that parsedUrl
+    // has not been parsed yet (setUrl() resets it to null).
+    protected String parsedForContext;
+    protected boolean clientContextParsed; // reset to false by setUrl()
+    protected String parsedClientUrl;
+    protected Logger log;
+
+    protected Class<?> flexClientOutboundQueueProcessClass;
+    protected ConfigMap flexClientOutboundQueueProcessorConfig;
+
+    // Supported messaging version, exposed through getMessagingVersion()
+    protected double messagingVersion = 1.0;
+
+    //--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Constructs an unmanaged <code>AbstractEndpoint</code>; equivalent to <code>AbstractEndpoint(false)</code>.
+     */
+    public AbstractEndpoint()
+    {
+        this(false);
+    }
+
+    /**
+     * Constructs an <code>AbstractEndpoint</code> with the indicated management, obtains its
+     * logger for <code>getLogCategory()</code> and creates a new <code>SerializationContext</code>.
+     * @param enableManagement <code>true</code> if the <code>AbstractEndpoint</code>
+     * is manageable; <code>false</code> otherwise.
+     */
+    public AbstractEndpoint(boolean enableManagement)
+    {
+        super(enableManagement);
+        this.log = Log.getLogger(getLogCategory());
+        serializationContext = new SerializationContext();
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Initialize, validate, start, and stop methods.
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Initializes the <code>Endpoint</code> with the properties.
+     * If subclasses override this method, they must call <code>super.initialize()</code>.
+     * When <code>properties</code> is <code>null</code> or empty, only <code>super.initialize()</code> runs.
+     * @param id The ID of the <code>Endpoint</code>.
+     * @param properties Properties for the <code>Endpoint</code>; may be <code>null</code>.
+     */
+    @Override
+    public void initialize(String id, ConfigMap properties)
+    {
+        super.initialize(id, properties);
+
+        if (properties == null || properties.size() == 0)
+            return;
+
+        // Client-targeted <client-load-balancing>
+        initializeClientLoadBalancing(id, properties);
+
+        // Client-targeted <connect-timeout-seconds/>
+        connectTimeoutSeconds = properties.getPropertyAsInt(CONNECT_TIMEOUT_SECONDS_ELEMENT, 0);
+
+        // Client-targeted <request-timeout-seconds/>
+        requestTimeoutSeconds = properties.getPropertyAsInt(REQUEST_TIMEOUT_SECONDS_ELEMENT, 0);
+
+        // Check for a custom FlexClient outbound queue processor.
+        ConfigMap outboundQueueConfig = properties.getPropertyAsMap(FLEX_CLIENT_OUTBOUND_QUEUE_PROCESSOR, null);
+        if (outboundQueueConfig != null)
+        {
+            // Get nested props for the processor.
+            flexClientOutboundQueueProcessorConfig = outboundQueueConfig.getPropertyAsMap(PROPERTIES_ELEMENT, null);
+
+            String pClassName = outboundQueueConfig.getPropertyAsString(CLASS_ATTR, null);
+            if (pClassName != null)
+            {
+                try
+                {
+                    flexClientOutboundQueueProcessClass = createClass(pClassName);
+                    // The setter creates a trial processor instance and initializes it to make sure the properties are valid.
+                    setFlexClientOutboundQueueProcessorConfig(flexClientOutboundQueueProcessorConfig);
+                }
+                catch (Throwable t) // best effort: a bad processor class is logged, not rethrown
+                {
+                    if (Log.isWarn())
+                        log.warn("Cannot register custom FlexClient outbound queue processor class {0}", new Object[]{pClassName}, t);
+                }
+            }
+        }
+
+        ConfigMap serialization = properties.getPropertyAsMap(SERIALIZATION, null);
+        if (serialization != null)
+        {
+            // Custom deserializers are no longer supported; only warn when configured
+            List<?> deserializers = serialization.getPropertyAsList(CUSTOM_DESERIALIZER, null);
+            if (deserializers != null && Log.isWarn())
+                log.warn("Endpoint <custom-deserializer> functionality is no longer available. Please remove this entry from your configuration.");
+
+            // Custom serializers are no longer supported; only warn when configured
+            List<?> serializers = serialization.getPropertyAsList(CUSTOM_SERIALIZER, null);
+            if (serializers != null && Log.isWarn())
+                log.warn("Endpoint <custom-serializer> functionality is no longer available. Please remove this entry from your configuration.");
+
+            // Type Marshaller implementation
+            String typeMarshallerClassName = serialization.getPropertyAsString(TYPE_MARSHALLER, null);
+            if (typeMarshallerClassName != null && typeMarshallerClassName.length() > 0)
+            {
+                try
+                {
+                    Class<?> tmc = createClass(typeMarshallerClassName);
+                    typeMarshaller = (TypeMarshaller)ClassUtil.createDefaultInstance(tmc, TypeMarshaller.class);
+                }
+                catch (Throwable t) // best effort: a bad marshaller class is logged, not rethrown
+                {
+                    if (Log.isWarn())
+                        log.warn("Cannot register custom type marshaller for type {0}", new Object[]{typeMarshallerClassName}, t);
+                }
+            }
+
+            // Serialization flags and nesting limits (second argument is the default)
+            serializationContext.createASObjectForMissingType = serialization.getPropertyAsBoolean(CREATE_ASOBJECT_FOR_MISSING_TYPE, false);
+            serializationContext.enableSmallMessages = serialization.getPropertyAsBoolean(ENABLE_SMALL_MESSAGES, true);
+            serializationContext.instantiateTypes = serialization.getPropertyAsBoolean(INSTANTIATE_TYPES, true);
+            serializationContext.supportRemoteClass = serialization.getPropertyAsBoolean(SUPPORT_REMOTE_CLASS, false);
+            serializationContext.legacyCollection = serialization.getPropertyAsBoolean(LEGACY_COLLECTION, false);
+            serializationContext.legacyDictionary = serialization.getPropertyAsBoolean(LEGACY_DICTIONARY, false);
+            serializationContext.legacyMap = serialization.getPropertyAsBoolean(LEGACY_MAP, false);
+            serializationContext.legacyXMLDocument = serialization.getPropertyAsBoolean(LEGACY_XML, false);
+            serializationContext.legacyXMLNamespaces = serialization.getPropertyAsBoolean(LEGACY_XML_NAMESPACES, false);
+            serializationContext.legacyThrowable = serialization.getPropertyAsBoolean(LEGACY_THROWABLE, false);
+            serializationContext.legacyBigNumbers = serialization.getPropertyAsBoolean(LEGACY_BIG_NUMBERS, false);
+            serializationContext.legacyExternalizable = serialization.getPropertyAsBoolean(LEGACY_EXTERNALIZABLE, false);
+            serializationContext.allowXmlDoctypeDeclaration = serialization.getPropertyAsBoolean(ALLOW_XML_DOCTYPE_DECLARATION, false);
+            serializationContext.allowXmlExternalEntityExpansion = serialization.getPropertyAsBoolean(ALLOW_XML_EXTERNAL_ENTITY_EXPANSION, false);
+            serializationContext.maxObjectNestLevel = (int)serialization.getPropertyAsLong(MAX_OBJECT_NEST_LEVEL, 512);
+            serializationContext.maxCollectionNestLevel = (int)serialization.getPropertyAsLong(MAX_COLLECTION_NEST_LEVEL, 15);
+            serializationContext.preferVectors = serialization.getPropertyAsBoolean(PREFER_VECTORS, false);
+
+            boolean showStacktraces = serialization.getPropertyAsBoolean(SHOW_STACKTRACES, false);
+            if (showStacktraces && Log.isWarn())
+                log.warn("The " + SHOW_STACKTRACES + " configuration option is deprecated and non-functional. Please remove this from your configuration file.");
+            serializationContext.restoreReferences = serialization.getPropertyAsBoolean(RESTORE_REFERENCES, false);
+            serializationContext.logPropertyErrors = serialization.getPropertyAsBoolean(LOG_PROPERTY_ERRORS, false);
+            serializationContext.ignorePropertyErrors = serialization.getPropertyAsBoolean(IGNORE_PROPERTY_ERRORS, true);
+            serializationContext.includeReadOnly = serialization.getPropertyAsBoolean(INCLUDE_READ_ONLY, false);
+        }
+
+        recordMessageSizes = properties.getPropertyAsBoolean(ConfigurationConstants.RECORD_MESSAGE_SIZES_ELEMENT, false);
+
+        if (recordMessageSizes && Log.isWarn())
+            log.warn("Setting <record-message-sizes> to true affects application performance and should only be used for debugging");
+
+        recordMessageTimes = properties.getPropertyAsBoolean(ConfigurationConstants.RECORD_MESSAGE_TIMES_ELEMENT, false);
+    }
+
+    /**
+     * Starts the endpoint unless it is already running; if its <code>MessageBroker</code> is not
+     * started, a warning is logged and nothing else happens. Management, the serializer classes and
+     * class aliases are set up first. Overriding subclasses must call <code>super.start()</code>.
+     */
+    @Override
+    public void start()
+    {
+        if (isStarted())
+            return;
+
+        // Check if the MessageBroker is started
+        MessageBroker broker = getMessageBroker();
+        if (!broker.isStarted())
+        {
+            if (Log.isWarn())
+            {
+                Log.getLogger(getLogCategory()).warn("Endpoint with id '{0}' cannot be started" +
+                        " when the MessageBroker is not started.",
+                        new Object[]{getId()});
+            }
+            return;
+        }
+
+        // Set up management, but only when both the endpoint and its broker are managed
+        if (isManaged() && broker.isManaged())
+        {
+            setupEndpointControl(broker);
+            MessageBrokerControl controller = (MessageBrokerControl)broker.getControl();
+            if (getControl() != null)
+                controller.addEndpoint(this);
+        }
+
+        // Setup Deserializer and Serializer for the SerializationContext (classes set earlier are kept)
+        if (deserializerClass == null)
+            deserializerClass = createClass(getDeserializerClassName());
+
+        if (serializerClass == null)
+            serializerClass = createClass(getSerializerClassName());
+
+        serializationContext.setDeserializerClass(deserializerClass);
+        serializationContext.setSerializerClass(serializerClass);
+
+        // Register class aliases for the Ext message types, then delegate to super.start()
+        ClassAliasRegistry registry = ClassAliasRegistry.getRegistry();
+        registry.registerAlias(AsyncMessageExt.CLASS_ALIAS, AsyncMessageExt.class.getName());
+        registry.registerAlias(AcknowledgeMessageExt.CLASS_ALIAS, AcknowledgeMessageExt.class.getName());
+        registry.registerAlias(CommandMessageExt.CLASS_ALIAS, CommandMessageExt.class.getName());
+        super.start();
+    }
+
+    /**
+     * Stops the endpoint if it is running; when it and its broker are managed, its control is then
+     * unregistered and cleared. Overriding subclasses must call <code>super.stop()</code>.
+     */
+    @Override
+    public void stop()
+    {
+        if (!isStarted())
+            return;
+
+        super.stop();
+
+        // Remove management; note that the endpoint is left unmanaged afterwards
+        if (isManaged() && getMessageBroker().isManaged())
+        {
+            if (getControl() != null)
+            {
+                getControl().unregister();
+                setControl(null);
+            }
+            setManaged(false);
+        }
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Public Getters and Setters for AbstractEndpoint properties
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Adds a client-load-balancing URL. The URL is validated before the backing set is
+     * created, so a rejected URL does not leave an empty URL set behind on the endpoint.
+     * @param url A client-load-balancing URL; must not be <code>null</code> or empty.
+     * @return <code>false</code> if the set already contains the URL, <code>true</code> otherwise.
+     * @throws ConfigurationException if the URL is <code>null</code> or empty.
+     */
+    public boolean addClientLoadBalancingUrl(String url)
+    {
+        if (url == null || url.length() == 0)
+        {
+            // Invalid {0} configuration for endpoint ''{1}''; cannot add empty url.
+            ConfigurationException ce = new ConfigurationException();
+            ce.setMessage(ERR_MSG_EMTPY_CLIENT_LOAD_BALACNING_URL, new Object[]{CLIENT_LOAD_BALANCING_ELEMENT, getId()});
+            throw ce;
+        }
+
+        // Create the set lazily, and only once there is a valid URL to put into it.
+        if (clientLoadBalancingUrls == null)
+            clientLoadBalancingUrls = new HashSet<String>();
+        return clientLoadBalancingUrls.add(url);
+    }
+
+    /**
+     * Removes a client-load-balancing URL from the endpoint.
+     *
+     * @param url The URL to remove.
+     * @return <code>true</code> if the set contained the URL, <code>false</code> otherwise.
+     */
+    public boolean removeClientLoadBalancingUrl(String url)
+    {
+        // A set that was never created cannot contain the URL.
+        final Set<String> urls = clientLoadBalancingUrls;
+        return urls != null && urls.remove(url);
+    }
+
+    /**
+     * Retrieves a snapshot of the current client-load-balancing URLs.
+     * The snapshot is a new <code>HashSet</code>; changing it does not affect the endpoint.
+     * @return A snapshot of the current client-load-balancing URLs, or <code>null</code> if
+     * no URL set has been created yet.
+     */
+    public Set<String> getClientLoadBalancingUrls()
+    {
+        return clientLoadBalancingUrls == null? null : new HashSet<String>(clientLoadBalancingUrls);
+    }
+
+    /**
+     * Retrieves the client channel type that corresponds to this endpoint.
+     *
+     * @return The value of the <code>clientType</code> field; may be <code>null</code> if it was never set.
+     */
+    public String getClientType()
+    {
+        return clientType;
+    }
+
+    /**
+     * Sets the client channel type that corresponds to this endpoint. The value is stored as-is.
+     *
+     * @param type The corresponding client channel type for the endpoint.
+     */
+    public void setClientType(String type)
+    {
+        this.clientType = type;
+    }
+
+    /**
+     * Retrieves the class of the <code>FlexClientOutboundQueueProcessor</code> of the endpoint.
+     *
+     * @return The processor class, or <code>null</code> if none has been configured.
+     */
+    public Class<?> getFlexClientOutboundQueueProcessorClass()
+    {
+        return flexClientOutboundQueueProcessClass;
+    }
+
+    /**
+     * Sets the class of the <code>FlexClientOutboundQueueProcessor</code> of the endpoint. When a processor
+     * config is also present, a trial instance of the class is created and initialized with it.
+     * @param flexClientOutboundQueueProcessorClass the Class of the Flex client outbound queue processor.
+     */
+    public void setFlexClientOutboundQueueProcessorClass(Class<?> flexClientOutboundQueueProcessorClass)
+    {
+        this.flexClientOutboundQueueProcessClass = flexClientOutboundQueueProcessorClass;
+        if (flexClientOutboundQueueProcessClass == null || flexClientOutboundQueueProcessorConfig == null)
+            return; // Nothing to try out until both the class and its config are known.
+
+        Object instance = ClassUtil.createDefaultInstance(flexClientOutboundQueueProcessClass, null);
+        ((FlexClientOutboundQueueProcessor)instance).initialize(flexClientOutboundQueueProcessorConfig);
+    }
+
+    /**
+     * Retrieves the properties for the <code>FlexClientOutboundQueueProcessor</code> of the endpoint.
+     *
+     * @return The processor properties, or <code>null</code> if none have been configured.
+     */
+    public ConfigMap getFlexClientOutboundQueueProcessorConfig()
+    {
+        return flexClientOutboundQueueProcessorConfig;
+    }
+
+    /**
+     * Sets the properties for the <code>FlexClientOutboundQueueProcessor</code> of the endpoint. When a
+     * processor class is also present, a trial instance is created and initialized with the properties.
+     * @param flexClientOutboundQueueProcessorConfig The configuration map.
+     */
+    public void setFlexClientOutboundQueueProcessorConfig(ConfigMap flexClientOutboundQueueProcessorConfig)
+    {
+        this.flexClientOutboundQueueProcessorConfig = flexClientOutboundQueueProcessorConfig;
+        if (flexClientOutboundQueueProcessorConfig == null || flexClientOutboundQueueProcessClass == null)
+            return; // Nothing to try out until both the config and the class are known.
+
+        Object instance = ClassUtil.createDefaultInstance(flexClientOutboundQueueProcessClass, null);
+        ((FlexClientOutboundQueueProcessor)instance).initialize(flexClientOutboundQueueProcessorConfig);
+    }
+
+    /**
+     * Changes the ID of the <code>AbstractEndpoint</code>. Nothing happens when the new ID
+     * equals the current one. If a <code>MessageBroker</code> is assigned, the endpoint is
+     * removed from it under the old ID and added again so the broker sees the new ID.
+     *
+     * @param id The endpoint ID.
+     */
+    @Override
+    public void setId(String id)
+    {
+        final String previousId = getId();
+        final boolean unchanged = previousId != null && previousId.equals(id);
+        if (unchanged)
+            return;
+
+        super.setId(id);
+
+        // Re-register with the owning broker, if any, under the new ID.
+        final MessageBroker owner = getMessageBroker();
+        if (owner == null)
+            return;
+
+        // The broker is expected to hold this endpoint under its previous ID.
+        owner.removeEndpoint(previousId);
+        owner.addEndpoint(this);
+    }
+
+    /**
+     * Retrieves the <code>MessageBroker</code> of the <code>AbstractEndpoint</code>.
+     * The broker is this component's parent, cast to <code>MessageBroker</code>.
+     * @return The <code>MessageBroker</code> of the <code>AbstractEndpoint</code>.
+     */
+    public MessageBroker getMessageBroker()
+    {
+        return (MessageBroker)getParent();
+    }
+
+    /**
+     * Sets the <code>MessageBroker</code> of the <code>AbstractEndpoint</code>.
+     * Removes the <code>AbstractEndpoint</code> from the old broker
+     * (if there was one) and adds to the list of endpoints in the new broker.
+     * Passing <code>null</code> only detaches the endpoint from its old broker.
+     * @param broker The new <code>MessageBroker</code>, or <code>null</code> to detach.
+     */
+    public void setMessageBroker(MessageBroker broker)
+    {
+        MessageBroker oldBroker = getMessageBroker();
+
+        setParent(broker);
+
+        if (oldBroker != null)
+            oldBroker.removeEndpoint(getId());
+
+        // Add endpoint to the new broker if needed; with a null broker there is nothing to join
+        if (broker != null && broker.getEndpoint(getId()) != this)
+            broker.addEndpoint(this);
+    }
+
+    /**
+     * Return the highest messaging version currently available via this
+     * endpoint.
+     * @return The messaging version; the backing field is initialized to 1.0.
+     */
+    public double getMessagingVersion()
+    {
+        return messagingVersion;
+    }
+
+    /**
+     * Retrieves the port of the URL of the endpoint.
+     * A return value of 0 denotes no port in the channel URL.
+     * The value is recomputed from the URL each time <code>setUrl</code> is called.
+     * @return The port of the URL of the endpoint, or 0 if the URL does not contain
+     * a port number.
+     */
+    public int getPort()
+    {
+        return port;
+    }
+
+    /**
+     * Determines whether the endpoint is secure.
+     * This base implementation always answers <code>false</code>.
+     * @return <code>false</code> by default.
+     */
+    public boolean isSecure()
+    {
+        return false;
+    }
+
+    /**
+     * Determines whether this endpoint is only a local stand-in for the endpoint that clients
+     * connect to directly on a remote host, in which case this local instance is not started
+     * and will service no direct client connections.
+     *
+     * @return <code>true</code> if this endpoint will not process direct client connections and is just
+     * a local representation of a symmetric endpoint on a remote host that will, <code>false</code> otherwise.
+     */
+    public boolean isRemote()
+    {
+        return remote;
+    }
+
+    /**
+     * Sets the remote status for this endpoint; the flag is returned by <code>isRemote</code>.
+     *
+     * @param value <code>true</code> if this endpoint will not process direct client connections and is just
+     * a local representation of a symmetric endpoint on a remote host that will, <code>false</code> otherwise.
+     */
+    public void setRemote(boolean value)
+    {
+        remote = value;
+    }
+
+    /**
+     * Retrieves the <tt>Server</tt> that the endpoint is using, or <code>null</code> if
+     * no server has been assigned.
+     * @return The <tt>Server</tt> the endpoint is using; may be <code>null</code>.
+     */
+    public Server getServer()
+    {
+        return server;
+    }
+
+    /**
+     * Sets the <tt>Server</tt> that the endpoint will use.
+     * @param server The <tt>Server</tt> to assign; it is stored as-is.
+     */
+    public void setServer(Server server)
+    {
+        this.server = server;
+    }
+
+    /**
+     * Determines whether the endpoint is server only.
+     * For a server-only endpoint, <code>describeEndpoint</code> returns an empty <code>ConfigMap</code>.
+     * @return <code>true</code> if the endpoint is server only, <code>false</code> otherwise.
+     */
+    public boolean getServerOnly()
+    {
+        return serverOnly;
+    }
+
+    /**
+     * Sets whether the endpoint is server only, i.e. whether it is left out of what clients are told.
+     *
+     * @param serverOnly <code>true</code> if the endpoint is server only, <code>false</code> otherwise.
+     */
+    public void setServerOnly(boolean serverOnly)
+    {
+        this.serverOnly = serverOnly;
+    }
+
+    /**
+     * Retrieves the <code>SecurityConstraint</code> of the <code>Endpoint</code>.
+     *
+     * @return The <code>SecurityConstraint</code> of the <code>Endpoint</code>; may be <code>null</code>.
+     */
+    public SecurityConstraint getSecurityConstraint()
+    {
+        return securityConstraint;
+    }
+
+    /**
+     * Sets the <code>SecurityConstraint</code> of the <code>Endpoint</code>.
+     *
+     * @param securityConstraint The <code>SecurityConstraint</code> object.
+     */
+    public void setSecurityConstraint(SecurityConstraint securityConstraint)
+    {
+        this.securityConstraint = securityConstraint;
+    }
+
+    /**
+     * Retrieves the <code>SerializationContext</code> of the endpoint.
+     * The constructor creates one; <code>setSerializationContext</code> can replace it.
+     * @return The <code>SerializationContext</code> of the endpoint.
+     */
+    public SerializationContext getSerializationContext()
+    {
+        return serializationContext;
+    }
+
+    /**
+     * Sets the <code>SerializationContext</code> of the endpoint, replacing the current one.
+     * Note that <code>start()</code> and <code>initialize()</code> dereference it, so it should not be <code>null</code>.
+     * @param serializationContext The SerializationContext object.
+     */
+    public void setSerializationContext(SerializationContext serializationContext)
+    {
+        this.serializationContext = serializationContext;
+    }
+
+    /**
+     * Retrieves the <code>TypeMarshaller</code> of the endpoint.
+     * If none has been set, a new <code>ASTranslator</code> is created, stored and returned.
+     * @return The <code>TypeMarshaller</code> of the endpoint; never <code>null</code>.
+     */
+    public TypeMarshaller getTypeMarshaller()
+    {
+        if (typeMarshaller == null)
+            typeMarshaller = new ASTranslator();
+
+        return typeMarshaller;
+    }
+
+    /**
+     * Sets the <code>TypeMarshaller</code> of the endpoint.
+     * After <code>null</code> is passed, <code>getTypeMarshaller</code> falls back to a new <code>ASTranslator</code>.
+     * @param typeMarshaller The TypeMarshaller object.
+     */
+    public void setTypeMarshaller(TypeMarshaller typeMarshaller)
+    {
+        this.typeMarshaller = typeMarshaller;
+    }
+
+    /**
+     * Retrieves the URL of the endpoint.
+     * The <code>url</code> field is returned as-is, without any parsing.
+     * @return The URL of the endpoint.
+     */
+    public String getUrl()
+    {
+        return url;
+    }
+
+    /**
+     * Sets the URL of the endpoint.
+     * Also recomputes the port from the URL and resets the parsed-URL state so it is parsed again.
+     * @param url The URL of the endpoint.
+     */
+    public void setUrl(String url)
+    {
+        this.url = url;
+        port = internalParsePort(url);
+        parsedForContext = null;
+        clientContextParsed = false;
+    }
+
+    /**
+     * Returns the url of the endpoint parsed for the client. The parsed form is cached;
+     * when it is not yet available and there is no current HTTP request to take a
+     * context path from, the configured url is returned instead.
+     * @return The url of the endpoint parsed for the client.
+     */
+    public String getUrlForClient()
+    {
+        // clientContextParsed is reset by setUrl(); while it is set, the cached value is used.
+        if (clientContextParsed)
+            return parsedClientUrl;
+
+        // Parsing needs the context path of the current HTTP request (taken from
+        // FlexContext); without a request, fall back to the configured url.
+        final HttpServletRequest request = FlexContext.getHttpRequest();
+        if (request == null)
+            return url;
+
+        // parseClientUrl() is expected to fill in parsedClientUrl (and presumably
+        // clientContextParsed) - confirm against its definition.
+        parseClientUrl(request.getContextPath());
+        return parsedClientUrl;
+    }
+
+    /**
+     * Returns the total throughput for the endpoint: the sum of the bytes deserialized and
+     * the bytes serialized, as reported by its <code>EndpointControl</code>.
+     *
+     * @return The total throughput for the endpoint, or 0 when the endpoint has no control.
+     */
+    public long getThroughput()
+    {
+        EndpointControl control = (EndpointControl)getControl();
+        // stop() clears the control and an endpoint that is not managed may have none; report no traffic then.
+        return control == null? 0 : control.getBytesDeserialized().longValue() + control.getBytesSerialized().longValue();
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Other Public APIs
+    //
+    //--------------------------------------------------------------------------
+
+
+    public static void addNoCacheHeaders(HttpServletRequest req, HttpServletResponse res)
+    {
+        String userAgent = req.getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
+        boolean msieOverHttps = req.isSecure() && userAgent != null && userAgent.indexOf(UserAgentSettings.USER_AGENT_MSIE) != -1;
+        // Adds response headers that forbid caching. MSIE over HTTPS gets extra Cache-Control values and no Pragma.
+        if (msieOverHttps)
+            res.addHeader(HEADER_NAME_CACHE_CONTROL, "no-store, no-cache, must-revalidate, post-check=0, pre-check=0, no-transform, private");
+        else // For the rest, set no-cache header value only.
+            res.addHeader(HEADER_NAME_CACHE_CONTROL, "no-cache");
+
+        // Set an expiration date in the past as well.
+        res.setDateHeader(HEADER_NAME_EXPIRES, 946080000000L); //Approx Jan 1, 2000
+
+        // Set Pragma no-cache header if we're not MSIE over HTTPS (same condition as above, evaluated once)
+        if (!msieOverHttps)
+            res.setHeader(HEADER_NAME_PRAGMA, "no-cache");
+    }
+
+    /**
+     * Swaps a message for its small-message form when one is available.
+     *
+     * @param message The message to convert.
+     * @return The value of <code>getSmallMessage()</code> when the message is a
+     * <code>SmallMessage</code> and that value is not <code>null</code>; otherwise
+     * the message that was passed in.
+     */
+    public Message convertToSmallMessage(Message message)
+    {
+        if (!(message instanceof SmallMessage))
+            return message;
+
+        Message compact = ((SmallMessage)message).getSmallMessage();
+        return (compact != null) ? compact : message;
+    }
+
+    /**
+     * Retrieves a <code>ConfigMap</code> of the endpoint properties the client
+     * needs. Subclasses should add additional properties to <code>super.describeEndpoint</code>,
+     * or return <code>null</code> if they must not send their properties to the client.
+     * <p>
+     * A server-only endpoint yields an empty map. Otherwise the map carries the endpoint
+     * id and client type, either the client-load-balancing urls or the endpoint uri, and a
+     * properties element holding the positive timeouts, the enabled message time/size
+     * recording flags and the small-messages serialization setting.
+     *
+     * @return ConfigMap The ConfigMap object.
+     */
+    public ConfigMap describeEndpoint()
+    {
+        ConfigMap channelConfig = new ConfigMap();
+
+        if (serverOnly) // Client does not need server only endpoints.
+            return channelConfig;
+
+        channelConfig.addProperty(ID_ATTR, getId());
+        channelConfig.addProperty(TYPE_ATTR, getClientType());
+
+        ConfigMap properties = new ConfigMap();
+
+        boolean containsClientLoadBalancing = clientLoadBalancingUrls != null && !clientLoadBalancingUrls.isEmpty();
+        if (containsClientLoadBalancing)
+        {
+            ConfigMap clientLoadBalancing = new ConfigMap();
+            for (Iterator<String> iterator = clientLoadBalancingUrls.iterator(); iterator.hasNext();)
+            {
+                ConfigMap urlConfig = new ConfigMap();
+                // Adding as a value rather than attribute to the parent.
+                urlConfig.addProperty(EMPTY_STRING, iterator.next());
+                clientLoadBalancing.addProperty(URL_ATTR, urlConfig);
+            }
+            properties.addProperty(CLIENT_LOAD_BALANCING_ELEMENT, clientLoadBalancing);
+        }
+        else
+        {
+            // Add endpoint uri only if no client-load-balancing urls are defined.
+            ConfigMap endpointConfig = new ConfigMap();
+            endpointConfig.addProperty(URI_ATTR, getUrlForClient());
+            channelConfig.addProperty(ENDPOINT_ELEMENT, endpointConfig);
+        }
+
+        if (connectTimeoutSeconds > 0)
+        {
+            ConfigMap connectTimeoutConfig = new ConfigMap();
+            connectTimeoutConfig.addProperty(EMPTY_STRING, String.valueOf(connectTimeoutSeconds));
+            properties.addProperty(CONNECT_TIMEOUT_SECONDS_ELEMENT, connectTimeoutConfig);
+        }
+
+        if (requestTimeoutSeconds > 0)
+        {
+            // The map must not be named like the requestTimeoutSeconds field: a local of that
+            // name shadows the field, and String.valueOf would then stringify the map itself
+            // instead of the configured timeout.
+            ConfigMap requestTimeoutConfig = new ConfigMap();
+            requestTimeoutConfig.addProperty(EMPTY_STRING, String.valueOf(requestTimeoutSeconds));
+            properties.addProperty(REQUEST_TIMEOUT_SECONDS_ELEMENT, requestTimeoutConfig);
+        }
+
+        if (recordMessageTimes)
+        {
+            ConfigMap recordMessageTimesMap = new ConfigMap();
+            // Adding as a value rather than attribute to the parent
+            recordMessageTimesMap.addProperty(EMPTY_STRING, TRUE_STRING);
+            properties.addProperty(RECORD_MESSAGE_TIMES_ELEMENT, recordMessageTimesMap);
+        }
+
+        if (recordMessageSizes)
+        {
+            ConfigMap recordMessageSizesMap = new ConfigMap();
+            // Adding as a value rather than attribute to the parent
+            recordMessageSizesMap.addProperty(EMPTY_STRING, TRUE_STRING);
+            properties.addProperty(RECORD_MESSAGE_SIZES_ELEMENT, recordMessageSizesMap);
+        }
+
+        ConfigMap serialization = new ConfigMap();
+        serialization.addProperty(ENABLE_SMALL_MESSAGES_ELEMENT, Boolean.toString(serializationContext.enableSmallMessages));
+        properties.addProperty(SERIALIZATION_ELEMENT, serialization);
+
+        if (properties.size() > 0)
+            channelConfig.addProperty(PROPERTIES_ELEMENT, properties);
+
+        return channelConfig;
+    }
+
+    /**
+     * Returns the endpoint url reduced for the given context path, parsing it
+     * first if that has not been done for this context path yet.
+     * Make sure this matches with ChannelSettings.getParsedUri.
+     *
+     * @param contextPath The context path to parse the url against.
+     * @return The parsed url.
+     */
+    public String getParsedUrl(String contextPath)
+    {
+        // parseUrl() refreshes the parsedUrl field when needed.
+        parseUrl(contextPath);
+        return this.parsedUrl;
+    }
+
+    /**
+     * Turns on small messages for the current FlexSession when the client
+     * advertises a messaging version at least as high as the endpoint's and
+     * small messages are enabled in the endpoint's serialization context.
+     *
+     * @param version The messaging version advertised by the client; ignored when <code>null</code>.
+     */
+    public void handleClientMessagingVersion(Number version)
+    {
+        if (version == null)
+            return;
+
+        if (version.doubleValue() < messagingVersion)
+            return;
+
+        if (!getSerializationContext().enableSmallMessages)
+            return;
+
+        FlexSession currentSession = FlexContext.getFlexSession();
+        if (currentSession != null)
+            currentSession.setUseSmallMessages(true);
+    }
+
+    /**
+     * Default implementation of the Endpoint <code>service</code> method; it only
+     * validates the protocol of the incoming request. Subclasses should call
+     * <code>super.service</code> before their custom code.
+     *
+     * @param req The HttpServletRequest object.
+     * @param res The HttpServletResponse object; not used by this implementation.
+     */
+    public void service(HttpServletRequest req, HttpServletResponse res)
+    {
+        // Rejects insecure requests aimed at a secure endpoint.
+        this.validateRequestProtocol(req);
+    }
+
+    /**
+     * Typically invoked by subclasses, this method transforms decoded message data
+     * into the appropriate Message object and routes the Message to the endpoint's broker.
+     * <p>
+     * The endpoint is placed in the FlexContext for the duration of the call. The message
+     * is timestamped if needed and its endpoint header is overwritten with this endpoint's id.
+     * General polls, disconnects and trigger-connect commands are answered here; all other
+     * commands and messages are routed through the MessageBroker. The endpoint security
+     * constraint is checked for everything except login commands.
+     *
+     * @param message The decoded message data.
+     * @return Message The transformed message.
+     */
+    public Message serviceMessage(Message message)
+    {
+        if (isManaged())
+        {
+            ((EndpointControl) getControl()).incrementServiceMessageCount();
+        }
+
+        try
+        {
+            FlexContext.setThreadLocalEndpoint(this);
+            Message ack = null;
+
+            // Make sure this message is timestamped.
+            if (message.getTimestamp() == 0)
+            {
+                message.setTimestamp(System.currentTimeMillis());
+            }
+
+            // Reset the endpoint header for inbound messages to the id for this endpoint
+            // to guarantee that it's correct. Don't allow clients to spoof this.
+            // However, if the endpoint id is passed as null we need to tag the message to
+            // skip channel/endpoint validation at the destination level (MessageBroker.inspectChannel()).
+            if (message.getHeader(Message.ENDPOINT_HEADER) != null)
+                message.setHeader(Message.VALIDATE_ENDPOINT_HEADER, Boolean.TRUE);
+            message.setHeader(Message.ENDPOINT_HEADER, getId());
+
+            if (message instanceof CommandMessage)
+            {
+                CommandMessage command = (CommandMessage)message;
+
+                // Apply channel endpoint level constraint; always allow login commands through.
+                int operation = command.getOperation();
+                if (operation != CommandMessage.LOGIN_OPERATION)
+                    checkSecurityConstraint(message);
+
+                // Handle general (not Consumer specific) poll requests here.
+                // We need to fetch all outbound messages for client subscriptions over this endpoint.
+                // We identify these general poll messages by their operation and a null clientId.
+                if (operation == CommandMessage.POLL_OPERATION && message.getClientId() == null)
+                {
+                    verifyFlexClientSupport(command);
+
+                    FlexClient flexClient = FlexContext.getFlexClient();
+                    ack = handleFlexClientPollCommand(flexClient, command);
+                }
+                else if (operation == CommandMessage.DISCONNECT_OPERATION)
+                {
+                    ack = handleChannelDisconnect(command);
+                }
+                else if (operation == CommandMessage.TRIGGER_CONNECT_OPERATION)
+                {
+                    ack = new AcknowledgeMessage();
+                    ((AcknowledgeMessage)ack).setCorrelationId(message.getMessageId());
+
+                    // Send configuration information only if the client requested it. The header
+                    // is client-supplied, so compare against Boolean.TRUE instead of casting:
+                    // a missing or non-Boolean value then simply means "not requested".
+                    boolean needsConfig = Boolean.TRUE.equals(command.getHeader(CommandMessage.NEEDS_CONFIG_HEADER));
+                    if (needsConfig)
+                    {
+                        ConfigMap serverConfig = getMessageBroker().describeServices(this);
+                        if (serverConfig.size() > 0)
+                            ack.setBody(serverConfig);
+                    }
+                }
+                else
+                {
+                    // Block a subset of commands for legacy clients that need to be recompiled to
+                    // interop with a 2.5+ server.
+                    if (operation == CommandMessage.SUBSCRIBE_OPERATION || operation == CommandMessage.POLL_OPERATION)
+                        verifyFlexClientSupport(command);
+
+                    ack = getMessageBroker().routeCommandToService((CommandMessage) message, this);
+
+                    // Look for client advertised features on initial connect.
+                    if (operation == CommandMessage.CLIENT_PING_OPERATION || operation == CommandMessage.LOGIN_OPERATION)
+                    {
+                        Number clientVersion = (Number)command.getHeader(CommandMessage.MESSAGING_VERSION);
+                        handleClientMessagingVersion(clientVersion);
+
+                        // Also respond by advertising the messaging version on the
+                        // acknowledgement.
+                        ack.setHeader(CommandMessage.MESSAGING_VERSION, Double.valueOf(messagingVersion));
+                    }
+                }
+            }
+            else
+            {
+                // Block any AsyncMessages from a legacy client.
+                if (message instanceof AsyncMessage)
+                    verifyFlexClientSupport(message);
+
+                // Apply channel endpoint level constraint.
+                checkSecurityConstraint(message);
+
+                ack = getMessageBroker().routeMessageToService(message, this);
+            }
+
+            return ack;
+        }
+        finally
+        {
+            // Always detach the endpoint from the thread, even when routing throws.
+            FlexContext.setThreadLocalEndpoint(null);
+        }
+    }
+
+   /**
+    * Utility method that endpoint implementations (or associated classes)
+    * should invoke when they receive an incoming message from a client but before
+    * servicing it. It reads the FlexClient ID header of the message and delegates to
+    * <code>setupFlexClient(String)</code>, which looks up or creates the FlexClient
+    * and places it in the FlexContext.
+    *
+    * @param message The incoming message to process.
+    *
+    * @return The FlexClient, or <code>null</code> if the message did not contain a FlexClient ID value.
+    */
+   public FlexClient setupFlexClient(Message message)
+   {
+       // No FlexClient ID header at all: there is nothing to set up.
+       if (!message.getHeaders().containsKey(Message.FLEX_CLIENT_ID_HEADER))
+           return null;
+
+       // A header that is present but null is mapped to the special token value that lets
+       // us differentiate between legacy clients and 2.5+ clients.
+       String headerValue = (String)message.getHeaders().get(Message.FLEX_CLIENT_ID_HEADER);
+       String id = (headerValue == null) ? FlexClient.NULL_FLEXCLIENT_ID : headerValue;
+       return setupFlexClient(id);
+   }
+
+   /**
+    * Utility method that endpoint implementations (or associated classes)
+    * should invoke when they receive an incoming message from a client but before
+    * servicing it. It obtains the FlexClient for the given ID from the broker's
+    * FlexClientManager, registers the current FlexSession with it and stores it in
+    * the FlexContext for this request.
+    *
+    * @param id The FlexClient ID value from the client.
+    *
+    * @return The FlexClient or null if the provided ID was <code>null</code>.
+    */
+   public FlexClient setupFlexClient(String id)
+   {
+       if (id == null)
+           return null;
+
+       // The special token marks a non-legacy client that hasn't been assigned a
+       // FlexClient Id yet; pass null to the manager to generate a fresh Id.
+       String lookupId = id.equals(FlexClient.NULL_FLEXCLIENT_ID) ? null : id;
+       FlexClient client = getMessageBroker().getFlexClientManager().getFlexClient(lookupId);
+
+       // Make sure the FlexClient and FlexSession are associated.
+       client.registerFlexSession(FlexContext.getFlexSession());
+
+       // And place the FlexClient in FlexContext for this request.
+       FlexContext.setThreadLocalFlexClient(client);
+       return client;
+   }
+
+   /**
+    * Performance metrics gathering property.
+    *
+    * @return The value of the <code>recordMessageSizes</code> flag.
+    */
+    public boolean isRecordMessageSizes()
+    {
+        return this.recordMessageSizes;
+    }
+
+   /**
+    * Performance metrics gathering property.
+    *
+    * @return The value of the <code>recordMessageTimes</code> flag.
+    */
+    public boolean isRecordMessageTimes()
+    {
+        return this.recordMessageTimes;
+    }
+
+    /**
+     * Installs a clone of the endpoint's serialization context, carrying the
+     * broker's current deserialization validator, as the active serialization
+     * context, and installs the endpoint's type marshaller.
+     * The serialization part is skipped when the endpoint has no serialization context.
+     */
+    public void setThreadLocals()
+    {
+        if (serializationContext != null)
+        {
+            SerializationContext requestContext = (SerializationContext)serializationContext.clone();
+
+            // Get the latest deserialization validator from the broker, if there is a broker.
+            MessageBroker broker = getMessageBroker();
+            DeserializationValidator validator = null;
+            if (broker != null)
+                validator = broker.getDeserializationValidator();
+
+            requestContext.setDeserializationValidator(validator);
+            SerializationContext.setSerializationContext(requestContext);
+        }
+
+        TypeMarshallingContext.setTypeMarshaller(getTypeMarshaller());
+    }
+
+    /**
+     * Clears the thread-local objects held by <code>SerializationContext</code>
+     * and <code>TypeMarshallingContext</code>.
+     */
+    public void clearThreadLocals()
+    {
+        // Serialization state first, then type marshalling state.
+        SerializationContext.clearThreadLocalObjects();
+
+        TypeMarshallingContext.clearThreadLocalObjects();
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Protected/private methods.
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Returns the log category of the <code>AbstractEndpoint</code>. Subclasses
+     * can override to provide a more specific logging category.
+     *
+     * @return The log category.
+     */
+    @Override
+    protected String getLogCategory()
+    {
+        return LOG_CATEGORY;
+    }
+
+    /**
+     * Hook method invoked when a disconnect command is received from a client channel.
+     * The response returned by this method is not guaranteed to get to the client, which
+     * is free to terminate its physical connection at any point.
+     *
+     * @param disconnectCommand The disconnect command; not inspected by this default implementation.
+     * @return The response; by default an empty <tt>AcknowledgeMessage</tt>.
+     */
+    protected Message handleChannelDisconnect(CommandMessage disconnectCommand)
+    {
+        Message emptyAck = new AcknowledgeMessage();
+        return emptyAck;
+    }
+
+    /**
+     * Hook method for varying poll reply strategies for synchronous endpoints.
+     * The default behavior performs a non-waited, synchronous poll for the FlexClient
+     * and if any messages are currently queued they are returned immediately. If no
+     * messages are queued an empty response is returned immediately.
+     *
+     * @param flexClient The FlexClient that issued the poll request.
+     * @param pollCommand The poll command from the client; not inspected by this default implementation.
+     * @return The FlushResult response.
+     */
+    protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand)
+    {
+        // Poll the client's queue for this endpoint only.
+        String endpointId = getId();
+        return flexClient.poll(endpointId);
+    }
+
+    /**
+     * Handles a general poll request from a FlexClient to this endpoint.
+     * Subclasses may override to implement different poll handling strategies.
+     * <p>
+     * The reply is a no-op CLIENT_SYNC command when client processing is suppressed,
+     * a CLIENT_SYNC command carrying the flushed messages when there are any, and an
+     * empty acknowledgement otherwise. A positive next-flush wait time is passed
+     * along in the poll-wait header.
+     *
+     * @param flexClient The FlexClient that issued the poll request.
+     * @param pollCommand The poll command from the client.
+     * @return The poll response message; either for success or fault.
+     */
+    protected Message handleFlexClientPollCommand(FlexClient flexClient, CommandMessage pollCommand)
+    {
+        if (Log.isDebug())
+            Log.getLogger(getMessageBroker().getLogCategory(pollCommand)).debug(
+                 "Before handling general client poll request. " + StringUtils.NEWLINE +
+                 "  incomingMessage: " + pollCommand + StringUtils.NEWLINE);
+
+        FlushResult flushResult = handleFlexClientPoll(flexClient, pollCommand);
+        Message pollResponse = null;
+
+        // Generate a no-op poll response if necessary; prevents a single client from busy polling when the server
+        // is doing wait()-based long-polls.
+        if ((flushResult instanceof PollFlushResult) && ((PollFlushResult)flushResult).isClientProcessingSuppressed())
+        {
+            pollResponse = new CommandMessage(CommandMessage.CLIENT_SYNC_OPERATION);
+            pollResponse.setHeader(CommandMessage.NO_OP_POLL_HEADER, Boolean.TRUE);
+        }
+
+        if (pollResponse == null)
+        {
+            List<Message> messagesToReturn = (flushResult != null) ? flushResult.getMessages() : null;
+            if (messagesToReturn != null && !messagesToReturn.isEmpty())
+            {
+                pollResponse = new CommandMessage(CommandMessage.CLIENT_SYNC_OPERATION);
+                pollResponse.setBody(messagesToReturn.toArray());
+            }
+            else
+            {
+                pollResponse = new AcknowledgeMessage();
+            }
+        }
+
+        // Set the adaptive poll wait time if necessary.
+        if (flushResult != null)
+        {
+            int nextFlushWaitTime = flushResult.getNextFlushWaitTimeMillis();
+            if (nextFlushWaitTime > 0)
+                pollResponse.setHeader(CommandMessage.POLL_WAIT_HEADER, Integer.valueOf(nextFlushWaitTime));
+        }
+
+        if (Log.isDebug())
+        {
+            String debugPollResult = Log.getPrettyPrinter().prettify(pollResponse);
+            Log.getLogger(getMessageBroker().getLogCategory(pollCommand)).debug(
+                 "After handling general client poll request. " + StringUtils.NEWLINE +
+                 "  reply: " + debugPollResult + StringUtils.NEWLINE);
+        }
+
+        return pollResponse;
+    }
+
+    /**
+     * Reads the client-load-balancing element from the endpoint properties, if
+     * one is present, and registers each url it lists with the endpoint.
+     * A present element that defines no urls is a configuration error.
+     *
+     * @param id Id of the <code>Endpoint</code>; used in the duplicate-url warning.
+     * @param properties Properties for the <code>Endpoint</code>.
+     */
+    protected void initializeClientLoadBalancing(String id, ConfigMap properties)
+    {
+        if (properties.containsKey(CLIENT_LOAD_BALANCING_ELEMENT))
+        {
+            ConfigMap balancingConfig = properties.getPropertyAsMap(CLIENT_LOAD_BALANCING_ELEMENT, null);
+
+            @SuppressWarnings("unchecked")
+            List<String> configuredUrls = (balancingConfig == null) ? null : balancingConfig.getPropertyAsList(URL_ATTR, null);
+
+            if (configuredUrls == null || configuredUrls.isEmpty())
+            {
+                // Invalid {0} configuration for endpoint ''{1}''; no urls defined.
+                ConfigurationException noUrls = new ConfigurationException();
+                noUrls.setMessage(ERR_MSG_EMPTY_CLIENT_LOAD_BALANCING_ELEMENT, new Object[]{CLIENT_LOAD_BALANCING_ELEMENT, getId()});
+                throw noUrls;
+            }
+
+            for (String candidate : configuredUrls)
+            {
+                boolean added = addClientLoadBalancingUrl(candidate);
+                if (!added && Log.isWarn())
+                    log.warn("Endpoint '{0}' is ignoring the url '{1}' as it's already in the set of client-load-balancing urls.", new Object[]{id, candidate});
+            }
+        }
+    }
+
+    /**
+     * Checks the endpoint's security constraint, if one is set, through the
+     * broker's login manager.
+     *
+     * @param message The message being serviced; not inspected by this implementation.
+     */
+    protected void checkSecurityConstraint(Message message)
+    {
+        if (securityConstraint == null)
+            return;
+
+        getMessageBroker().getLoginManager().checkConstraint(securityConstraint);
+    }
+
+    /**
+     * Returns the deserializer class name used by the endpoint.
+     * Concrete endpoints must supply this value.
+     *
+     * @return The deserializer class name used by the endpoint.
+     */
+    protected abstract String getDeserializerClassName();
+
+    /**
+     * Returns the serializer class name used by the endpoint.
+     * Concrete endpoints must supply this value.
+     *
+     * @return The serializer class name used by the endpoint.
+     */
+    protected abstract String getSerializerClassName();
+
+    /**
+     * Returns the secure protocol scheme for the endpoint.
+     * <code>validateEndpointProtocol</code> requires the endpoint url to start with
+     * this value when the endpoint is secure, and <code>parseUrl</code> appends
+     * <code>"://"</code> to it before matching, so the value is the bare scheme name.
+     *
+     * @return The secure protocol scheme for the endpoint.
+     */
+    protected abstract String getSecureProtocolScheme();
+
+    /**
+     * Returns the insecure protocol scheme for the endpoint.
+     * <code>validateEndpointProtocol</code> requires the endpoint url to start with
+     * this value when the endpoint is not secure, and <code>parseUrl</code> appends
+     * <code>"://"</code> to it before matching, so the value is the bare scheme name.
+     *
+     * @return The insecure protocol scheme for the endpoint.
+     */
+    protected abstract String getInsecureProtocolScheme();
+
+    /**
+     * Invoked automatically to allow the <code>AbstractEndpoint</code> to set up
+     * its corresponding MBean control. This is a template method: manageable
+     * subclasses implement it to create and register their MBean control.
+     *
+     * @param broker The <code>MessageBroker</code> that manages this
+     * <code>AbstractEndpoint</code>.
+     */
+    protected abstract void setupEndpointControl(MessageBroker broker);
+
+    /**
+     * Validates the endpoint url scheme: the url must start with the secure
+     * protocol scheme for a secure endpoint and with the insecure protocol
+     * scheme otherwise. A <code>ConfigurationException</code> is thrown if it does not.
+     */
+    protected void validateEndpointProtocol()
+    {
+        String expectedScheme;
+        if (isSecure())
+            expectedScheme = getSecureProtocolScheme();
+        else
+            expectedScheme = getInsecureProtocolScheme();
+
+        if (url.startsWith(expectedScheme))
+            return;
+
+        ConfigurationException invalidScheme = new ConfigurationException();
+        invalidScheme.setMessage(ERR_MSG_INVALID_URL_SCHEME, new Object[] {url, expectedScheme});
+        throw invalidScheme;
+    }
+
+    /**
+     * Rejects requests that reach a secure endpoint over an insecure protocol.
+     * A secure request may talk to a secure or a non-secure endpoint; a
+     * non-secure request may only talk to a non-secure endpoint.
+     *
+     * @param req The request to validate.
+     */
+    protected void validateRequestProtocol(HttpServletRequest req)
+    {
+        if (req.isSecure() || !isSecure())
+            return;
+
+        // Secure endpoints must be contacted via a secure protocol.
+        // getPathInfo() is null when the request has no extra path information; leave it
+        // out in that case so the reported endpoint path does not end in the text "null".
+        String servletPath = req.getServletPath();
+        String pathInfo = req.getPathInfo();
+        String endpointPath = (pathInfo == null) ? servletPath : servletPath + pathInfo;
+
+        SecurityException se = new SecurityException();
+        se.setMessage(NONSECURE_PROTOCOL, new Object[]{endpointPath});
+        throw se;
+    }
+
+    /**
+     * Verifies that the remote client supports the FlexClient API, which is taken
+     * to be the case when a FlexClient is present in the FlexContext.
+     * Legacy clients that do not support this receive a message fault for any messages they send.
+     *
+     * @param message The message to verify; its destination is reported in the fault.
+     */
+    protected void verifyFlexClientSupport(Message message)
+    {
+        if (FlexContext.getFlexClient() != null)
+            return;
+
+        MessageException unsupported = new MessageException();
+        unsupported.setMessage(REQUIRES_FLEXCLIENT_SUPPORT, new Object[] {message.getDestination()});
+        throw unsupported;
+    }
+
+    /**
+     * Creates the class with the given name through <code>ClassUtil.createClass</code>,
+     * passing the class loader of the MessageBroker found in the FlexContext, or
+     * <code>null</code> as the class loader when the FlexContext has no broker.
+     *
+     * @param className The name of the class to create.
+     * @return The created class.
+     */
+    protected Class<?> createClass(String className)
+    {
+        ClassLoader loader = null;
+        if (FlexContext.getMessageBroker() != null)
+            loader = FlexContext.getMessageBroker().getClassLoader();
+
+        return ClassUtil.createClass(className, loader);
+    }
+
+    /*
+     * Resolves the context-root token in the endpoint url against the supplied
+     * context path and caches the result in parsedClientUrl. Does nothing once
+     * clientContextParsed is set; setUrl() clears that flag again.
+     * Throws a ConfigurationException when the url uses the token but no context
+     * path is supplied.
+     * This should match with ChannelSetting.parseClientUri
+     */
+    private void parseClientUrl(String contextPath)
+    {
+        if (!clientContextParsed)
+        {
+            String channelEndpoint = url.trim();
+
+            // either {context-root} or {context.root} is legal; normalize to the single token form
+            channelEndpoint = StringUtils.substitute(channelEndpoint, "{context-root}", ConfigurationConstants.CONTEXT_PATH_TOKEN);
+
+            if ((contextPath == null) && (channelEndpoint.indexOf(ConfigurationConstants.CONTEXT_PATH_TOKEN) != -1))
+            {
+                // context root must be specified before it is used
+                ConfigurationException e = new ConfigurationException();
+                e.setMessage(ConfigurationConstants.UNDEFINED_CONTEXT_ROOT, new Object[]{getId()});
+                throw e;
+            }
+
+            // simplify the number of combinations to test by ensuring our
+            // context path always starts with a slash
+            if (contextPath != null && !contextPath.startsWith("/"))
+            {
+                contextPath = "/" + contextPath;
+            }
+
+            // avoid double-slashes from context root by replacing /{context.root}
+            // in a single replacement step
+            if (channelEndpoint.indexOf(ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN) != -1)
+            {
+                // but avoid double-slash for /{context.root}/etc when we have
+                // the default context root (unless the token is the entire url)
+                if ("/".equals(contextPath) && !ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN.equals(channelEndpoint))
+                    contextPath = "";
+
+                channelEndpoint = StringUtils.substitute(channelEndpoint, ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN, contextPath);
+            }
+            // otherwise we have something like {server.name}:{server.port}{context.root}...
+            else
+            {
+                // but avoid double-slash for {context.root}/etc when we have
+                // the default context root (unless the token is the entire url)
+                if ("/".equals(contextPath) && !ConfigurationConstants.CONTEXT_PATH_TOKEN.equals(channelEndpoint))
+                    contextPath = "";
+
+                channelEndpoint = StringUtils.substitute(channelEndpoint, ConfigurationConstants.CONTEXT_PATH_TOKEN, contextPath);
+            }
+
+            parsedClientUrl = channelEndpoint;
+            clientContextParsed = true; // publish the cached value; later calls return early
+        }
+    }
+
+    /**
+     * Parses the port out of the given url through <code>ChannelSettings.parsePort</code>.
+     * A result of 0 is logged at info level as "no port specified"; a result of -1
+     * is reported to the caller as 0.
+     *
+     * @param url The url to parse.
+     * @return The parsed port, with -1 replaced by 0.
+     */
+    private int internalParsePort(String url)
+    {
+        int parsedPort = ChannelSettings.parsePort(url);
+
+        // If there is no specified port, log an info message as urls without ports are supported
+        if (parsedPort == 0 && Log.isInfo())
+            log.info("No port specified in channel URL:  {0}", new Object[]{url});
+
+        // Replace -1 with 0.
+        if (parsedPort == -1)
+            return 0;
+
+        return parsedPort;
+    }
+
+    private void parseUrl(String contextPath)
+    {
+        /*
+         * Reduces the lower-cased endpoint url to the part that follows the
+         * protocol, host and context path, drops one trailing slash, and caches the
+         * result in parsedUrl together with the context path it was computed for.
+         * Parse again only if never parsed before or parsed for a different contextPath.
+         */
+        if (parsedForContext == null || !parsedForContext.equals(contextPath))
+        {
+            String channelEndpoint = url.toLowerCase().trim(); // lower-cased with the default locale
+
+            /*
+             * Remove protocol and host info.
+             * NOTE(review): the search for the slash after the host always starts at
+             * index 8, whatever the length of the matched scheme prefix - confirm this
+             * is intended for schemes shorter than "https".
+             */
+            String insecureProtocol = getInsecureProtocolScheme() + "://";
+            String secureProtocol = getSecureProtocolScheme() + "://";
+            if (channelEndpoint.startsWith(secureProtocol) || channelEndpoint.startsWith(insecureProtocol))
+            {
+                int nextSlash = channelEndpoint.indexOf('/', 8);
+                if (nextSlash > 0)
+                {
+                    channelEndpoint = channelEndpoint.substring(nextSlash);
+                }
+            }
+
+            // either {context-root} or {context.root} is legal; normalize to the single token form
+            channelEndpoint = StringUtils.substitute(channelEndpoint, "{context-root}", ConfigurationConstants.CONTEXT_PATH_TOKEN);
+
+            /*
+             * Remove context path info: either the context-root token (with or without a
+             * leading slash) or the literal context path, compared case-insensitively.
+             * NOTE(review): contextPath is dereferenced in the last branch without a
+             * null check - callers are presumably expected to pass a non-null value.
+             */
+            if (channelEndpoint.startsWith(ConfigurationConstants.CONTEXT_PATH_TOKEN))
+            {
+                channelEndpoint = channelEndpoint.substring(ConfigurationConstants.CONTEXT_PATH_TOKEN.length());
+            }
+            else if (channelEndpoint.startsWith(ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN))
+            {
+                channelEndpoint = channelEndpoint.substring(ConfigurationConstants.SLASH_CONTEXT_PATH_TOKEN.length());
+            }
+            else if (contextPath.length() > 0)
+            {
+                if (channelEndpoint.startsWith(contextPath.toLowerCase()))
+                {
+                    channelEndpoint = channelEndpoint.substring(contextPath.length());
+                }
+            }
+
+            // We also don't match on trailing slashes
+            if (channelEndpoint.endsWith("/"))
+            {
+                channelEndpoint = channelEndpoint.substring(0, channelEndpoint.length() - 1);
+            }
+
+            parsedUrl = channelEndpoint;
+            parsedForContext = contextPath; // remember which context path the cached value belongs to
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/endpoints/BaseHTTPEndpoint.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/endpoints/BaseHTTPEndpoint.java b/core/src/flex/messaging/endpoints/BaseHTTPEndpoint.java
new file mode 100644
index 0000000..bb31b39
--- /dev/null
+++ b/core/src/flex/messaging/endpoints/BaseHTTPEndpoint.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.endpoints;
+
+import flex.management.runtime.messaging.endpoints.EndpointControl;
+import flex.messaging.FlexContext;
+import flex.messaging.FlexSession;
+import flex.messaging.HttpFlexSession;
+import flex.messaging.MessageClient;
+import flex.messaging.client.FlexClient;
+import flex.messaging.config.ConfigMap;
+import flex.messaging.config.ConfigurationConstants;
+import flex.messaging.endpoints.amf.AMFFilter;
+import flex.messaging.io.MessageIOConstants;
+import flex.messaging.io.amf.ActionContext;
+import flex.messaging.log.HTTPRequestLog;
+import flex.messaging.messages.CommandMessage;
+import flex.messaging.messages.Message;
+import flex.messaging.util.SettingsReplaceUtil;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract base class for all the HTTP-based endpoints.
+ */
+public abstract class BaseHTTPEndpoint extends AbstractEndpoint
+{
+    //--------------------------------------------------------------------------
+    //
+    // Public Static Constants
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * The secure and insecure URL schemes for the HTTP endpoint.
+     */
+    public static final String HTTP_PROTOCOL_SCHEME = "http";
+    public static final String HTTPS_PROTOCOL_SCHEME = "https";
+
+    //--------------------------------------------------------------------------
+    //
+    // Private Static Constants
+    //
+    //--------------------------------------------------------------------------
+
+    private static final String ADD_NO_CACHE_HEADERS = "add-no-cache-headers";
+    private static final String REDIRECT_URL = "redirect-url";
+    private static final String INVALIDATE_SESSION_ON_DISCONNECT = "invalidate-session-on-disconnect";
+    private static final String HTTP_RESPONSE_HEADERS = "http-response-headers";
+    private static final String HEADER_ATTR = "header";
+
+    private static final String HEADER_NAME_ORIGIN = "Origin";
+    private static final String ACCESS_CONTROL = "Access-Control-";
+    private static final String SESSION_REWRITING_ENABLED = "session-rewriting-enabled";
+
+    private static final int ERR_MSG_DUPLICATE_SESSIONS_DETECTED = 10035;
+    private static final String REQUEST_ATTR_DUPLICATE_SESSION_FLAG = "flex.messaging.request.DuplicateSessionDetected";
+
+    //--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Constructs an unmanaged <code>BaseHTTPEndpoint</code>.
+     * Equivalent to calling {@link #BaseHTTPEndpoint(boolean)} with <code>false</code>.
+     */
+    public BaseHTTPEndpoint()
+    {
+        this(false);
+    }
+
+    /**
+     * Constructs a <code>BaseHTTPEndpoint</code> with the specified management setting.
+     * The flag is handed unchanged to the superclass constructor.
+     *
+     * @param enableManagement <code>true</code> if the <code>BaseHTTPEndpoint</code>
+     * is manageable; otherwise <code>false</code>.
+     */
+    public BaseHTTPEndpoint(boolean enableManagement)
+    {
+        super(enableManagement);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Initialize, validate, start, and stop methods.
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Initializes the <code>Endpoint</code> with the properties.
+     * If subclasses override this method, they must call <code>super.initialize()</code>.
+     *
+     * @param id The ID of the <code>Endpoint</code>.
+     * @param properties Properties for the <code>Endpoint</code>.
+     */
+    @Override public void initialize(String id, ConfigMap properties)
+    {
+        super.initialize(id, properties);
+
+        if (properties == null || properties.size() == 0)
+            return;
+
+        // General HTTP props.
+        addNoCacheHeaders = properties.getPropertyAsBoolean(ADD_NO_CACHE_HEADERS, true);
+        redirectURL = properties.getPropertyAsString(REDIRECT_URL, null);
+        invalidateSessionOnDisconnect = properties.getPropertyAsBoolean(INVALIDATE_SESSION_ON_DISCONNECT, false);
+        loginAfterDisconnect = properties.getPropertyAsBoolean(ConfigurationConstants.LOGIN_AFTER_DISCONNECT_ELEMENT, false);
+        sessionRewritingEnabled = properties.getPropertyAsBoolean(SESSION_REWRITING_ENABLED, true);
+        initializeHttpResponseHeaders(properties);
+        validateEndpointProtocol();
+    }
+
+    /**
+     * Starts the <code>Endpoint</code>. An endpoint that is already started is
+     * left untouched; otherwise the superclass is started and the filter chain
+     * returned by {@link #createFilterChain()} is installed.
+     */
+    @Override
+    public void start()
+    {
+        if (!isStarted())
+        {
+            super.start();
+            filterChain = createFilterChain();
+        }
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Controller used to manage this endpoint.
+     */
+    protected EndpointControl controller;
+
+    /**
+     * AMF processing filter chain used by this endpoint.
+     */
+    protected AMFFilter filterChain;
+
+    /**
+     * Headers to add to the HTTP response.
+     */
+    protected List<HttpHeader> httpResponseHeaders;
+
+    //--------------------------------------------------------------------------
+    //
+    // Properties
+    //
+    //--------------------------------------------------------------------------
+
+    //----------------------------------
+    //  addNoCacheHeaders
+    //----------------------------------
+
+    /**
+     * Backing field for the <code>add-no-cache-headers</code> property;
+     * enabled unless configured otherwise.
+     */
+    protected boolean addNoCacheHeaders = true;
+
+    /**
+     * Reports the current <code>add-no-cache-headers</code> setting.
+     *
+     * @return <code>true</code> if <code>add-no-cache-headers</code> is enabled;
+     * <code>false</code> otherwise.
+     */
+    public boolean isAddNoCacheHeaders()
+    {
+        return this.addNoCacheHeaders;
+    }
+
+    /**
+     * Changes the <code>add-no-cache-headers</code> setting.
+     *
+     * @param addNoCacheHeaders The new <code>add-no-cache-headers</code> value.
+     */
+    public void setAddNoCacheHeaders(boolean addNoCacheHeaders)
+    {
+        this.addNoCacheHeaders = addNoCacheHeaders;
+    }
+
+    //----------------------------------
+    //  loginAfterDisconnect
+    //----------------------------------
+
+    /**
+     * This is a property used on the client. It is read from the endpoint
+     * configuration in <code>initialize</code> (defaulting to <code>false</code>)
+     * and, when <code>true</code>, is included in the client-facing description
+     * built by <code>describeEndpoint</code>.
+     */
+    protected boolean loginAfterDisconnect;
+
+    //----------------------------------
+    //  invalidateSessionOnDisconnect
+    //----------------------------------
+
+    /**
+     * Backing field for the <code>invalidate-session-on-disconnect</code>
+     * property; <code>false</code> unless configured otherwise.
+     */
+    protected boolean invalidateSessionOnDisconnect;
+
+    /**
+     * Reports whether the server session is invalidated when a client
+     * channel disconnects. The default is <code>false</code>.
+     *
+     * @return <code>true</code> if the server session will be invalidated
+     *         when a client channel disconnects, <code>false</code> otherwise.
+     */
+    public boolean isInvalidateSessionOnDisconnect()
+    {
+        return this.invalidateSessionOnDisconnect;
+    }
+
+    /**
+     * Chooses whether the server session of a client is invalidated when
+     * that client disconnects its channel. The default is <code>false</code>.
+     *
+     * @param value <code>true</code> to invalidate the server session for a client
+     *              that disconnects its channel, <code>false</code> otherwise.
+     */
+    public void setInvalidateSessionOnDisconnect(boolean value)
+    {
+        this.invalidateSessionOnDisconnect = value;
+    }
+
+    //----------------------------------
+    //  redirectURL
+    //----------------------------------
+
+    /**
+     * Backing field for the <code>redirect-url</code> property; <code>null</code>
+     * when no redirect is configured.
+     */
+    protected String redirectURL;
+
+    /**
+     * Returns the current <code>redirect-url</code> setting.
+     *
+     * @return The <code>redirect-url</code> property, possibly <code>null</code>.
+     */
+    public String getRedirectURL()
+    {
+        return this.redirectURL;
+    }
+
+    /**
+     * Changes the <code>redirect-url</code> setting.
+     *
+     * @param redirectURL The new <code>redirect-url</code> value.
+     */
+    public void setRedirectURL(String redirectURL)
+    {
+        this.redirectURL = redirectURL;
+    }
+
+    //----------------------------------
+    //  sessionRewritingEnabled
+    //----------------------------------
+
+    /**
+     * Backing field for the <code>session-rewriting-enabled</code> property;
+     * enabled unless configured otherwise.
+     */
+    protected boolean sessionRewritingEnabled = true;
+
+    /**
+     * Reports whether the server will fall back on rewriting URLs to include
+     * session identifiers in the URL when HTTP session cookies are not allowed
+     * on the client. The default is <code>true</code>.
+     *
+     * @return <code>true</code> if session rewriting is enabled.
+     */
+    public boolean isSessionRewritingEnabled()
+    {
+        return this.sessionRewritingEnabled;
+    }
+
+    /**
+     * Enables or disables session rewriting.
+     *
+     * @param value The new session rewriting enabled value.
+     */
+    public void setSessionRewritingEnabled(boolean value)
+    {
+        this.sessionRewritingEnabled = value;
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Public Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Handle AMF/AMFX encoded messages sent over HTTP.
+     * <p>
+     * The request is passed through the endpoint's filter chain. If the chain
+     * reports anything other than <code>STATUS_NOTAMF</code>, the buffered
+     * response is written back with the configured headers; otherwise, when a
+     * <code>redirect-url</code> is configured, the caller is redirected to it.
+     * Exceptions are logged and recorded on the request under
+     * <code>HTTPRequestLog.HTTP_ERROR_INFO</code> rather than propagated.
+     * </p>
+     * <p>
+     * The configured <code>redirect-url</code> is never modified here: the
+     * context-path substitution is applied to a per-request copy so that
+     * concurrent requests do not write to shared endpoint state and
+     * {@link #getRedirectURL()} keeps returning the configured value.
+     * </p>
+     *
+     * @param req The original servlet request.
+     * @param res The active servlet response.
+     */
+    @Override
+    public void service(HttpServletRequest req, HttpServletResponse res)
+    {
+        super.service(req, res);
+
+        try
+        {
+            // Setup serialization and type marshalling contexts
+            setThreadLocals();
+
+            // Create a context for this request
+            ActionContext context = new ActionContext();
+
+            // Pass endpoint's mpi settings to the context so that it knows what level of
+            // performance metrics should be gathered during serialization/deserialization
+            context.setRecordMessageSizes(isRecordMessageSizes());
+            context.setRecordMessageTimes(isRecordMessageTimes());
+
+            // Send invocation through filter chain, which ends at the MessageBroker
+            filterChain.invoke(context);
+
+            // After serialization completes, increment endpoint byte counters,
+            // if the endpoint is managed
+            if (isManaged())
+            {
+                controller.addToBytesDeserialized(context.getDeserializedBytes());
+                controller.addToBytesSerialized(context.getSerializedBytes());
+            }
+
+            if (context.getStatus() != MessageIOConstants.STATUS_NOTAMF)
+            {
+                if (addNoCacheHeaders)
+                    addNoCacheHeaders(req, res);
+
+                addHeadersToResponse(req, res);
+
+                ByteArrayOutputStream outBuffer = context.getResponseOutput();
+
+                res.setContentType(getResponseContentType());
+
+                res.setContentLength(outBuffer.size());
+                outBuffer.writeTo(res.getOutputStream());
+                res.flushBuffer();
+            }
+            else
+            {
+                // Not an AMF request, probably viewed in a browser.
+                // Read the field once so the null check and the use see the same value.
+                String configuredRedirectURL = redirectURL;
+                if (configuredRedirectURL != null)
+                {
+                    try
+                    {
+                        // Check for redirect URL context-root token; substitute into a
+                        // local so the shared field is not rewritten by request threads.
+                        String redirectTarget = SettingsReplaceUtil.replaceContextPath(configuredRedirectURL, req.getContextPath());
+                        res.sendRedirect(redirectTarget);
+                    }
+                    catch (IllegalStateException alreadyFlushed)
+                    {
+                        // ignore: the response was already committed, nothing more can be sent
+                    }
+                }
+            }
+        }
+        catch (IOException ioe)
+        {
+            // This happens when client closes the connection, log it at info level
+            log.info(ioe.getMessage());
+            // Store exception information for later logging
+            req.setAttribute(HTTPRequestLog.HTTP_ERROR_INFO, ioe.toString());
+        }
+        catch (Throwable t)
+        {
+            log.error(t.getMessage(), t);
+            // Store exception information for later logging
+            req.setAttribute(HTTPRequestLog.HTTP_ERROR_INFO, t.toString());
+        }
+        finally
+        {
+            clearThreadLocals();
+        }
+    }
+
+
+    /**
+     * Returns a <code>ConfigMap</code> of endpoint properties that the client
+     * needs. The map produced by <code>super.describeEndpoint</code> is used as
+     * the starting point; when <code>loginAfterDisconnect</code> is set, a
+     * login-after-disconnect entry is added under the "properties" key, which
+     * is created if it does not exist yet.
+     *
+     * @return The endpoint description for the client.
+     */
+    @Override
+    public ConfigMap describeEndpoint()
+    {
+        ConfigMap endpointConfig = super.describeEndpoint();
+
+        // Nothing HTTP-specific to advertise unless login-after-disconnect is on.
+        if (!loginAfterDisconnect)
+        {
+            return endpointConfig;
+        }
+
+        // Adding as a value rather than attribute to the parent
+        ConfigMap loginAfterDisconnectValue = new ConfigMap();
+        loginAfterDisconnectValue.addProperty(EMPTY_STRING, TRUE_STRING);
+
+        ConfigMap clientProperties = endpointConfig.getPropertyAsMap(PROPERTIES_ELEMENT, null);
+        if (clientProperties == null)
+        {
+            clientProperties = new ConfigMap();
+            endpointConfig.addProperty(PROPERTIES_ELEMENT, clientProperties);
+        }
+        clientProperties.addProperty(ConfigurationConstants.LOGIN_AFTER_DISCONNECT_ELEMENT, loginAfterDisconnectValue);
+
+        return endpointConfig;
+    }
+
+    /**
+     * Overrides to guard against duplicate HTTP-based sessions for the same FlexClient
+     * which will occur if the remote host has disabled session cookies.
+     *
+     * @see AbstractEndpoint#setupFlexClient(String)
+     */
+    @Override
+    public FlexClient setupFlexClient(String id)
+    {
+        FlexClient flexClient = super.setupFlexClient(id);
+
+        // Scan for duplicate HTTP-sessions and if found, invalidate them and throw a MessageException.
+        // A request attribute is used to deal with batched AMF messages that arrive in a single request by trigger multiple passes through this method.
+        boolean duplicateSessionDetected = (FlexContext.getHttpRequest().getAttribute(REQUEST_ATTR_DUPLICATE_SESSION_FLAG) != null);
+
+        List<FlexSession> sessions = null;
+        if (!duplicateSessionDetected)
+        {
+            sessions = flexClient.getFlexSessions();
+            int n = sessions.size();
+            if (n > 1)
+            {
+                List<HttpFlexSession> httpFlexSessions = new ArrayList<HttpFlexSession>();
+                for (int i = 0; i < n; i++)
+                {
+                    FlexSession currentSession = sessions.get(i);
+                    if (currentSession instanceof HttpFlexSession)
+                        httpFlexSessions.add((HttpFlexSession)currentSession);
+                    if (httpFlexSessions.size() > 1)
+                    {
+                        FlexContext.getHttpRequest().setAttribute(REQUEST_ATTR_DUPLICATE_SESSION_FLAG, httpFlexSessions);
+                        duplicateSessionDetected = true;
+                        break;
+                    }
+                }
+            }
+        }
+
+        // If more than one was found, remote host isn't using session cookies. Kill all duplicate sessions and return an error.
+        // Simplest to just re-scan the list given that it will be very short, but use an iterator for concurrent modification.
+        if (duplicateSessionDetected)
+        {
+            Object attributeValue = FlexContext.getHttpRequest().getAttribute(REQUEST_ATTR_DUPLICATE_SESSION_FLAG);
+            String newSessionId = null;
+            String oldSessionId = null;
+            if (attributeValue != null)
+            {
+                @SuppressWarnings("unchecked")
+                List<HttpFlexSession> httpFlexSessions = (List<HttpFlexSession>)attributeValue;
+                oldSessionId = httpFlexSessions.get(0).getId();
+                newSessionId = httpFlexSessions.get(1).getId();
+            }
+
+            if (sessions != null)
+            {
+                for (FlexSession session : sessions)
+                {
+                    if (session instanceof HttpFlexSession)
+                    {
+                        session.invalidate();
+                    }
+                }
+            }
+
+            // Return an error to the client.
+
+            DuplicateSessionException e = new DuplicateSessionException();
+            // Duplicate HTTP-based FlexSession error: A request for FlexClient ''{0}'' arrived over a new FlexSession ''{1}'', but FlexClient is already associated with FlexSession ''{2}'', therefore it cannot be associated with the new session.
+            e.setMessage(ERR_MSG_DUPLICATE_SESSIONS_DETECTED, new Object[]{flexClient.getId(), newSessionId, oldSessionId});
+            throw e;
+        }
+
+        return flexClient;
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Protected Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Adds custom headers specified in the config to the HTTP response. The only
+     * exception is that access control headers (Access-Control-*) are sent only
+     * if there is a non-empty Origin header in the request. Because HTTP header
+     * names are case-insensitive, the Access-Control- prefix is recognized
+     * regardless of the letter case used in the configuration.
+     *
+     * @param request The HTTP request.
+     * @param response The HTTP response.
+     */
+    protected void addHeadersToResponse(HttpServletRequest request, HttpServletResponse response)
+    {
+        if (httpResponseHeaders == null || httpResponseHeaders.isEmpty())
+        {
+            return;
+        }
+
+        String origin = request.getHeader(HEADER_NAME_ORIGIN);
+        boolean originHeaderExists = origin != null && origin.length() != 0;
+
+        for (HttpHeader header : httpResponseHeaders)
+        {
+            // Case-insensitive prefix match: "access-control-allow-origin" must be
+            // treated exactly like "Access-Control-Allow-Origin".
+            boolean accessControlHeader =
+                    header.name.regionMatches(true, 0, ACCESS_CONTROL, 0, ACCESS_CONTROL.length());
+            if (accessControlHeader && !originHeaderExists)
+            {
+                continue;
+            }
+
+            response.addHeader(header.name, header.value);
+        }
+    }
+
+    /**
+     * Create the gateway filters that transform action requests
+     * and responses. The returned chain is stored by <code>start()</code> and
+     * invoked once per request by <code>service()</code>.
+     *
+     * @return The head of the filter chain for this endpoint.
+     */
+    protected abstract AMFFilter createFilterChain();
+
+    /**
+     * Returns the content type used by the connection handler to set on the
+     * HTTP response. Subclasses should either return MessageIOConstants.AMF_CONTENT_TYPE
+     * or MessageIOConstants.XML_CONTENT_TYPE.
+     *
+     * @return The content type that <code>service()</code> sets on the response.
+     */
+    protected abstract String getResponseContentType();
+
+    /**
+     * Supplies the secure protocol scheme for the endpoint, which is
+     * {@link #HTTPS_PROTOCOL_SCHEME}.
+     *
+     * @return https.
+     */
+    @Override
+    protected String getSecureProtocolScheme()
+    {
+        return BaseHTTPEndpoint.HTTPS_PROTOCOL_SCHEME;
+    }
+
+    /**
+     * Supplies the insecure protocol scheme for the endpoint, which is
+     * {@link #HTTP_PROTOCOL_SCHEME}.
+     *
+     * @return http.
+     */
+    @Override
+    protected String getInsecureProtocolScheme()
+    {
+        return BaseHTTPEndpoint.HTTP_PROTOCOL_SCHEME;
+    }
+
+    /**
+     * Handles a channel disconnect for an HTTP-based endpoint. Every
+     * <code>MessageClient</code> of the current FlexClient that belongs to this
+     * endpoint is flagged as channel-disconnected and invalidated; the current
+     * HTTP session is then invalidated as well when
+     * {@link #isInvalidateSessionOnDisconnect()} is enabled. The remaining
+     * processing is delegated to the superclass.
+     *
+     * @param disconnectCommand The disconnect command sent by the client.
+     * @return The message returned by the superclass implementation.
+     * @see flex.messaging.endpoints.AbstractEndpoint#handleChannelDisconnect(CommandMessage)
+     */
+    @Override
+    protected Message handleChannelDisconnect(CommandMessage disconnectCommand)
+    {
+        HttpFlexSession httpSession = (HttpFlexSession)FlexContext.getFlexSession();
+        FlexClient client = FlexContext.getFlexClient();
+
+        // Shut down any subscriptions established over this channel/endpoint
+        // for this specific FlexClient.
+        if (client.isValid())
+        {
+            String thisEndpointId = getId();
+            for (MessageClient subscription : client.getMessageClients())
+            {
+                if (!subscription.getEndpointId().equals(thisEndpointId))
+                {
+                    continue;
+                }
+                subscription.setClientChannelDisconnected(true);
+                subscription.invalidate();
+            }
+        }
+
+        // And optionally invalidate the session.
+        if (httpSession.isValid())
+        {
+            if (isInvalidateSessionOnDisconnect())
+            {
+                httpSession.invalidate(false /* don't recreate */);
+            }
+        }
+
+        return super.handleChannelDisconnect(disconnectCommand);
+    }
+
+    /**
+     * Reads the custom HTTP response headers from the endpoint configuration.
+     * Each <code>header</code> entry under <code>http-response-headers</code> is
+     * expected in the form <code>Name: value</code>; the text before the first
+     * colon becomes the header name and the remainder the value, both trimmed.
+     * Entries that are <code>null</code>, contain no colon, or have a blank
+     * name are skipped with a warning instead of aborting initialization.
+     * Parsed headers are appended to any headers already registered.
+     *
+     * @param properties Properties for the <code>Endpoint</code>; must not be
+     * <code>null</code>.
+     */
+    protected void initializeHttpResponseHeaders(ConfigMap properties)
+    {
+        if (!properties.containsKey(HTTP_RESPONSE_HEADERS))
+        {
+            return;
+        }
+
+        ConfigMap headersConfig = properties.getPropertyAsMap(HTTP_RESPONSE_HEADERS, null);
+        if (headersConfig == null)
+        {
+            return;
+        }
+
+        @SuppressWarnings("unchecked")
+        List<String> headers = headersConfig.getPropertyAsList(HEADER_ATTR, null);
+        if (headers == null || headers.isEmpty())
+        {
+            return;
+        }
+
+        if (this.httpResponseHeaders == null)
+        {
+            this.httpResponseHeaders = new ArrayList<HttpHeader>();
+        }
+
+        for (String header : headers)
+        {
+            // indexOf returns -1 when there is no colon; substring(0, -1) would throw.
+            int colonIndex = (header == null) ? -1 : header.indexOf(':');
+            String name = (colonIndex > 0) ? header.substring(0, colonIndex).trim() : "";
+            if (name.length() == 0)
+            {
+                log.warn("Endpoint '" + getId() + "' ignored malformed " + HTTP_RESPONSE_HEADERS
+                        + " entry '" + header + "'; expected 'Name: value'.");
+                continue;
+            }
+
+            String value = header.substring(colonIndex + 1).trim();
+            this.httpResponseHeaders.add(new HttpHeader(name, value));
+        }
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Nested Classes
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Immutable name/value pair describing a single header in the HTTP
+     * request/response.
+     */
+    static class HttpHeader
+    {
+        /** The header name. */
+        public final String name;
+
+        /** The header value. */
+        public final String value;
+
+        /**
+         * Creates a header from the given name and value; both are stored as-is.
+         *
+         * @param name The header name.
+         * @param value The header value.
+         */
+        public HttpHeader(String name, String value)
+        {
+            this.value = value;
+            this.name = name;
+        }
+    }
+}
\ No newline at end of file