You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2009/03/18 19:38:55 UTC

svn commit: r755694 - in /httpcomponents/httpcore/branches/ibm_compat_branch: ./ httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/ httpcore-nio/src/main/java/org/apache/http/nio/params/

Author: olegk
Date: Wed Mar 18 18:38:55 2009
New Revision: 755694

URL: http://svn.apache.org/viewvc?rev=755694&view=rev
Log:
HTTPCORE-155: Compatibility mode with IBM JRE and other JREs with naive (broken) implementation of SelectionKey.

Contributed by Marc Beyerle <marc.beyerle at de.ibm.com> 

Added:
    httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/InterestOpEntry.java   (with props)
Modified:
    httpcomponents/httpcore/branches/ibm_compat_branch/RELEASE_NOTES.txt
    httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java
    httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
    httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/BaseIOReactor.java
    httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/IOSessionImpl.java
    httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java
    httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java

Modified: httpcomponents/httpcore/branches/ibm_compat_branch/RELEASE_NOTES.txt
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/ibm_compat_branch/RELEASE_NOTES.txt?rev=755694&r1=755693&r2=755694&view=diff
==============================================================================
--- httpcomponents/httpcore/branches/ibm_compat_branch/RELEASE_NOTES.txt (original)
+++ httpcomponents/httpcore/branches/ibm_compat_branch/RELEASE_NOTES.txt Wed Mar 18 18:38:55 2009
@@ -1,6 +1,10 @@
 Changes since 4.0
 -------------------
 
+* [HTTPCORE-155] Compatibility mode with IBM JRE and other JREs with naive (broken) implementation 
+  of SelectionKey.
+  Contributed by Marc Beyerle <marc.beyerle at de.ibm.com> 
+
 * [HTTPCORE-191] Blocking HTTP connections are now capable of correctly preserving their internal 
   state on SocketTimeoutExceptions, which makes it possible to continue reading from the connection 
   after a socket timeout.

Modified: httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java?rev=755694&r1=755693&r2=755694&view=diff
==============================================================================
--- httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java (original)
+++ httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java Wed Mar 18 18:38:55 2009
@@ -40,9 +40,11 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -68,24 +70,28 @@
     
     private final Object shutdownMutex;
     private final long selectTimeout;
+    private final boolean interestOpsQueueing;
     private final Selector selector;
     private final Set<IOSession> sessions;
+    private final List<InterestOpEntry> interestOpsQueue;
     private final Queue<IOSession> closedSessions;
     private final Queue<ChannelEntry> newChannels;
-    
+
     /**
      * Creates new AbstractIOReactor instance.
      * 
      * @param selectTimeout the select timeout.
      * @throws IOReactorException in case if a non-recoverable I/O error. 
      */
