You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flex.apache.org by cd...@apache.org on 2014/05/05 22:08:22 UTC

[08/51] [partial] FLEX-34306 - [BlazeDS] Make the BlazeDS build run on Windows machines - Added some mkdir commands to the ANT Build.java - Did some fine-tuning to resolve some compile errors

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java b/modules/core/src/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java
old mode 100755
new mode 100644
index c1d5071..ece3144
--- a/modules/core/src/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java
+++ b/modules/core/src/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java
@@ -1,1226 +1,1226 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.endpoints;
-
-import flex.messaging.FlexContext;
-import flex.messaging.FlexSession;
-import flex.messaging.HttpFlexSession;
-import flex.messaging.MessageException;
-import flex.messaging.client.EndpointPushNotifier;
-import flex.messaging.client.FlexClient;
-import flex.messaging.client.FlushResult;
-import flex.messaging.client.UserAgentSettings;
-import flex.messaging.config.ConfigMap;
-import flex.messaging.log.Log;
-import flex.messaging.messages.AcknowledgeMessage;
-import flex.messaging.messages.AsyncMessage;
-import flex.messaging.messages.CommandMessage;
-import flex.messaging.messages.Message;
-import flex.messaging.messages.MessagePerformanceInfo;
-import flex.messaging.messages.MessagePerformanceUtils;
-import flex.messaging.util.TimeoutManager;
-import flex.messaging.util.UserAgentManager;
-
-import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadFactory;
-
-/**
- * Base class for HTTP-based endpoints that support streaming HTTP connections to
- * connected clients.
- * Each streaming connection managed by this endpoint consumes one of the request
- * handler threads provided by the servlet container, so it is not highly scalable
- * but offers performance advantages over client polling for clients receiving
- * a steady, rapid stream of pushed messages.
- * This endpoint does not support polling clients and will fault any poll requests
- * that are received. To support polling clients use subclasses of
- * BaseHTTPEndpoint instead.
- */
-public abstract class BaseStreamingHTTPEndpoint extends BaseHTTPEndpoint
-{
-    //--------------------------------------------------------------------------
-    //
-    // Private Static Constants
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * This token is used in chunked HTTP responses frequently so initialize it statically for general use.
-     */
-    private static final byte[] CRLF_BYTES = {(byte)13, (byte)10};
-
-    /**
-     * This token is used for the terminal chunk within a chunked response.
-     */
-    private static final byte ZERO_BYTE = (byte)48;
-
-    /**
-     * This token is used to signal that a chunk of data should be skipped by the client.
-     */
-    private static final byte NULL_BYTE = (byte)0;
-
-    /**
-     * Parameter name for 'command' passed in a request for a new streaming connection.
-     */
-    private static final String COMMAND_PARAM_NAME = "command";
-
-    /**
-     * This is the token at the end of the HTTP request line that indicates that it is
-     * a stream connection that should be held open to push data back to the client,
-     * as opposed to a regular request-response message.
-     */
-    private static final String OPEN_COMMAND = "open";
-
-    /**
-     * This is the token at the end of the HTTP request line that indicates that it is
-     * a stream connection that should be closed.
-     */
-    private static final String CLOSE_COMMAND = "close";
-
-    /**
-     *  Parameter name for the stream ID; passed with commands for an existing streaming connection.
-     */
-    private static final String STREAM_ID_PARAM_NAME = "streamId";
-
-    /**
-     * Constant for HTTP/1.0.
-     */
-    private static final String HTTP_1_0 = "HTTP/1.0";
-
-    /**
-     * Thread name suffix for request threads that are servicing a pinned open streaming connection.
-     */
-    private static final String STREAMING_THREAD_NAME_EXTENSION = "-in-streaming-mode";
-
-    /**
-     * Configuration constants.
-     */
-    private static final String PROPERTY_CONNECTION_IDLE_TIMEOUT_MINUTES = "connection-idle-timeout-minutes";
-    private static final String PROPERTY_LEGACY_CONNECTION_IDLE_TIMEOUT_MINUTES = "idle-timeout-minutes";
-    private static final String MAX_STREAMING_CLIENTS = "max-streaming-clients";
-    private static final String SERVER_TO_CLIENT_HEARTBEAT_MILLIS = "server-to-client-heartbeat-millis";
-    private static final String PROPERTY_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE = "invalidate-messageclient-on-streaming-close";
-
-    /**
-     * Defaults.
-     */
-    private static final boolean DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE = false;
-    private static final int DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS = 5000;
-    private static final int DEFAULT_MAX_STREAMING_CLIENTS = 10;
-
-    /**
-     * Errors.
-     */
-    public static final String POLL_NOT_SUPPORTED_CODE = "Server.PollNotSupported";
-    public static final int POLL_NOT_SUPPORTED_MESSAGE = 10034;
-
-
-    //--------------------------------------------------------------------------
-    //
-    // Constructor
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Constructs an unmanaged <code>BaseStreamingHTTPEndpoint</code>.
-     */
-    public BaseStreamingHTTPEndpoint()
-    {
-        this(false);
-    }
-
-    /**
-     * Constructs an <code>BaseStreamingHTTPEndpoint</code> with the indicated management.
-     *
-     * @param enableManagement <code>true</code> if the <code>BaseStreamingHTTPEndpoint</code>
-     * is manageable; <code>false</code> otherwise.
-     */
-    public BaseStreamingHTTPEndpoint(boolean enableManagement)
-    {
-        super(enableManagement);
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Initialize, validate, start, and stop methods.
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Initializes the <code>Endpoint</code> with the properties.
-     * If subclasses override this method, they must call <code>super.initialize()</code>.
-     *
-     * @param id The ID of the <code>Endpoint</code>.
-     * @param properties Properties for the <code>Endpoint</code>.
-     */
-    @Override
-    public void initialize(String id, ConfigMap properties)
-    {
-        super.initialize(id, properties);
-
-        if (properties == null || properties.size() == 0)
-        {
-            // Initialize default user agent manager settings.
-            UserAgentManager.setupUserAgentManager(null, userAgentManager);
-
-            return; // Nothing else to initialize.
-        }
-
-        // The interval that the server will check if the client is still available.
-        serverToClientHeartbeatMillis = properties.getPropertyAsLong(SERVER_TO_CLIENT_HEARTBEAT_MILLIS, DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS);
-        setServerToClientHeartbeatMillis(serverToClientHeartbeatMillis);
-
-        setInvalidateMessageClientOnStreamingClose(properties.getPropertyAsBoolean(PROPERTY_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE, DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE));
-
-        // Number of minutes a client can remain idle before the server times the connection out.
-        int connectionIdleTimeoutMinutes = properties.getPropertyAsInt(PROPERTY_CONNECTION_IDLE_TIMEOUT_MINUTES, getConnectionIdleTimeoutMinutes());
-        if (connectionIdleTimeoutMinutes != 0)
-        {
-            setConnectionIdleTimeoutMinutes(connectionIdleTimeoutMinutes);
-        }
-        else
-        {
-            connectionIdleTimeoutMinutes = properties.getPropertyAsInt(PROPERTY_LEGACY_CONNECTION_IDLE_TIMEOUT_MINUTES, getConnectionIdleTimeoutMinutes());
-            if (connectionIdleTimeoutMinutes != 0)
-                setConnectionIdleTimeoutMinutes(connectionIdleTimeoutMinutes);
-        }
-
-        // User-agent configuration for kick-start bytes and max streaming connections per session.
-        UserAgentManager.setupUserAgentManager(properties, userAgentManager);
-
-        // Maximum number of clients allowed to have streaming HTTP connections with the endpoint.
-        maxStreamingClients = properties.getPropertyAsInt(MAX_STREAMING_CLIENTS, DEFAULT_MAX_STREAMING_CLIENTS);
-
-        // Set initial state for the canWait flag based on whether we allow waits or not.
-        canStream = (maxStreamingClients > 0);
-    }
-
-
-    @Override
-    public void start()
-    {
-        if (isStarted())
-            return;
-
-        super.start();
-
-        if (connectionIdleTimeoutMinutes > 0)
-        {
-            pushNotifierTimeoutManager = new TimeoutManager(new ThreadFactory()
-                                                            {
-                                                                int counter = 1;
-                                                                public synchronized Thread newThread(Runnable runnable)
-                                                                {
-                                                                    Thread t = new Thread(runnable);
-                                                                    t.setName(getId() + "-StreamingConnectionTimeoutThread-" + counter++);
-                                                                    return t;
-                                                                }
-                                                            });
-        }
-
-        currentStreamingRequests = new ConcurrentHashMap<String, EndpointPushNotifier>();
-    }
-
-    /**
-     * (non-JavaDoc)
-     * @see flex.messaging.endpoints.AbstractEndpoint#stop()
-     */
-    @Override
-    public void stop()
-    {
-        if (!isStarted())
-            return;
-
-        // Shutdown the timeout manager for streaming connections cleanly.
-        if (pushNotifierTimeoutManager != null)
-        {
-            pushNotifierTimeoutManager.shutdown();
-            pushNotifierTimeoutManager = null;
-        }
-
-        // Shutdown any currently open streaming connections.
-        for (EndpointPushNotifier notifier : currentStreamingRequests.values())
-            notifier.close();
-
-        currentStreamingRequests = null;
-
-        super.stop();
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Variables
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Used to synchronize sets and gets to the number of streaming clients.
-     */
-    protected final Object lock = new Object();
-
-    /**
-     * Used to keep track of the mapping between user agent match strings and
-     * the bytes needed to kickstart their streaming connections.
-     */
-    protected UserAgentManager userAgentManager = new UserAgentManager();
-
-    /**
-     * This flag is volatile to allow for consistent reads across thread without
-     * needing to pay the cost for a synchronized lock for each read.
-     */
-    private volatile boolean canStream = true;
-
-    /**
-     * Manages timing out EndpointPushNotifier instances.
-     */
-    private volatile TimeoutManager pushNotifierTimeoutManager;
-
-    /**
-     * A Map(EndpointPushNotifier, Boolean.TRUE) containing all currently open streaming notifiers
-     * for this endpoint.
-     * Used for clean shutdown.
-     */
-    private ConcurrentHashMap<String, EndpointPushNotifier> currentStreamingRequests;
-
-    //--------------------------------------------------------------------------
-    //
-    // Properties
-    //
-    //--------------------------------------------------------------------------
-
-    //------------------------------------------
-    //  invalidateMessageClientOnStreamingClose
-    //-----------------------------------------
-
-    private volatile boolean invalidateMessageClientOnStreamingClose = DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE;
-
-    /**
-     * Returns whether invalidate-messageclient-on-streaming-close is enabled.
-     * See {@link BaseStreamingHTTPEndpoint#setInvalidateMessageClientOnStreamingClose(boolean)}
-     * for details.
-     *
-     * @return <code>true</code> if the invalidate-messageclient-on-streaming-close is enabled, <code>false</code> otherwise.
-     */
-    public boolean isInvalidateMessageClientOnStreamingClose()
-    {
-        return invalidateMessageClientOnStreamingClose;
-    }
-
-    /**
-     * Sets the invalidate-messageclient-on-streaming close property. If enabled,
-     * when the streaming connection is closed for whatever reason (for example, the client is gone),
-     * the client's associated MessageClient on the server is invalidated immediately.
-     * This is useful in scenarios where there is a constant stream of messages, the client is gone,
-     * and the streaming connection is closed, but the session has not timed out on the server yet.
-     * In that case, enabling this property will prevent messages accumulating on the session on behalf
-     * of the MessageClient that will invalidate.
-     * <p>
-     * Important: Do not enable this property when reliable messaging is used, otherwise
-     * reliable reconnect attempts will not happen correctly.</p>
-     *
-     * @param value The property value.
-     */
-    public void setInvalidateMessageClientOnStreamingClose(boolean value)
-    {
-        invalidateMessageClientOnStreamingClose = value;
-    }
-
-    //----------------------------------
-    //  serverToClientHeartbeatMillis
-    //----------------------------------
-
-    private long serverToClientHeartbeatMillis = DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS;
-
-    /**
-     * Retrieves the number of milliseconds the server will wait before writing a
-     * single <code>null</code> byte to the streaming connection, to ensure the client is
-     * still available.
-     * @return The server-to-client heartbeat time in milliseconds.
-     */
-    public long getServerToClientHeartbeatMillis()
-    {
-        return serverToClientHeartbeatMillis;
-    }
-
-    /**
-     * Retrieves the number of milliseconds the server will wait before writing a
-     * single <code>null</code> byte to the streaming connection to ensure the client is
-     * still available when there are no new messages for the client.
-     * A non-positive value means the server will wait forever for new messages and
-     * it will not write the <code>null</code> byte to determine if the client is available.
-     * @param serverToClientHeartbeatMillis The server-to-client heartbeat time in milliseconds.
-     */
-    public void setServerToClientHeartbeatMillis(long serverToClientHeartbeatMillis)
-    {
-        if (serverToClientHeartbeatMillis < 0)
-            serverToClientHeartbeatMillis = 0;
-        this.serverToClientHeartbeatMillis = serverToClientHeartbeatMillis;
-    }
-
-    //----------------------------------
-    //  connectionIdleTimeoutMinutes
-    //----------------------------------
-
-    private int connectionIdleTimeoutMinutes = 0;
-
-    /**
-     * Retrieves the number of minutes a client can remain idle before the server
-     * times the connection out. The default value is 0, indicating that connections
-     * will not be timed out and must be closed by the client or server, either explicitly
-     * or by either process terminating.
-     *
-     * @return The number of minutes a client can remain idle before the server
-     * times the connection out.
-     */
-    public int getConnectionIdleTimeoutMinutes()
-    {
-        return connectionIdleTimeoutMinutes;
-    }
-
-    /**
-     * Sets the number of minutes a client can remain idle before the server
-     * times the connection out. A value of 0 or below indicates that
-     * connections will not be timed out.
-     *
-     * @param value The number of minutes a client can remain idle
-     * before the server times the connection out.
-     */
-    public void setConnectionIdleTimeoutMinutes(int value)
-    {
-        if (value < 0)
-            value = 0;
-
-        this.connectionIdleTimeoutMinutes = value;
-    }
-
-    /**
-     * (non-JavaDoc)
-     * @deprecated Use {@link BaseStreamingHTTPEndpoint#getConnectionIdleTimeoutMinutes()} instead.
-     */
-    public int getIdleTimeoutMinutes()
-    {
-        return getConnectionIdleTimeoutMinutes();
-    }
-
-    /**
-     * (non-JavaDoc)
-     * @deprecated Use {@link BaseStreamingHTTPEndpoint#setConnectionIdleTimeoutMinutes(int)} instead.
-     */
-    public void setIdleTimeoutMinutes(int value)
-    {
-        setConnectionIdleTimeoutMinutes(value);
-    }
-
-    //----------------------------------
-    //  maxStreamingClients
-    //----------------------------------
-
-    private int maxStreamingClients = DEFAULT_MAX_STREAMING_CLIENTS;
-
-    /**
-     * Retrieves the maximum number of clients that will be allowed to establish
-     * a streaming HTTP connection with the endpoint.
-     *
-     * @return The maximum number of clients that will be allowed to establish
-     * a streaming HTTP connection with the endpoint.
-     */
-    public int getMaxStreamingClients()
-    {
-        return maxStreamingClients;
-    }
-
-    /**
-     * Sets the maximum number of clients that will be allowed to establish
-     * a streaming HTTP connection with the server.
-     *
-     * @param maxStreamingClients The maximum number of clients that will be allowed
-     * to establish a streaming HTTP connection with the server.
-     */
-    public void setMaxStreamingClients(int maxStreamingClients)
-    {
-        this.maxStreamingClients = maxStreamingClients;
-        canStream = (streamingClientsCount < maxStreamingClients);
-    }
-
-    //----------------------------------
-    //  streamingClientsCount
-    //----------------------------------
-
-    protected int streamingClientsCount;
-
-    /**
-     * Retrieves the the number of clients that are currently in the streaming state.
-     *
-     * @return The number of clients that are currently in the streaming state.
-     */
-    public int getStreamingClientsCount()
-    {
-        return streamingClientsCount;
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Public Methods
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Handles HTTP requests targetting this endpoint.
-     * Two types or requests are supported. If the request is a regular request-response AMF/AMFX
-     * message it is handled by the base logic in BaseHTTPEndpoint.service. However, if it is a
-     * request to open a streaming HTTP connection to the client this endpoint performs some
-     * validation checks and then holds the connection open to stream data back to the client
-     * over.
-     *
-     * @param req The original servlet request.
-     * @param res The active servlet response.
-     */
-    @Override
-    public void service(HttpServletRequest req, HttpServletResponse res)
-    {
-        String command = req.getParameter(COMMAND_PARAM_NAME);
-        if (command != null)
-            serviceStreamingRequest(req, res);
-        else // Let BaseHTTPEndpoint logic handle regular request-response messaging.
-            super.service(req, res);
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Protected Methods
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * If the message has MPI enabled, this method adds all the needed performance
-     * headers to the message.
-     *
-     * @param message Message to add performance headers to.
-     */
-    protected void addPerformanceInfo(Message message)
-    {
-        MessagePerformanceInfo mpiOriginal = getMPI(message);
-        if (mpiOriginal == null)
-            return;
-
-        MessagePerformanceInfo mpip = (MessagePerformanceInfo)mpiOriginal.clone();
-        try
-        {
-            // Set the original message info as the pushed causer info.
-            MessagePerformanceUtils.setMPIP(message, mpip);
-            MessagePerformanceUtils.setMPII(message, null);
-        }
-        catch (Exception e)
-        {
-            if (Log.isDebug())
-                log.debug("MPI exception while streaming the message: " + e.toString());
-        }
-
-        // Overhead only used when MPI is enabled for sizing
-        MessagePerformanceInfo mpio = new MessagePerformanceInfo();
-        if (mpip.recordMessageTimes)
-        {
-            mpio.sendTime = System.currentTimeMillis();
-            mpio.infoType = "OUT";
-        }
-        mpio.pushedFlag = true;
-        MessagePerformanceUtils.setMPIO(message, mpio);
-
-        // If MPI sizing information is enabled serialize again so that we know size
-        if (mpip.recordMessageSizes)
-        {
-            try
-            {
-                // Each subclass serializes the message in their own format to
-                // get the message size for the MPIO.
-                long serializationOverhead = System.currentTimeMillis();
-                mpio.messageSize = getMessageSizeForPerformanceInfo(message);
-
-                // Set serialization overhead to the time calculated during serialization above
-                if (mpip.recordMessageTimes)
-                {
-                    serializationOverhead = System.currentTimeMillis() - serializationOverhead;
-                    mpip.addToOverhead(serializationOverhead);
-                    mpiOriginal.addToOverhead(serializationOverhead);
-                    mpio.sendTime = System.currentTimeMillis();
-                }
-            }
-            catch(Exception e)
-            {
-                log.debug("MPI exception while streaming the message: " + e.toString());
-            }
-        }
-    }
-
-    /**
-     * Utility method to convert streamed push messages to their small versions
-     * if the channel-endpoint combination supports small messages.
-     *
-     * @param message The regular message.
-     * @return The small message if the message has a small version, or regular message
-     * if it doesn't .
-     */
-    protected Message convertPushMessageToSmall(Message message)
-    {
-        FlexSession session = FlexContext.getFlexSession();
-        if (session != null && session.useSmallMessages())
-            return convertToSmallMessage(message);
-        return message;
-    }
-
-    /**
-     * Used internally for performance information gathering; not intended for
-     * public use. The default implementation of this method returns zero.
-     * Subclasses should overwrite if they want to accurately report message
-     * size information in performance information gathering.
-     *
-     * @param message Message to get the size for.
-     *
-     * @return The size of the message after message is serialized.
-     */
-    protected long getMessageSizeForPerformanceInfo(Message message)
-    {
-        return 0;
-    }
-
-    /**
-     * This streaming endpoint does not support polling clients.
-     *
-     * @param flexClient The FlexClient that issued the poll request.
-     * @param pollCommand The poll command from the client.
-     * @return The flush info used to build the poll response.
-     */
-    @Override
-    protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand)
-    {
-        MessageException me = new MessageException();
-        me.setMessage(POLL_NOT_SUPPORTED_MESSAGE);
-        me.setDetails(POLL_NOT_SUPPORTED_MESSAGE);
-        me.setCode(POLL_NOT_SUPPORTED_CODE);
-        throw me;
-    }
-
-    /**
-     * Handles streaming connection open command sent by the FlexClient.
-     *
-     * @param req The <code>HttpServletRequest</code> to service.
-     * @param res The <code>HttpServletResponse</code> to be used in case an error
-     * has to be sent back.
-     * @param flexClient FlexClient that requested the streaming connection.
-     */
-    protected void handleFlexClientStreamingOpenRequest(HttpServletRequest req, HttpServletResponse res, FlexClient flexClient)
-    {
-        FlexSession session = FlexContext.getFlexSession();
-        if (canStream && session.canStream)
-        {
-            // If canStream/session.canStream is true it means we currently have
-            // less than the max number of allowed streaming threads, per endpoint/session.
-
-            // We need to protect writes/reads to the stream count with the endpoint's lock.
-            // Also, we have to be careful to handle the case where two threads get to this point when only
-            // one streaming spot remains; one thread will win and the other needs to fault.
-            boolean thisThreadCanStream;
-            synchronized (lock)
-            {
-                ++streamingClientsCount;
-                if (streamingClientsCount == maxStreamingClients)
-                {
-                    thisThreadCanStream = true; // This thread got the last spot.
-                    canStream = false;
-                }
-                else if (streamingClientsCount > maxStreamingClients)
-                {
-                    thisThreadCanStream = false; // This thread was beaten out for the last spot.
-                    --streamingClientsCount; // Decrement the count because we're not going to grant the streaming right to the client.
-                }
-                else
-                {
-                    // We haven't hit the limit yet, allow this thread to stream.
-                    thisThreadCanStream = true;
-                }
-            }
-
-            // If the thread cannot wait due to endpoint streaming connection
-            // limit, inform the client and return.
-            if (!thisThreadCanStream)
-            {
-                String errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
-                        + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit of '"
-                        + maxStreamingClients + "' has been reached.";
-                if (Log.isError())
-                    log.error(errorMessage);
-                try
-                {
-                    errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
-                    + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit has been reached.";
-                    res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage);
-                }
-                catch (IOException ignore)
-                {}
-                return;
-            }
-
-            // Setup for specific user agents.
-            byte[] kickStartBytesToStream = null;
-            String userAgentValue = req.getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
-            UserAgentSettings agentSettings = userAgentManager.match(userAgentValue);
-            if (agentSettings != null)
-            {
-                synchronized (session)
-                {
-                    session.maxConnectionsPerSession = agentSettings.getMaxPersistentConnectionsPerSession();
-                }
-
-                int kickStartBytes = agentSettings.getKickstartBytes();
-                if (kickStartBytes > 0)
-                {
-                    // Determine the minimum number of actual bytes that need to be sent to
-                    // kickstart, taking into account transfer-encoding overhead.
-                    try
-                    {
-                        int chunkLengthHeaderSize = Integer.toHexString(kickStartBytes).getBytes("ASCII").length;
-                        int chunkOverhead = chunkLengthHeaderSize + 4; // 4 for the 2 wrapping CRLF tokens.
-                        int minimumKickstartBytes = kickStartBytes - chunkOverhead;
-                        kickStartBytesToStream = new byte[(minimumKickstartBytes > 0) ? minimumKickstartBytes :
-                                kickStartBytes];
-                    }
-                    catch (UnsupportedEncodingException ignore)
-                    {
-                        kickStartBytesToStream = new byte[kickStartBytes];
-                    }
-                    Arrays.fill(kickStartBytesToStream, NULL_BYTE);
-                }
-            }
-
-            // Now, check with the session before granting the streaming connection.
-            synchronized(session)
-            {
-                ++session.streamingConnectionsCount;
-                if (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED)
-                {
-                    thisThreadCanStream = true;
-                }
-                else if (session.streamingConnectionsCount == session.maxConnectionsPerSession)
-                {
-                    thisThreadCanStream = true; // This thread got the last spot in the session.
-                    session.canStream = false;
-                }
-                else if (session.streamingConnectionsCount > session.maxConnectionsPerSession)
-                {
-                    thisThreadCanStream = false; // This thread was beaten out for the last spot.
-                    --session.streamingConnectionsCount;
-                    synchronized(lock)
-                    {
-                        // Decrement the endpoint count because we're not going to grant the streaming right to the client.
-                        --streamingClientsCount;
-                    }
-                }
-                else
-                {
-                    // We haven't hit the limit yet, allow this thread to stream.
-                    thisThreadCanStream = true;
-                }
-            }
-
-            // If the thread cannot wait due to session streaming connection
-            // limit, inform the client and return.
-            if (!thisThreadCanStream)
-            {
-                if (Log.isError())
-                    log.error("Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
-                            + flexClient.getId() + "' because " + UserAgentManager.MAX_PERSISTENT_CONNECTIONS_PER_SESSION + " limit of '" + session.maxConnectionsPerSession
-                            + ((agentSettings != null) ? "' for user-agent '" + agentSettings.getMatchOn() + "'" : "") +  " has been reached." );
-                try
-                {
-                    // Return an HTTP status code 400.
-                    String errorMessage = "The server cannot grant streaming connection to this client because limit has been reached.";
-                    res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage);
-                }
-                catch (IOException ignore)
-                {
-                    // NOWARN
-                }
-                return;
-            }
-
-            Thread currentThread = Thread.currentThread();
-            String threadName = currentThread.getName();
-            EndpointPushNotifier notifier = null;
-            boolean suppressIOExceptionLogging = false; // Used to suppress logging for IO exception.
-            try
-            {
-                currentThread.setName(threadName + STREAMING_THREAD_NAME_EXTENSION);
-
-                // Open and commit response headers and get output stream.
-                if (addNoCacheHeaders)
-                    addNoCacheHeaders(req, res);
-                res.setContentType(getResponseContentType());
-                res.setHeader("Connection", "close");
-                res.setHeader("Transfer-Encoding", "chunked");
-                ServletOutputStream os = res.getOutputStream();
-                res.flushBuffer();
-
-                // If kickstart-bytes are specified, stream them.
-                if (kickStartBytesToStream != null)
-                {
-                    if (Log.isDebug())
-                        log.debug("Endpoint with id '" + getId() + "' is streaming " + kickStartBytesToStream.length
-                                + " bytes (not counting chunk encoding overhead) to kick-start the streaming connection for FlexClient with id '"
-                                + flexClient.getId() + "'.");
-
-                    streamChunk(kickStartBytesToStream, os, res);
-                }
-
-                // Setup serialization and type marshalling contexts
-                setThreadLocals();
-
-                // Activate streaming helper for this connection.
-                // Watch out for duplicate stream issues.
-                try
-                {
-                    notifier = new EndpointPushNotifier(this, flexClient);
-                }
-                catch (MessageException me)
-                {
-                    if (me.getNumber() == 10033) // It's a duplicate stream request from the same FlexClient. Leave the current stream in place and fault this.
-                    {
-                        if (Log.isWarn())
-                            log.warn("Endpoint with id '" + getId() + "' received a duplicate streaming connection request from, FlexClient with id '"
-                                    + flexClient.getId() + "'. Faulting request.");
-
-                        // Rollback counters and send an error response.
-                        synchronized (lock)
-                        {
-                            --streamingClientsCount;
-                            canStream = (streamingClientsCount < maxStreamingClients);
-                            synchronized (session)
-                            {
-                                --session.streamingConnectionsCount;
-                                session.canStream = (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
-                                        || session.streamingConnectionsCount < session.maxConnectionsPerSession);
-                            }
-                        }
-                        try
-                        {
-                            res.sendError(HttpServletResponse.SC_BAD_REQUEST);
-                        }
-                        catch (IOException ignore)
-                        {
-                            // NOWARN
-                        }
-                        return; // Exit early.
-                    }
-                }
-                if (connectionIdleTimeoutMinutes > 0)
-                    notifier.setIdleTimeoutMinutes(connectionIdleTimeoutMinutes);
-                notifier.setLogCategory(getLogCategory());
-                monitorTimeout(notifier);
-                currentStreamingRequests.put(notifier.getNotifierId(), notifier);
-
-                // Push down an acknowledgement for the 'connect' request containing the unique id for this specific stream.
-                AcknowledgeMessage connectAck = new AcknowledgeMessage();
-                connectAck.setBody(notifier.getNotifierId());
-                connectAck.setCorrelationId(BaseStreamingHTTPEndpoint.OPEN_COMMAND);
-                ArrayList toPush = new ArrayList(1);
-                toPush.add(connectAck);
-                streamMessages(toPush, os, res);
-
-                // Output session level streaming count.
-                if (Log.isDebug())
-                    Log.getLogger(FlexSession.FLEX_SESSION_LOG_CATEGORY).info("Number of streaming clients for FlexSession with id '"+ session.getId() +"' is " + session.streamingConnectionsCount + ".");
-
-                // Output endpoint level streaming count.
-                if (Log.isDebug())
-                    log.debug("Number of streaming clients for endpoint with id '"+ getId() +"' is " + streamingClientsCount + ".");
-
-                // And cycle in a wait-notify loop with the aid of the helper until it
-                // is closed, we're interrupted or the act of streaming data to the client fails.
-                while (!notifier.isClosed())
-                {
-                    try
-                    {
-                        // Drain any messages that might have been accumulated
-                        // while the previous drain was being processed.
-                        List<AsyncMessage> messages = null;
-                        synchronized (notifier.pushNeeded)
-                        {
-                            messages = notifier.drainMessages();
-                        }
-                        streamMessages(messages, os, res);
-
-                        synchronized (notifier.pushNeeded)
-                        {
-                            notifier.pushNeeded.wait(serverToClientHeartbeatMillis);
-                        
-                            messages = notifier.drainMessages();
-                        }
-                        // If there are no messages to send to the client, send an null
-                        // byte as a heartbeat to make sure the client is still valid.
-                        if (messages == null && serverToClientHeartbeatMillis > 0)
-                        {
-                            try
-                            {
-                                os.write(NULL_BYTE);
-                                res.flushBuffer();
-                            }
-                            catch (IOException e)
-                            {
-                                if (Log.isWarn())
-                                    log.warn("Endpoint with id '" + getId() + "' is closing the streaming connection to FlexClient with id '"
-                                            + flexClient.getId() + "' because endpoint encountered a socket write error" +
-                                            ", possibly due to an unresponsive FlexClient.", e);
-                                break; // Exit the wait loop.
-                            }
-                        }
-                        // Otherwise stream the messages to the client.
-                        else
-                        {
-                            // Update the last time notifier was used to drain messages.
-                            // Important for idle timeout detection.
-                            notifier.updateLastUse();
-
-                            streamMessages(messages, os, res);
-                        }
-                    }
-                    catch (InterruptedException e)
-                    {
-                        if (Log.isWarn())
-                            log.warn("Streaming thread '" + threadName + "' for endpoint with id '" + getId() + "' has been interrupted and the streaming connection will be closed.");
-                        os.close();
-                        break; // Exit the wait loop.
-                    }
-
-                    // Update the FlexClient last use time to prevent FlexClient from
-                    // timing out when the client is still subscribed. It is important
-                    // to do this outside synchronized(notifier.pushNeeded) to avoid
-                    // thread deadlock!
-                    flexClient.updateLastUse();
-                }
-                if (Log.isDebug())
-                    log.debug("Streaming thread '" + threadName + "' for endpoint with id '" + getId() + "' is releasing connection and returning to the request handler pool.");
-                suppressIOExceptionLogging = true;
-                // Terminate the response.
-                streamChunk(null, os, res);
-            }
-            catch (IOException e)
-            {
-                if (Log.isWarn() && !suppressIOExceptionLogging)
-                    log.warn("Streaming thread '" + threadName + "' for endpoint with id '" + getId() + "' is closing connection due to an IO error.", e);
-            }
-            finally
-            {
-                currentThread.setName(threadName);
-
-                // We're done so decrement the counts for streaming threads,
-                // and update the canStream flag if necessary.
-                synchronized (lock)
-                {
-                    --streamingClientsCount;
-                    canStream = (streamingClientsCount < maxStreamingClients);
-                    synchronized (session)
-                    {
-                        --session.streamingConnectionsCount;
-                        session.canStream = (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
-                                || session.streamingConnectionsCount < session.maxConnectionsPerSession);
-                    }
-                }
-
-                if (notifier != null && currentStreamingRequests != null)
-                {
-                    currentStreamingRequests.remove(notifier.getNotifierId());
-                    notifier.close();
-                }
-
-                // Output session level streaming count.
-                if (Log.isDebug())
-                    Log.getLogger(FlexSession.FLEX_SESSION_LOG_CATEGORY).info("Number of streaming clients for FlexSession with id '"+ session.getId() +"' is " + session.streamingConnectionsCount + ".");
-
-                // Output endpoint level streaming count.
-                if (Log.isDebug())
-                    log.debug("Number of streaming clients for endpoint with id '"+ getId() +"' is " + streamingClientsCount + ".");
-            }
-        }
-        // Otherwise, client's streaming connection open request could not be granted.
-        else
-        {
-            if (Log.isError())
-            {
-                String logString = null;
-                if (!canStream)
-                {
-                    logString = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
-                    + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit of '"
-                    + maxStreamingClients + "' has been reached.";
-                }
-                else if (!session.canStream)
-                {
-                    logString = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
-                    + flexClient.getId() + "' because " + UserAgentManager.MAX_STREAMING_CONNECTIONS_PER_SESSION + " limit of '"
-                    + session.maxConnectionsPerSession + "' has been reached.";
-                }
-                if (logString != null)
-                    log.error(logString);
-            }
-
-            try
-            {
-                // Return an HTTP status code 400 to indicate that client request can't be processed.
-                String errorMessage = null;
-                if (!canStream)
-                {
-                    errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
-                    + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit has been reached.";
-                }
-                else if (!session.canStream)
-                {
-                    errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
-                    + flexClient.getId() + "' because " + UserAgentManager.MAX_STREAMING_CONNECTIONS_PER_SESSION + " limit has been reached.";
-                }
-                res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage);
-            }
-            catch (IOException ignore)
-            {}
-        }
-    }
-
-    /**
-     * Handles streaming connection close command sent by the FlexClient.
-     *
-     * @param req The <code>HttpServletRequest</code> to service.
-     * @param res The <code>HttpServletResponse</code> to be used in case an error
-     * has to be sent back.
-     * @param flexClient FlexClient that requested the streaming connection.
-     * @param streamId The id for the stream to close.
-     */
-    protected void handleFlexClientStreamingCloseRequest(HttpServletRequest req, HttpServletResponse res, FlexClient flexClient, String streamId)
-    {
-        if (streamId != null)
-        {
-            EndpointPushNotifier notifier = (EndpointPushNotifier)flexClient.getEndpointPushHandler(getId());
-            if ((notifier != null) && notifier.getNotifierId().equals(streamId))
-                notifier.close();
-        }
-    }
-
-    /**
-     * Service streaming connection commands.
-     *
-     * @param req The <code>HttpServletRequest</code> to service.
-     * @param res The <code>HttpServletResponse</code> to be used in case an error
-     * has to be sent back.
-     */
-    protected void serviceStreamingRequest(HttpServletRequest req, HttpServletResponse res)
-    {
-        // If this is a request for a streaming connection, make sure it's for a valid FlexClient
-        // and that the FlexSession doesn't already have a streaming connection open.
-        // Streaming requests are POSTs (to help prevent the possibility of caching) that carry the
-        // following parameters:
-        // command - Indicating a custom command for the endpoint; currently 'open' to request a new
-        //           streaming connection be opened, and 'close' to request the streaming connection
-        //           to close.
-        // version - Indicates the streaming connection 'version' to use; it's here for backward comp. support
-        //           if we need to change how commands are handled in a future product release.
-        // DSId - The FlexClient id value that uniquely identifies the swf making the request.
-        String command = req.getParameter(COMMAND_PARAM_NAME);
-
-        // Only HTTP 1.1 is supported, disallow HTTP 1.0.
-        if (req.getProtocol().equals(HTTP_1_0))
-        {
-            if (Log.isError())
-                log.error("Endpoint with id '" + getId() + "' cannot service the streaming request made with " +
-                " HTTP 1.0. Only HTTP 1.1 is supported.");
-
-            try
-            {
-                // Return an HTTP status code 400 to indicate that the client's request was syntactically invalid (bad command).
-                res.sendError(HttpServletResponse.SC_BAD_REQUEST);
-            }
-            catch (IOException ignore)
-            {}
-            return; // Abort further server processing.
-        }
-
-        if (!(command.equals(OPEN_COMMAND) || command.equals(CLOSE_COMMAND)))
-        {
-            if (Log.isError())
-                log.error("Endpoint with id '" + getId() + "' cannot service the streaming request as the supplied command '"
-                        + command + "' is invalid.");
-
-            try
-            {
-                // Return an HTTP status code 400 to indicate that the client's request was syntactically invalid (bad command).
-                res.sendError(HttpServletResponse.SC_BAD_REQUEST);
-            }
-            catch (IOException ignore)
-            {}
-            return; // Abort further server processing.
-        }
-
-        String flexClientId = req.getParameter(Message.FLEX_CLIENT_ID_HEADER);
-        if (flexClientId == null)
-        {
-            if (Log.isError())
-                log.error("Endpoint with id '" + getId() + "' cannot service the streaming request as no FlexClient id"
-                        + " has been supplied in the request.");
-
-            try
-            {
-                // Return an HTTP status code 400 to indicate that the client's request was syntactically invalid (missing id).
-                res.sendError(HttpServletResponse.SC_BAD_REQUEST);
-            }
-            catch (IOException ignore)
-            {}
-            return; // Abort further server processing.
-        }
-
-        // Validate that the provided FlexClient id exists and is associated with the current session.
-        // We don't do this validation with CLOSE_COMMAND because CLOSE_COMMAND can come in on a
-        // different session. For example, when the session expires due to timeout, the streaming client
-        // using that session sends a CLOSE_COMMAND on a new session to let the server know to clean client's
-        // corresponding server constructs. In that case, server already knows that session has expired so
-        // we can simply omit this validation.
-        FlexClient flexClient = null;
-        List<FlexClient> flexClients = FlexContext.getFlexSession().getFlexClients();
-        boolean validFlexClientId = false;
-        for (Iterator<FlexClient> iter = flexClients.iterator(); iter.hasNext();)
-        {
-            flexClient = iter.next();
-            if (flexClient.getId().equals(flexClientId) && flexClient.isValid())
-            {
-                validFlexClientId = true;
-                break;
-            }
-        }
-        if (!command.equals(CLOSE_COMMAND) && !validFlexClientId)
-        {
-            if (Log.isError())
-                log.error("Endpoint with id '" + getId() + "' cannot service the streaming request as either the supplied"
-                        + " FlexClient id '" + flexClientId + " is not valid, or the FlexClient with that id is not valid.");
-
-            try
-            {
-                // Return an HTTP status code 400 to indicate that the client's request was syntactically invalid (invalid id).
-                res.sendError(HttpServletResponse.SC_BAD_REQUEST);
-            }
-            catch (IOException ignore)
-            {}
-            return; // Abort further server processing.
-        }
-
-        // If a close command is received and we don't have any flex clients registered simply invalidate 
-        // the Flex Session. This will take care of the Flex Session that got created when the MB servlet 
-        // was processing the CLOSE request.
-        if (command.equals(CLOSE_COMMAND) && flexClients.size() == 0)
-        {
-            FlexSession flexSession = FlexContext.getFlexSession();
-            if (flexSession instanceof HttpFlexSession)
-            {
-                ((HttpFlexSession)flexSession).invalidate(false);
-            }
-            return;
-        }
-
-        if (flexClient != null)
-        {
-            if (command.equals(OPEN_COMMAND))
-                handleFlexClientStreamingOpenRequest(req, res, flexClient);
-            else if (command.equals(CLOSE_COMMAND))
-                handleFlexClientStreamingCloseRequest(req, res, flexClient, req.getParameter(STREAM_ID_PARAM_NAME));
-        }
-    }
-
-    /**
-     * Helper method to write a chunk of bytes to the output stream in an HTTP
-     * "Transfer-Encoding: chunked" format.
-     * If the bytes array is null or empty, a terminal chunk will be written to
-     * signal the end of the response.
-     * Once the chunk is written to the output stream, the stream will be flushed immediately (no buffering).
-     *
-     * @param bytes The array of bytes to write as a chunk in the response; or if null, the signal to write the final chunk to complete the response.
-     * @param os The output stream the chunk will be written to.
-     * @param response The HttpServletResponse, used to flush the chunk to the client.
-     *
-     * @throws IOException if writing the chunk to the output stream fails.
-     */
-    protected void streamChunk(byte[] bytes, ServletOutputStream os, HttpServletResponse response) throws IOException
-    {
-        if ((bytes != null) && (bytes.length > 0))
-        {
-            byte[] chunkLength = Integer.toHexString(bytes.length).getBytes("ASCII");
-            os.write(chunkLength);
-            os.write(CRLF_BYTES);
-            os.write(bytes);
-            os.write(CRLF_BYTES);
-            response.flushBuffer();
-        }
-        else // Send final 'EOF' chunk for the response.
-        {
-            os.write(ZERO_BYTE);
-            os.write(CRLF_BYTES);
-            response.flushBuffer();
-        }
-    }
-
-    /**
-     * Helper method invoked by the endpoint request handler thread cycling in wait-notify.
-     * Serializes messages and streams each to the client as a response chunk using streamChunk().
-     *
-     * @param messages The messages to serialize and push to the client.
-     * @param os The output stream the chunk will be written to.
-     * @param response The HttpServletResponse, used to flush the chunk to the client.
-     */
-    protected abstract void streamMessages(List messages, ServletOutputStream os, HttpServletResponse response) throws IOException;
-
-    /**
-     * Given a message, returns the MessagePerformanceInfo object if the message
-     * performance gathering is enabled, returns null otherwise.
-     *
-     * @param message The message.
-     * @return MessagePerformanceInfo if the message performance gathering is enabled,
-     * null otherwise.
-     */
-    protected MessagePerformanceInfo getMPI(Message message)
-    {
-        return (isRecordMessageSizes() || isRecordMessageTimes())?
-                MessagePerformanceUtils.getMPII(message) : null;
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Private methods.
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Utility method used at EndpointPushNotifier construction to monitor it for timeout.
-     *
-     * @param notifier The EndpointPushNotifier to monitor.
-     */
-    private void monitorTimeout(EndpointPushNotifier notifier)
-    {
-        if (pushNotifierTimeoutManager != null)
-            pushNotifierTimeoutManager.scheduleTimeout(notifier);
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.endpoints;
+
+import flex.messaging.FlexContext;
+import flex.messaging.FlexSession;
+import flex.messaging.HttpFlexSession;
+import flex.messaging.MessageException;
+import flex.messaging.client.EndpointPushNotifier;
+import flex.messaging.client.FlexClient;
+import flex.messaging.client.FlushResult;
+import flex.messaging.client.UserAgentSettings;
+import flex.messaging.config.ConfigMap;
+import flex.messaging.log.Log;
+import flex.messaging.messages.AcknowledgeMessage;
+import flex.messaging.messages.AsyncMessage;
+import flex.messaging.messages.CommandMessage;
+import flex.messaging.messages.Message;
+import flex.messaging.messages.MessagePerformanceInfo;
+import flex.messaging.messages.MessagePerformanceUtils;
+import flex.messaging.util.TimeoutManager;
+import flex.messaging.util.UserAgentManager;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Base class for HTTP-based endpoints that support streaming HTTP connections to
+ * connected clients.
+ * Each streaming connection managed by this endpoint consumes one of the request
+ * handler threads provided by the servlet container, so it is not highly scalable
+ * but offers performance advantages over client polling for clients receiving
+ * a steady, rapid stream of pushed messages.
+ * This endpoint does not support polling clients and will fault any poll requests
+ * that are received. To support polling clients use subclasses of
+ * BaseHTTPEndpoint instead.
+ */
+public abstract class BaseStreamingHTTPEndpoint extends BaseHTTPEndpoint
+{
+    //--------------------------------------------------------------------------
+    //
+    // Private Static Constants
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * This token is used in chunked HTTP responses frequently so initialize it statically for general use.
+     */
+    private static final byte[] CRLF_BYTES = {(byte)13, (byte)10};
+
+    /**
+     * This token is used for the terminal chunk within a chunked response.
+     */
+    private static final byte ZERO_BYTE = (byte)48;
+
+    /**
+     * This token is used to signal that a chunk of data should be skipped by the client.
+     */
+    private static final byte NULL_BYTE = (byte)0;
+
+    /**
+     * Parameter name for 'command' passed in a request for a new streaming connection.
+     */
+    private static final String COMMAND_PARAM_NAME = "command";
+
+    /**
+     * This is the token at the end of the HTTP request line that indicates that it is
+     * a stream connection that should be held open to push data back to the client,
+     * as opposed to a regular request-response message.
+     */
+    private static final String OPEN_COMMAND = "open";
+
+    /**
+     * This is the token at the end of the HTTP request line that indicates that it is
+     * a stream connection that should be closed.
+     */
+    private static final String CLOSE_COMMAND = "close";
+
+    /**
+     *  Parameter name for the stream ID; passed with commands for an existing streaming connection.
+     */
+    private static final String STREAM_ID_PARAM_NAME = "streamId";
+
+    /**
+     * Constant for HTTP/1.0.
+     */
+    private static final String HTTP_1_0 = "HTTP/1.0";
+
+    /**
+     * Thread name suffix for request threads that are servicing a pinned open streaming connection.
+     */
+    private static final String STREAMING_THREAD_NAME_EXTENSION = "-in-streaming-mode";
+
+    /**
+     * Configuration constants.
+     */
+    private static final String PROPERTY_CONNECTION_IDLE_TIMEOUT_MINUTES = "connection-idle-timeout-minutes";
+    private static final String PROPERTY_LEGACY_CONNECTION_IDLE_TIMEOUT_MINUTES = "idle-timeout-minutes";
+    private static final String MAX_STREAMING_CLIENTS = "max-streaming-clients";
+    private static final String SERVER_TO_CLIENT_HEARTBEAT_MILLIS = "server-to-client-heartbeat-millis";
+    private static final String PROPERTY_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE = "invalidate-messageclient-on-streaming-close";
+
+    /**
+     * Defaults.
+     */
+    private static final boolean DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE = false;
+    private static final int DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS = 5000;
+    private static final int DEFAULT_MAX_STREAMING_CLIENTS = 10;
+
+    /**
+     * Errors.
+     */
+    public static final String POLL_NOT_SUPPORTED_CODE = "Server.PollNotSupported";
+    public static final int POLL_NOT_SUPPORTED_MESSAGE = 10034;
+
+
+    //--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Constructs an unmanaged <code>BaseStreamingHTTPEndpoint</code>.
+     */
+    public BaseStreamingHTTPEndpoint()
+    {
+        this(false);
+    }
+
+    /**
+     * Constructs an <code>BaseStreamingHTTPEndpoint</code> with the indicated management.
+     *
+     * @param enableManagement <code>true</code> if the <code>BaseStreamingHTTPEndpoint</code>
+     * is manageable; <code>false</code> otherwise.
+     */
+    public BaseStreamingHTTPEndpoint(boolean enableManagement)
+    {
+        super(enableManagement);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Initialize, validate, start, and stop methods.
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Initializes the <code>Endpoint</code> with the properties.
+     * If subclasses override this method, they must call <code>super.initialize()</code>.
+     *
+     * @param id The ID of the <code>Endpoint</code>.
+     * @param properties Properties for the <code>Endpoint</code>.
+     */
+    @Override
+    public void initialize(String id, ConfigMap properties)
+    {
+        super.initialize(id, properties);
+
+        if (properties == null || properties.size() == 0)
+        {
+            // Initialize default user agent manager settings.
+            UserAgentManager.setupUserAgentManager(null, userAgentManager);
+
+            return; // Nothing else to initialize.
+        }
+
+        // The interval that the server will check if the client is still available.
+        serverToClientHeartbeatMillis = properties.getPropertyAsLong(SERVER_TO_CLIENT_HEARTBEAT_MILLIS, DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS);
+        setServerToClientHeartbeatMillis(serverToClientHeartbeatMillis);
+
+        setInvalidateMessageClientOnStreamingClose(properties.getPropertyAsBoolean(PROPERTY_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE, DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE));
+
+        // Number of minutes a client can remain idle before the server times the connection out.
+        int connectionIdleTimeoutMinutes = properties.getPropertyAsInt(PROPERTY_CONNECTION_IDLE_TIMEOUT_MINUTES, getConnectionIdleTimeoutMinutes());
+        if (connectionIdleTimeoutMinutes != 0)
+        {
+            setConnectionIdleTimeoutMinutes(connectionIdleTimeoutMinutes);
+        }
+        else
+        {
+            connectionIdleTimeoutMinutes = properties.getPropertyAsInt(PROPERTY_LEGACY_CONNECTION_IDLE_TIMEOUT_MINUTES, getConnectionIdleTimeoutMinutes());
+            if (connectionIdleTimeoutMinutes != 0)
+                setConnectionIdleTimeoutMinutes(connectionIdleTimeoutMinutes);
+        }
+
+        // User-agent configuration for kick-start bytes and max streaming connections per session.
+        UserAgentManager.setupUserAgentManager(properties, userAgentManager);
+
+        // Maximum number of clients allowed to have streaming HTTP connections with the endpoint.
+        maxStreamingClients = properties.getPropertyAsInt(MAX_STREAMING_CLIENTS, DEFAULT_MAX_STREAMING_CLIENTS);
+
+        // Set initial state for the canWait flag based on whether we allow waits or not.
+        canStream = (maxStreamingClients > 0);
+    }
+
+
+    @Override
+    public void start()
+    {
+        if (isStarted())
+            return;
+
+        super.start();
+
+        if (connectionIdleTimeoutMinutes > 0)
+        {
+            pushNotifierTimeoutManager = new TimeoutManager(new ThreadFactory()
+                                                            {
+                                                                int counter = 1;
+                                                                public synchronized Thread newThread(Runnable runnable)
+                                                                {
+                                                                    Thread t = new Thread(runnable);
+                                                                    t.setName(getId() + "-StreamingConnectionTimeoutThread-" + counter++);
+                                                                    return t;
+                                                                }
+                                                            });
+        }
+
+        currentStreamingRequests = new ConcurrentHashMap<String, EndpointPushNotifier>();
+    }
+
+    /**
+     * (non-JavaDoc)
+     * @see flex.messaging.endpoints.AbstractEndpoint#stop()
+     */
+    @Override
+    public void stop()
+    {
+        if (!isStarted())
+            return;
+
+        // Shutdown the timeout manager for streaming connections cleanly.
+        if (pushNotifierTimeoutManager != null)
+        {
+            pushNotifierTimeoutManager.shutdown();
+            pushNotifierTimeoutManager = null;
+        }
+
+        // Shutdown any currently open streaming connections.
+        for (EndpointPushNotifier notifier : currentStreamingRequests.values())
+            notifier.close();
+
+        currentStreamingRequests = null;
+
+        super.stop();
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Used to synchronize sets and gets to the number of streaming clients.
+     */
+    protected final Object lock = new Object();
+
+    /**
+     * Used to keep track of the mapping between user agent match strings and
+     * the bytes needed to kickstart their streaming connections.
+     */
+    protected UserAgentManager userAgentManager = new UserAgentManager();
+
+    /**
+     * This flag is volatile to allow for consistent reads across thread without
+     * needing to pay the cost for a synchronized lock for each read.
+     */
+    private volatile boolean canStream = true;
+
+    /**
+     * Manages timing out EndpointPushNotifier instances.
+     */
+    private volatile TimeoutManager pushNotifierTimeoutManager;
+
+    /**
+     * A Map(EndpointPushNotifier, Boolean.TRUE) containing all currently open streaming notifiers
+     * for this endpoint.
+     * Used for clean shutdown.
+     */
+    private ConcurrentHashMap<String, EndpointPushNotifier> currentStreamingRequests;
+
+    //--------------------------------------------------------------------------
+    //
+    // Properties
+    //
+    //--------------------------------------------------------------------------
+
+    //------------------------------------------
+    //  invalidateMessageClientOnStreamingClose
+    //-----------------------------------------
+
+    private volatile boolean invalidateMessageClientOnStreamingClose = DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE;
+
+    /**
+     * Returns whether invalidate-messageclient-on-streaming-close is enabled.
+     * See {@link BaseStreamingHTTPEndpoint#setInvalidateMessageClientOnStreamingClose(boolean)}
+     * for details.
+     *
+     * @return <code>true</code> if the invalidate-messageclient-on-streaming-close is enabled, <code>false</code> otherwise.
+     */
+    public boolean isInvalidateMessageClientOnStreamingClose()
+    {
+        return invalidateMessageClientOnStreamingClose;
+    }
+
+    /**
+     * Sets the invalidate-messageclient-on-streaming close property. If enabled,
+     * when the streaming connection is closed for whatever reason (for example, the client is gone),
+     * the client's associated MessageClient on the server is invalidated immediately.
+     * This is useful in scenarios where there is a constant stream of messages, the client is gone,
+     * and the streaming connection is closed, but the session has not timed out on the server yet.
+     * In that case, enabling this property will prevent messages accumulating on the session on behalf
+     * of the MessageClient that will invalidate.
+     * <p>
+     * Important: Do not enable this property when reliable messaging is used, otherwise
+     * reliable reconnect attempts will not happen correctly.</p>
+     *
+     * @param value The property value.
+     */
+    public void setInvalidateMessageClientOnStreamingClose(boolean value)
+    {
+        invalidateMessageClientOnStreamingClose = value;
+    }
+
+    //----------------------------------
+    //  serverToClientHeartbeatMillis
+    //----------------------------------
+
+    private long serverToClientHeartbeatMillis = DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS;
+
+    /**
+     * Retrieves the number of milliseconds the server will wait before writing a
+     * single <code>null</code> byte to the streaming connection, to ensure the client is
+     * still available.
+     * @return The server-to-client heartbeat time in milliseconds.
+     */
+    public long getServerToClientHeartbeatMillis()
+    {
+        return serverToClientHeartbeatMillis;
+    }
+
+    /**
+     * Retrieves the number of milliseconds the server will wait before writing a
+     * single <code>null</code> byte to the streaming connection to ensure the client is
+     * still available when there are no new messages for the client.
+     * A non-positive value means the server will wait forever for new messages and
+     * it will not write the <code>null</code> byte to determine if the client is available.
+     * @param serverToClientHeartbeatMillis The server-to-client heartbeat time in milliseconds.
+     */
+    public void setServerToClientHeartbeatMillis(long serverToClientHeartbeatMillis)
+    {
+        if (serverToClientHeartbeatMillis < 0)
+            serverToClientHeartbeatMillis = 0;
+        this.serverToClientHeartbeatMillis = serverToClientHeartbeatMillis;
+    }
+
+    //----------------------------------
+    //  connectionIdleTimeoutMinutes
+    //----------------------------------
+
+    private int connectionIdleTimeoutMinutes = 0;
+
+    /**
+     * Retrieves the number of minutes a client can remain idle before the server
+     * times the connection out. The default value is 0, indicating that connections
+     * will not be timed out and must be closed by the client or server, either explicitly
+     * or by either process terminating.
+     *
+     * @return The number of minutes a client can remain idle before the server
+     * times the connection out.
+     */
+    public int getConnectionIdleTimeoutMinutes()
+    {
+        return connectionIdleTimeoutMinutes;
+    }
+
+    /**
+     * Sets the number of minutes a client can remain idle before the server
+     * times the connection out. A value of 0 or below indicates that
+     * connections will not be timed out.
+     *
+     * @param value The number of minutes a client can remain idle
+     * before the server times the connection out.
+     */
+    public void setConnectionIdleTimeoutMinutes(int value)
+    {
+        if (value < 0)
+            value = 0;
+
+        this.connectionIdleTimeoutMinutes = value;
+    }
+
+    /**
+     * (non-JavaDoc)
+     * @deprecated Use {@link BaseStreamingHTTPEndpoint#getConnectionIdleTimeoutMinutes()} instead.
+     */
+    public int getIdleTimeoutMinutes()
+    {
+        return getConnectionIdleTimeoutMinutes();
+    }
+
+    /**
+     * (non-JavaDoc)
+     * @deprecated Use {@link BaseStreamingHTTPEndpoint#setConnectionIdleTimeoutMinutes(int)} instead.
+     */
+    public void setIdleTimeoutMinutes(int value)
+    {
+        setConnectionIdleTimeoutMinutes(value);
+    }
+
+    //----------------------------------
+    //  maxStreamingClients
+    //----------------------------------
+
+    private int maxStreamingClients = DEFAULT_MAX_STREAMING_CLIENTS;
+
+    /**
+     * Retrieves the maximum number of clients that will be allowed to establish
+     * a streaming HTTP connection with the endpoint.
+     *
+     * @return The maximum number of clients that will be allowed to establish
+     * a streaming HTTP connection with the endpoint.
+     */
+    public int getMaxStreamingClients()
+    {
+        return maxStreamingClients;
+    }
+
+    /**
+     * Sets the maximum number of clients that will be allowed to establish
+     * a streaming HTTP connection with the server.
+     *
+     * @param maxStreamingClients The maximum number of clients that will be allowed
+     * to establish a streaming HTTP connection with the server.
+     */
+    public void setMaxStreamingClients(int maxStreamingClients)
+    {
+        this.maxStreamingClients = maxStreamingClients;
+        canStream = (streamingClientsCount < maxStreamingClients);
+    }
+
+    //----------------------------------
+    //  streamingClientsCount
+    //----------------------------------
+
+    protected int streamingClientsCount;
+
+    /**
+     * Retrieves the the number of clients that are currently in the streaming state.
+     *
+     * @return The number of clients that are currently in the streaming state.
+     */
+    public int getStreamingClientsCount()
+    {
+        return streamingClientsCount;
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Public Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Handles HTTP requests targetting this endpoint.
+     * Two types or requests are supported. If the request is a regular request-response AMF/AMFX
+     * message it is handled by the base logic in BaseHTTPEndpoint.service. However, if it is a
+     * request to open a streaming HTTP connection to the client this endpoint performs some
+     * validation checks and then holds the connection open to stream data back to the client
+     * over.
+     *
+     * @param req The original servlet request.
+     * @param res The active servlet response.
+     */
+    @Override
+    public void service(HttpServletRequest req, HttpServletResponse res)
+    {
+        String command = req.getParameter(COMMAND_PARAM_NAME);
+        if (command != null)
+            serviceStreamingRequest(req, res);
+        else // Let BaseHTTPEndpoint logic handle regular request-response messaging.
+            super.service(req, res);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Protected Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * If the message has MPI enabled, this method adds all the needed performance
+     * headers to the message.
+     *
+     * @param message Message to add performance headers to.
+     */
+    protected void addPerformanceInfo(Message message)
+    {
+        MessagePerformanceInfo mpiOriginal = getMPI(message);
+        if (mpiOriginal == null)
+            return;
+
+        MessagePerformanceInfo mpip = (MessagePerformanceInfo)mpiOriginal.clone();
+        try
+        {
+            // Set the original message info as the pushed causer info.
+            MessagePerformanceUtils.setMPIP(message, mpip);
+            MessagePerformanceUtils.setMPII(message, null);
+        }
+        catch (Exception e)
+        {
+            if (Log.isDebug())
+                log.debug("MPI exception while streaming the message: " + e.toString());
+        }
+
+        // Overhead only used when MPI is enabled for sizing
+        MessagePerformanceInfo mpio = new MessagePerformanceInfo();
+        if (mpip.recordMessageTimes)
+        {
+            mpio.sendTime = System.currentTimeMillis();
+            mpio.infoType = "OUT";
+        }
+        mpio.pushedFlag = true;
+        MessagePerformanceUtils.setMPIO(message, mpio);
+
+        // If MPI sizing information is enabled serialize again so that we know size
+        if (mpip.recordMessageSizes)
+        {
+            try
+            {
+                // Each subclass serializes the message in their own format to
+                // get the message size for the MPIO.
+                long serializationOverhead = System.currentTimeMillis();
+                mpio.messageSize = getMessageSizeForPerformanceInfo(message);
+
+                // Set serialization overhead to the time calculated during serialization above
+                if (mpip.recordMessageTimes)
+                {
+                    serializationOverhead = System.currentTimeMillis() - serializationOverhead;
+                    mpip.addToOverhead(serializationOverhead);
+                    mpiOriginal.addToOverhead(serializationOverhead);
+                    mpio.sendTime = System.currentTimeMillis();
+                }
+            }
+            catch(Exception e)
+            {
+                log.debug("MPI exception while streaming the message: " + e.toString());
+            }
+        }
+    }
+
+    /**
+     * Utility method to convert streamed push messages to their small versions
+     * if the channel-endpoint combination supports small messages.
+     *
+     * @param message The regular message.
+     * @return The small message if the message has a small version, or regular message
+     * if it doesn't .
+     */
+    protected Message convertPushMessageToSmall(Message message)
+    {
+        FlexSession session = FlexContext.getFlexSession();
+        if (session != null && session.useSmallMessages())
+            return convertToSmallMessage(message);
+        return message;
+    }
+
+    /**
+     * Used internally for performance information gathering; not intended for
+     * public use. The default implementation of this method returns zero.
+     * Subclasses should overwrite if they want to accurately report message
+     * size information in performance information gathering.
+     *
+     * @param message Message to get the size for.
+     *
+     * @return The size of the message after message is serialized.
+     */
+    protected long getMessageSizeForPerformanceInfo(Message message)
+    {
+        return 0;
+    }
+
+    /**
+     * This streaming endpoint does not support polling clients.
+     *
+     * @param flexClient The FlexClient that issued the poll request.
+     * @param pollCommand The poll command from the client.
+     * @return The flush info used to build the poll response.
+     */
+    @Override
+    protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand)
+    {
+        MessageException me = new MessageException();
+        me.setMessage(POLL_NOT_SUPPORTED_MESSAGE);
+        me.setDetails(POLL_NOT_SUPPORTED_MESSAGE);
+        me.setCode(POLL_NOT_SUPPORTED_CODE);
+        throw me;
+    }
+
+    /**
+     * Handles streaming connection open command sent by the FlexClient.
+     *
+     * @param req The <code>HttpServletRequest</code> to service.
+     * @param res The <code>HttpServletResponse</code> to be used in case an error
+     * has to be sent back.
+     * @param flexClient FlexClient that requested the streaming connection.
+     */
+    protected void handleFlexClientStreamingOpenRequest(HttpServletRequest req, HttpServletResponse res, FlexClient flexClient)
+    {
+        FlexSession session = FlexContext.getFlexSession();
+        if (canStream && session.canStream)
+        {
+            // If canStream/session.canStream is true it means we currently have
+            // less than the max number of allowed streaming threads, per endpoint/session.
+
+            // We need to protect writes/reads to the stream count with the endpoint's lock.
+            // Also, we have to be careful to handle the case where two threads get to this point when only
+            // one streaming spot remains; one thread will win and the other needs to fault.
+            boolean thisThreadCanStream;
+            synchronized (lock)
+            {
+                ++streamingClientsCount;
+                if (streamingClientsCount == maxStreamingClients)
+                {
+                    thisThreadCanStream = true; // This thread got the last spot.
+                    canStream = false;
+                }
+                else if (streamingClientsCount > maxStreamingClients)
+                {
+                    thisThreadCanStream = false; // This thread was beaten out for the last spot.
+                    --streamingClientsCount; // Decrement the count because we're not going to grant the streaming right to the client.
+                }
+                else
+                {
+                    // We haven't hit the limit yet, allow this thread to stream.
+                    thisThreadCanStream = true;
+                }
+            }
+
+            // If the thread cannot wait due to endpoint streaming connection
+            // limit, inform the client and return.
+            if (!thisThreadCanStream)
+            {
+                String errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
+                        + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit of '"
+                        + maxStreamingClients + "' has been reached.";
+                if (Log.isError())
+                    log.error(errorMessage);
+                try
+                {
+                    errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
+                    + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit has been reached.";
+                    res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage);
+                }
+                catch (IOException ignore)
+                {}
+                return;
+            }
+
+            // Setup for specific user agents.
+            byte[] kickStartBytesToStream = null;
+            String userAgentValue = req.getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
+            UserAgentSettings agentSettings = userAgentManager.match(userAgentValue);
+            if (agentSettings != null)
+            {
+                synchronized (session)
+                {
+                    session.maxConnectionsPerSession = agentSettings.getMaxPersistentConnectionsPerSession();
+                }
+
+                int kickStartBytes = agentSettings.getKickstartBytes();
+                if (kickStartBytes > 0)
+                {
+                    // Determine the minimum number of actual bytes that need to be sent to
+                    // kickstart, taking into account transfer-encoding overhead.
+                    try
+                    {
+                        int chunkLengthHeaderSize = Integer.toHexString(kickStartBytes).getBytes("ASCII").length;
+                        int chunkOverhead = chunkLengthHeaderSize + 4; // 4 for the 2 wrapping CRLF tokens.
+                        int minimumKickstartBytes = kickStartBytes - chunkOverhead;
+                        kickStartBytesToStream = new byte[(minimumKickstartBytes > 0) ? minimumKickstartBytes :
+                                kickStartBytes];
+                    }
+                    catch (UnsupportedEncodingException ignore)
+                    {
+                        kickStartBytesToStream = new byte[kickStartBytes];
+                    }
+                    Arrays.fill(kickStartBytesToStream, NULL_BYTE);
+                }
+            }
+
+            // Now, check with the session before granting the streaming connection.
+            synchronized(session)
+            {
+                ++session.streamingConnectionsCount;
+                if (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED)
+                {
+                    thisThreadCanStream = true;
+                }
+                else if (session.streamingConnectionsCount == session.maxConnectionsPerSession)
+                {
+                    thisThreadCanStream = true; // This thread got the last spot in the session.
+                    session.canStream = false;
+                }
+                else if (session.streamingConnectionsCount > session.maxConnectionsPerSession)
+                {
+                    thisThreadCanStream = false; // This thread was beaten out for the last spot.
+                    --session.streamingConnectionsCount;
+                    synchronized(lock)
+                    {
+                        // Decrement the endpoint count because we're not going to grant the streaming right to the client.
+                        --streamingClientsCount;
+                    }
+                }
+                else
+                {
+                    // We haven't hit the limit yet, allow this thread to stream.
+                    thisThreadCanStream = true;
+                }
+            }
+
+            // If the thread cannot wait due to session streaming connection
+            // limit, inform the client and return.
+            if (!thisThreadCanStream)
+            {
+                if (Log.isError())
+                    log.error("Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
+                            + flexClient.getId() + "' because " + UserAgentManager.MAX_PERSISTENT_CONNECTIONS_PER_SESSION + " limit of '" + session.maxConnectionsPerSession
+                            + ((agentSettings != null) ? "' for user-agent '" + agentSettings.getMatchOn() + "'" : "") +  " has been reached." );
+                try
+                {
+                    // Return an HTTP status code 400.
+                    String errorMessage = "The server cannot grant streaming connection to this client because limit has been reached.";
+                    res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage);
+                }
+                catch (IOException ignore)
+                {
+                    // NOWARN
+                }
+                return;
+            }
+
+            Thread currentThread = Thread.currentThread();
+            String threadName = currentThread.getName();
+            EndpointPushNotifier notifier = null;
+            boolean suppressIOExceptionLogging = false; // Used to suppress logging for IO exception.
+            try
+            {
+                currentThread.setName(threadName + STREAMING_THREAD_NAME_EXTENSION);
+
+                // Open and commit response headers and get output stream.
+                if (addNoCacheHeaders)
+                    addNoCacheHeaders(req, res);
+                res.setContentType(getResponseContentType());
+                res.setHeader("Connection", "close");
+                res.setHeader("Transfer-Encoding", "chunked");
+                ServletOutputStream os = res.getOutputStream();
+                res.flushBuffer();
+
+                // If kickstart-bytes are specified, stream them.
+                if (kickStartBytesToStream != null)
+                {
+                    if (Log.isDebug())
+                        log.debug("Endpoint with id '

<TRUNCATED>