You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2009/03/26 23:43:27 UTC

svn commit: r758905 - in /httpcomponents/httpcore/trunk: ./ httpcore-nio/ httpcore-nio/src/examples/org/apache/http/examples/nio/ httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/ httpcore-nio/src/main/java/org/apache/http/nio/params/ httpco...

Author: olegk
Date: Thu Mar 26 22:43:26 2009
New Revision: 758905

URL: http://svn.apache.org/viewvc?rev=758905&view=rev
Log:
HTTPCORE-155: Compatibility mode with IBM JRE and other JREs with naive (broken) implementation of SelectionKey. 

* merged down from the branch

Added:
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/InterestOpEntry.java
      - copied unchanged from r758898, httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/InterestOpEntry.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/InterestOpsCallback.java
      - copied unchanged from r758898, httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/InterestOpsCallback.java
Modified:
    httpcomponents/httpcore/trunk/   (props changed)
    httpcomponents/httpcore/trunk/RELEASE_NOTES.txt
    httpcomponents/httpcore/trunk/httpcore/   (props changed)
    httpcomponents/httpcore/trunk/httpcore-nio/   (props changed)
    httpcomponents/httpcore/trunk/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpClientConnManagement.java   (props changed)
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/BaseIOReactor.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/IOSessionImpl.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java
    httpcomponents/httpcore/trunk/httpcore-osgi/   (props changed)

Propchange: httpcomponents/httpcore/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 26 22:43:26 2009
@@ -0,0 +1 @@
+/httpcomponents/httpcore/branches/ibm_compat_branch:755687-758898

Modified: httpcomponents/httpcore/trunk/RELEASE_NOTES.txt
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/RELEASE_NOTES.txt?rev=758905&r1=758904&r2=758905&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/RELEASE_NOTES.txt (original)
+++ httpcomponents/httpcore/trunk/RELEASE_NOTES.txt Thu Mar 26 22:43:26 2009
@@ -1,9 +1,12 @@
 Changes since 4.0
 -------------------
 
+* [HTTPCORE-155] Compatibility mode with IBM JRE and other JREs with naive (broken) implementation 
+  of SelectionKey.
+  Cotributed by Marc Beyerle <marc.beyerle at de.ibm.com> and Oleg Kalnichevski <olegk at apache.org> 
+
 * [HTTPCORE-193] Fixed problem with SSLIOSession incorrectly handling of end-of-stream condition.  
-  Contributed by Asankha C. Perera <asankha at apache.org> and 
-  Oleg Kalnichevski <olegk at apache.org> 
+  Contributed by Asankha C. Perera <asankha at apache.org> and Oleg Kalnichevski <olegk at apache.org> 
 
 * [HTTPCORE-191] Blocking HTTP connections are now capable of correctly preserving their internal 
   state on SocketTimeoutExceptions, which makes it possible to continue reading from the connection 

Propchange: httpcomponents/httpcore/trunk/httpcore/
            ('svn:mergeinfo' removed)

Propchange: httpcomponents/httpcore/trunk/httpcore-nio/
            ('svn:mergeinfo' removed)

Propchange: httpcomponents/httpcore/trunk/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpClientConnManagement.java
            ('svn:mergeinfo' removed)

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java?rev=758905&r1=758904&r2=758905&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java Thu Mar 26 22:43:26 2009
@@ -68,24 +68,28 @@
     
     private final Object shutdownMutex;
     private final long selectTimeout;
+    private final boolean interestOpsQueueing;
     private final Selector selector;
     private final Set<IOSession> sessions;
+    private final Queue<InterestOpEntry> interestOpsQueue;
     private final Queue<IOSession> closedSessions;
     private final Queue<ChannelEntry> newChannels;
-    
+
     /**
      * Creates new AbstractIOReactor instance.
      * 
      * @param selectTimeout the select timeout.
      * @throws IOReactorException in case if a non-recoverable I/O error. 
      */