-    public AbstractIOReactor(long selectTimeout) throws IOReactorException {
+    public AbstractIOReactor(long selectTimeout, boolean interestOpsQueueing) throws IOReactorException {
         super();
         if (selectTimeout <= 0) {
             throw new IllegalArgumentException("Select timeout may not be negative or zero");
         }
         this.selectTimeout = selectTimeout;
+        this.interestOpsQueueing = interestOpsQueueing;
         this.sessions = Collections.synchronizedSet(new HashSet<IOSession>());
+        this.interestOpsQueue = new ArrayList<InterestOpEntry>();
         this.closedSessions = new ConcurrentLinkedQueue<IOSession>();
         this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>();
         try {
@@ -189,6 +195,13 @@
     }
 
     /**
+     * Returns <code>true</code> if interest ops queueing is enabled, <code>false</code> otherwise.
+     */
+    public boolean getInterestOpsQueueing() {
+        return this.interestOpsQueueing;
+    }
+
+    /**
      * Adds new channel entry. The channel will be asynchronously registered
      * with the selector.
      *  
@@ -272,6 +285,11 @@
                         && this.sessions.isEmpty()) {
                     break;
                 }
+
+                if (this.interestOpsQueueing) {
+                    // process all pending interestOps() operations
+                    processPendingInterestOps();
+                }
                 
             }
             
@@ -363,7 +381,7 @@
                     queueClosedSession(session);
                 }
                 
-            });
+            }, this);
             
             int timeout = 0;
             try {
@@ -406,6 +424,40 @@
     }
 
     /**
+        Processes all pending {@link java.nio.channels.SelectionKey#interestOps(int) interestOps(int)}
+        operations.
+    */
+    protected void processPendingInterestOps() {
+        // validity check
+        if (!this.interestOpsQueueing) {
+            return;
+        }
+        synchronized (this.interestOpsQueue) {
+            // determine this interestOps() queue's size
+            int size = this.interestOpsQueue.size();
+
+            for (int i = 0; i < size; i++) {
+                // get the first queue element
+                InterestOpEntry queueElement = this.interestOpsQueue.remove(0);
+
+                // obtain the operation's details
+                IOSessionImpl ioSession = queueElement.getIoSession();
+                int operationType = queueElement.getOperationType();
+                int operationArgument = queueElement.getOperationArgument();
+
+                // perform the operation
+                if (operationType == InterestOpEntry.OPERATION_TYPE_SET_EVENT) {
+                    ioSession.setEventImpl(operationArgument);
+                } else if (operationType == InterestOpEntry.OPERATION_TYPE_CLEAR_EVENT) {
+                    ioSession.clearEventImpl(operationArgument);
+                } else if (operationType == InterestOpEntry.OPERATION_TYPE_SET_EVENT_MASK) {
+                    ioSession.setEventMaskImpl(operationArgument);
+                }
+            }
+        }
+    }
+
+    /**
      * Closes out all I/O sessions maintained by this I/O reactor.
      */
     protected void closeSessions() {
@@ -524,5 +576,44 @@
     public void shutdown() throws IOReactorException {
         shutdown(1000);
     }
+
+    /** 
+     * Adds an {@link org.apache.http.impl.nio.reactor.InterestOpEntry InterestOpEntry}
+     * to the {@link java.nio.channels.SelectionKey#interestOps(int) interestOps(int)} queue for
+     * this instance. 
+     * @return <code>true</code> if the operation could be performed successfully, 
+     *   <code>false</code> otherwise.
+     */
+    protected boolean addInterestOpsQueueElement(final InterestOpEntry entry) {
+        // validity checks
+        if (!this.interestOpsQueueing) {
+            throw new IllegalStateException("Interest ops queueing not enabled");
+        }
+        if (entry == null) {
+            return false;
+        }
+        if (entry.getIoSession() == null) {
+            return false;
+        }
+
+        // local variable
+        int operationType = entry.getOperationType();
+
+        /*
+            NOTE: Most of the operations are setEvent(), so check for this one first.
+        */
+        if ((operationType != InterestOpEntry.OPERATION_TYPE_SET_EVENT) &&
+            (operationType != InterestOpEntry.OPERATION_TYPE_CLEAR_EVENT) &&
+            (operationType != InterestOpEntry.OPERATION_TYPE_SET_EVENT_MASK)) {
+            return false;
+        }
+
+        synchronized (this.interestOpsQueue) {
+            // add this operation to the interestOps() queue
+            this.interestOpsQueue.add(entry);
+        }
+
+        return true;
+    }
     
 }

Modified: httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java?rev=755694&r1=755693&r2=755694&view=diff
==============================================================================
--- httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java (original)
+++ httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java Wed Mar 18 18:38:55 2009
@@ -108,6 +108,7 @@
     protected final HttpParams params;
     protected final Selector selector;
     protected final long selectTimeout;
+    protected final boolean interestOpsQueueing;
 
     private final int workerCount;
     private final ThreadFactory threadFactory;
@@ -148,6 +149,7 @@
         }
         this.params = params;
         this.selectTimeout = NIOReactorParams.getSelectInterval(params);
+        this.interestOpsQueueing = NIOReactorParams.getInterestOpsQueueing(params);
         this.statusLock = new Object();
         this.workerCount = workerCount;
         if (threadFactory != null) {
@@ -282,7 +284,7 @@
             this.status = IOReactorStatus.ACTIVE;
             // Start I/O dispatchers
             for (int i = 0; i < this.dispatchers.length; i++) {
-                BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout);
+                BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
                 dispatcher.setExceptionHandler(exceptionHandler);
                 this.dispatchers[i] = dispatcher;
             }

Modified: httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/BaseIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/BaseIOReactor.java?rev=755694&r1=755693&r2=755694&view=diff
==============================================================================
--- httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/BaseIOReactor.java (original)
+++ httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/BaseIOReactor.java Wed Mar 18 18:38:55 2009
@@ -72,8 +72,8 @@
      * @param selectTimeout the select timeout.
      * @throws IOReactorException in case if a non-recoverable I/O error. 
      */
-    public BaseIOReactor(long selectTimeout) throws IOReactorException {
-        super(selectTimeout);
+    public BaseIOReactor(long selectTimeout, boolean interestOpsQueueing) throws IOReactorException {
+        super(selectTimeout, interestOpsQueueing);
         this.bufferingSessions = new HashSet<IOSession>();
         this.timeoutCheckInterval = selectTimeout;
         this.lastTimeoutCheck = System.currentTimeMillis();

Modified: httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/IOSessionImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/IOSessionImpl.java?rev=755694&r1=755693&r2=755694&view=diff
==============================================================================
--- httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/IOSessionImpl.java (original)
+++ httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/IOSessionImpl.java Wed Mar 18 18:38:55 2009
@@ -60,19 +60,29 @@
     private final ByteChannel channel;
     private final SessionClosedCallback callback;
     private final Map<String, Object> attributes;
+    private final AbstractIOReactor abstractIOReactor;
+    private final boolean interestOpsQueueing;
     
     private SessionBufferStatus bufferStatus;
     private int socketTimeout;
     
-    public IOSessionImpl(final SelectionKey key, final SessionClosedCallback callback) {
+    public IOSessionImpl(final SelectionKey key, final SessionClosedCallback callback, final AbstractIOReactor abstractIOReactor) {
         super();
         if (key == null) {
             throw new IllegalArgumentException("Selection key may not be null");
         }
+
+        // validity check
+        if (abstractIOReactor == null) {
+            throw new IllegalArgumentException("IO reactor may not be null");
+        }
+
         this.key = key;
         this.channel = (ByteChannel) this.key.channel();
         this.callback = callback;
         this.attributes = Collections.synchronizedMap(new HashMap<String, Object>());
+        this.abstractIOReactor = abstractIOReactor;
+        this.interestOpsQueueing = abstractIOReactor.getInterestOpsQueueing();
         this.socketTimeout = 0;
         this.status = ACTIVE;
     }
@@ -100,10 +110,32 @@
     }
 
     public int getEventMask() {
+        if (interestOpsQueueing) {
+            // flush the interestOps() queue
+            abstractIOReactor.processPendingInterestOps();
+        }
+
         return this.key.interestOps();
     }
     
     public void setEventMask(int ops) {
+        if (this.interestOpsQueueing) {
+            // local variable
+            InterestOpEntry entry = new InterestOpEntry(
+                    this, InterestOpEntry.OPERATION_TYPE_SET_EVENT_MASK, ops);
+
+            // add this operation to the interestOps() queue
+            this.abstractIOReactor.addInterestOpsQueueElement(entry);
+
+            // wake up this key's selector
+            this.key.selector().wakeup();
+        } else {
+            // simply invoke the actual implementation
+            setEventMaskImpl(ops);
+        }
+    }
+
+    protected void setEventMaskImpl(int ops) {
         if (this.status == CLOSED) {
             return;
         }
@@ -112,6 +144,23 @@
     }
     
     public void setEvent(int op) {
+        if (this.interestOpsQueueing) {
+            // local variable
+            InterestOpEntry entry = new InterestOpEntry(this, 
+                    InterestOpEntry.OPERATION_TYPE_SET_EVENT, op);
+
+            // add this operation to the interestOps() queue
+            this.abstractIOReactor.addInterestOpsQueueElement(entry);
+
+            // wake up this key's selector
+            this.key.selector().wakeup();
+        } else {
+            // simply invoke the actual implementation
+            setEventImpl(op);
+        }
+    }
+
+    protected void setEventImpl(int op) {
         if (this.status == CLOSED) {
             return;
         }
@@ -123,6 +172,23 @@
     }
     
     public void clearEvent(int op) {
+        if (this.interestOpsQueueing) {
+            // local variable
+            InterestOpEntry entry = new InterestOpEntry(this, 
+                    InterestOpEntry.OPERATION_TYPE_CLEAR_EVENT, op);
+
+            // add this operation to the interestOps() queue
+            this.abstractIOReactor.addInterestOpsQueueElement(entry);
+
+            // wake up this key's selector
+            this.key.selector().wakeup();
+        } else {
+            // simply invoke the actual implementation
+            clearEventImpl(op);
+        }
+    }
+
+    protected void clearEventImpl(int op) {
         if (this.status == CLOSED) {
             return;
         }
@@ -222,6 +288,12 @@
         buffer.append("[");
         if (this.key.isValid()) {
             buffer.append("interested ops: ");
+
+            if (interestOpsQueueing) {
+                // flush the interestOps() queue
+                abstractIOReactor.processPendingInterestOps();
+            }
+
             formatOps(buffer, this.key.interestOps());
             buffer.append("; ready ops: ");
             formatOps(buffer, this.key.readyOps());

Added: httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/InterestOpEntry.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/InterestOpEntry.java?rev=755694&view=auto
==============================================================================
--- httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/InterestOpEntry.java (added)
+++ httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/InterestOpEntry.java Wed Mar 18 18:38:55 2009
@@ -0,0 +1,69 @@
+/*
+ * $HeadURL$
+ * $Revision$
+ * $Date$
+ *
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.http.impl.nio.reactor;
+
+/**
+ * Helper class, representing an element on an {@link java.nio.channels.SelectionKey#interestOps(int) 
+ * interestOps(int)} queue.
+ */
+class InterestOpEntry {
+
+    public static final int OPERATION_TYPE_SET_EVENT_MASK = 0;
+
+    public static final int OPERATION_TYPE_SET_EVENT = 1;
+
+    public static final int OPERATION_TYPE_CLEAR_EVENT = 2;
+
+    private final IOSessionImpl ioSession;
+    private final int operationType;
+    private final int operationArgument;
+
+    public InterestOpEntry(IOSessionImpl ioSession, int operationType, int operationArgument) {
+        // initialize instance members
+        this.ioSession = ioSession;
+        this.operationType = operationType;
+        this.operationArgument = operationArgument;
+    }
+
+    public IOSessionImpl getIoSession() {
+        return ioSession;
+    }
+
+    public int getOperationType() {
+        return operationType;
+    }
+
+    public int getOperationArgument() {
+        return operationArgument;
+    }
+
+}

Propchange: httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/InterestOpEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/InterestOpEntry.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/InterestOpEntry.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java?rev=755694&r1=755693&r2=755694&view=diff
==============================================================================
--- httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java (original)
+++ httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java Wed Mar 18 18:38:55 2009
@@ -68,4 +68,12 @@
      */
     public static final String GRACE_PERIOD = "http.nio.grace-period"; 
 
+    /**
+     * Determines whether interestOps() queueing is enabled for the I/O reactors.
+     * <p>
+     * This parameter expects a value of type {@link Boolean}.
+     * </p>
+     */
+    public static final String INTEREST_OPS_QUEUEING = "http.nio.interest-ops-queueing";
+
 }

Modified: httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java?rev=755694&r1=755693&r2=755694&view=diff
==============================================================================
--- httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java (original)
+++ httpcomponents/httpcore/branches/ibm_compat_branch/httpcore-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java Wed Mar 18 18:38:55 2009
@@ -91,4 +91,18 @@
         params.setLongParameter(GRACE_PERIOD, ms);
     }
 
+    public static boolean getInterestOpsQueueing(final HttpParams params) {
+        if (params == null) {
+            throw new IllegalArgumentException("HTTP parameters may not be null");
+        }
+        return params.getBooleanParameter(INTEREST_OPS_QUEUEING, false);
+    }
+
+    public static void setInterestOpsQueueing(final HttpParams params, boolean interestOpsQueueing) {
+        if (params == null) {
+            throw new IllegalArgumentException("HTTP parameters may not be null");
+        }
+        params.setBooleanParameter(INTEREST_OPS_QUEUEING, interestOpsQueueing);
+    }
+
 }