-    public AbstractIOReactor(long selectTimeout) throws IOReactorException {
+    public AbstractIOReactor(long selectTimeout, boolean interestOpsQueueing) throws IOReactorException {
         super();
         if (selectTimeout <= 0) {
             throw new IllegalArgumentException("Select timeout may not be negative or zero");
         }
         this.selectTimeout = selectTimeout;
+        this.interestOpsQueueing = interestOpsQueueing;
         this.sessions = Collections.synchronizedSet(new HashSet<IOSession>());
+        this.interestOpsQueue = new ConcurrentLinkedQueue<InterestOpEntry>();
         this.closedSessions = new ConcurrentLinkedQueue<IOSession>();
         this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>();
         try {
@@ -189,6 +193,13 @@
     }
 
     /**
+     * Returns <code>true</code> if interest Ops queueing is enabled, <code>false</code> otherwise.
+     */
+    public boolean getInterestOpsQueueing() {
+        return this.interestOpsQueueing;
+    }
+
+    /**
      * Adds new channel entry. The channel will be asynchronously registered
      * with the selector.
      *  
@@ -272,6 +283,11 @@
                         && this.sessions.isEmpty()) {
                     break;
                 }
+
+                if (this.interestOpsQueueing) {
+                    // process all pending interestOps() operations
+                    processPendingInterestOps();
+                }
                 
             }
             
@@ -357,13 +373,26 @@
                         "with the selector", ex);
             }
 
-            IOSession session = new IOSessionImpl(key, new SessionClosedCallback() {
+            SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {
 
                 public void sessionClosed(IOSession session) {
                     queueClosedSession(session);
                 }
                 
-            });
+            };
+            
+            InterestOpsCallback interestOpsCallback = null;
+            if (this.interestOpsQueueing) {
+                interestOpsCallback = new InterestOpsCallback() {
+
+                    public void addInterestOps(InterestOpEntry entry) {
+                        queueInterestOps(entry);
+                    }
+                    
+                };
+            }
+            
+            IOSession session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
             
             int timeout = 0;
             try {
@@ -406,6 +435,30 @@
     }
 
     /**
+        Processes all pending {@link java.nio.channels.SelectionKey#interestOps(int) interestOps(int)}
+        operations.
+    */
+    private void processPendingInterestOps() {
+        // validity check
+        if (!this.interestOpsQueueing) {
+            return;
+        }
+        synchronized (this.interestOpsQueue) {
+            while (!this.interestOpsQueue.isEmpty()) {
+                // get the first queue element
+                InterestOpEntry entry = this.interestOpsQueue.remove();
+
+                // obtain the operation's details
+                SelectionKey key = entry.getSelectionKey();
+                int eventMask = entry.getEventMask();
+                if (key.isValid()) {
+                    key.interestOps(eventMask);
+                }
+            }
+        }
+    }
+
+    /**
      * Closes out all I/O sessions maintained by this I/O reactor.
      */
     protected void closeSessions() {
@@ -524,5 +577,29 @@
     public void shutdown() throws IOReactorException {
         shutdown(1000);
     }
+
+    /** 
+     * Adds an {@link org.apache.http.impl.nio.reactor.IOSessionQueueElement IOSessionQueueElement}
+     * to the {@link java.nio.channels.SelectionKey#interestOps(int) interestOps(int)} queue for
+     * this instance. 
+     * @return <code>true</code> if the operation could be performed successfully, 
+     *   <code>false</code> otherwise.
+     */
+    protected boolean queueInterestOps(final InterestOpEntry entry) {
+        // validity checks
+        if (!this.interestOpsQueueing) {
+            throw new IllegalStateException("Interest ops queueing not enabled");
+        }
+        if (entry == null) {
+            return false;
+        }
+
+        synchronized (this.interestOpsQueue) {
+            // add this operation to the interestOps() queue
+            this.interestOpsQueue.add(entry);
+        }
+
+        return true;
+    }
     
 }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java?rev=758905&r1=758904&r2=758905&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java Thu Mar 26 22:43:26 2009
@@ -108,6 +108,7 @@
     protected final HttpParams params;
     protected final Selector selector;
     protected final long selectTimeout;
+    protected final boolean interestOpsQueueing;
 
     private final int workerCount;
     private final ThreadFactory threadFactory;
@@ -148,6 +149,7 @@
         }
         this.params = params;
         this.selectTimeout = NIOReactorParams.getSelectInterval(params);
+        this.interestOpsQueueing = NIOReactorParams.getInterestOpsQueueing(params);
         this.statusLock = new Object();
         this.workerCount = workerCount;
         if (threadFactory != null) {
@@ -282,7 +284,7 @@
             this.status = IOReactorStatus.ACTIVE;
             // Start I/O dispatchers
             for (int i = 0; i < this.dispatchers.length; i++) {
-                BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout);
+                BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
                 dispatcher.setExceptionHandler(exceptionHandler);
                 this.dispatchers[i] = dispatcher;
             }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/BaseIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/BaseIOReactor.java?rev=758905&r1=758904&r2=758905&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/BaseIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/BaseIOReactor.java Thu Mar 26 22:43:26 2009
@@ -72,8 +72,8 @@
      * @param selectTimeout the select timeout.
      * @throws IOReactorException in case if a non-recoverable I/O error. 
      */
-    public BaseIOReactor(long selectTimeout) throws IOReactorException {
-        super(selectTimeout);
+    public BaseIOReactor(long selectTimeout, boolean interestOpsQueueing) throws IOReactorException {
+        super(selectTimeout, interestOpsQueueing);
         this.bufferingSessions = new HashSet<IOSession>();
         this.timeoutCheckInterval = selectTimeout;
         this.lastTimeoutCheck = System.currentTimeMillis();

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/IOSessionImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/IOSessionImpl.java?rev=758905&r1=758904&r2=758905&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/IOSessionImpl.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/IOSessionImpl.java Thu Mar 26 22:43:26 2009
@@ -58,25 +58,38 @@
     
     private final SelectionKey key;
     private final ByteChannel channel;
-    private final SessionClosedCallback callback;
     private final Map<String, Object> attributes;
+    private final InterestOpsCallback interestOpsCallback;
+    private final SessionClosedCallback sessionClosedCallback;
     
     private SessionBufferStatus bufferStatus;
     private int socketTimeout;
+    private volatile int currentEventMask;
     
-    public IOSessionImpl(final SelectionKey key, final SessionClosedCallback callback) {
+    public IOSessionImpl(
+            final SelectionKey key, 
+            final InterestOpsCallback interestOpsCallback,
+            final SessionClosedCallback sessionClosedCallback) {
         super();
         if (key == null) {
             throw new IllegalArgumentException("Selection key may not be null");
         }
         this.key = key;
         this.channel = (ByteChannel) this.key.channel();
-        this.callback = callback;
+        this.interestOpsCallback = interestOpsCallback;
+        this.sessionClosedCallback = sessionClosedCallback;
         this.attributes = Collections.synchronizedMap(new HashMap<String, Object>());
+        this.currentEventMask = 0;
         this.socketTimeout = 0;
         this.status = ACTIVE;
     }
     
+    public IOSessionImpl(
+            final SelectionKey key, 
+            final SessionClosedCallback sessionClosedCallback) {
+        this(key, null, sessionClosedCallback);
+    }
+    
     public ByteChannel channel() {
         return this.channel;
     }
@@ -100,39 +113,72 @@
     }
 
     public int getEventMask() {
-        return this.key.interestOps();
+        return this.interestOpsCallback != null ? this.currentEventMask : this.key.interestOps();
     }
     
     public void setEventMask(int ops) {
         if (this.status == CLOSED) {
             return;
         }
-        this.key.interestOps(ops);
+        if (this.interestOpsCallback != null) {
+            // update the current event mask
+            this.currentEventMask = ops;
+
+            // local variable
+            InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
+
+            // add this operation to the interestOps() queue
+            this.interestOpsCallback.addInterestOps(entry);
+        } else {
+            this.key.interestOps(ops);
+        }
         this.key.selector().wakeup();
     }
-    
+
     public void setEvent(int op) {
         if (this.status == CLOSED) {
             return;
         }
-        synchronized (this.key) {
-            int ops = this.key.interestOps();
-            this.key.interestOps(ops | op);
+        if (this.interestOpsCallback != null) {
+            // update the current event mask
+            this.currentEventMask |= op;
+
+            // local variable
+            InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
+
+            // add this operation to the interestOps() queue
+            this.interestOpsCallback.addInterestOps(entry);
+        } else {
+            synchronized (this.key) {
+                int ops = this.key.interestOps();
+                this.key.interestOps(ops | op);
+            }
         }
         this.key.selector().wakeup();
     }
-    
+
     public void clearEvent(int op) {
         if (this.status == CLOSED) {
             return;
         }
-        synchronized (this.key) {
-            int ops = this.key.interestOps();
-            this.key.interestOps(ops & ~op);
+        if (this.interestOpsCallback != null) {
+            // update the current event mask
+            this.currentEventMask &= ~op;
+
+            // local variable
+            InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
+
+            // add this operation to the interestOps() queue
+            this.interestOpsCallback.addInterestOps(entry);
+        } else {
+            synchronized (this.key) {
+                int ops = this.key.interestOps();
+                this.key.interestOps(ops & ~op);
+            }
         }
         this.key.selector().wakeup();
     }
-    
+
     public int getSocketTimeout() {
         return this.socketTimeout;
     }
@@ -153,8 +199,8 @@
             // Munching exceptions is not nice
             // but in this case it is justified
         }
-        if (this.callback != null) {
-            this.callback.sessionClosed(this);
+        if (this.sessionClosedCallback != null) {
+            this.sessionClosedCallback.sessionClosed(this);
         }
         if (this.key.selector().isOpen()) {
             this.key.selector().wakeup();
@@ -222,7 +268,8 @@
         buffer.append("[");
         if (this.key.isValid()) {
             buffer.append("interested ops: ");
-            formatOps(buffer, this.key.interestOps());
+            formatOps(buffer, this.interestOpsCallback != null ? 
+                    this.currentEventMask : this.key.interestOps());
             buffer.append("; ready ops: ");
             formatOps(buffer, this.key.readyOps());
         } else {

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java?rev=758905&r1=758904&r2=758905&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java Thu Mar 26 22:43:26 2009
@@ -68,4 +68,12 @@
      */
     public static final String GRACE_PERIOD = "http.nio.grace-period"; 
 
+    /**
+     * Determines whether interestOps() queueing is enabled for the I/O reactors.
+     * <p>
+     * This parameter expects a value of type {@link Boolean}.
+     * </p>
+     */
+    public static final String INTEREST_OPS_QUEUEING = "http.nio.interest-ops-queueing";
+
 }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java?rev=758905&r1=758904&r2=758905&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java Thu Mar 26 22:43:26 2009
@@ -91,4 +91,18 @@
         params.setLongParameter(GRACE_PERIOD, ms);
     }
 
+    public static boolean getInterestOpsQueueing(final HttpParams params) {
+        if (params == null) {
+            throw new IllegalArgumentException("HTTP parameters may not be null");
+        }
+        return params.getBooleanParameter(INTEREST_OPS_QUEUEING, false);
+    }
+
+    public static void setInterestOpsQueueing(final HttpParams params, boolean interestOpsQueueing) {
+        if (params == null) {
+            throw new IllegalArgumentException("HTTP parameters may not be null");
+        }
+        params.setBooleanParameter(INTEREST_OPS_QUEUEING, interestOpsQueueing);
+    }
+
 }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java?rev=758905&r1=758904&r2=758905&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java Thu Mar 26 22:43:26 2009
@@ -58,8 +58,6 @@
 import org.apache.http.nio.reactor.IOReactorExceptionHandler;
 import org.apache.http.nio.reactor.IOReactorStatus;
 import org.apache.http.nio.reactor.ListenerEndpoint;
-import org.apache.http.params.BasicHttpParams;
-import org.apache.http.params.HttpParams;
 import org.apache.http.protocol.BasicHttpProcessor;
 import org.apache.http.protocol.HttpContext;
 import org.apache.http.protocol.HttpRequestHandler;

Propchange: httpcomponents/httpcore/trunk/httpcore-osgi/
            ('svn:mergeinfo' removed)