You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by mt...@apache.org on 2011/06/24 14:25:18 UTC

svn commit: r1139267 - in /commons/sandbox/runtime/trunk/src/main: java/org/apache/commons/runtime/net/ native/include/acr/ native/os/linux/ native/os/unix/

Author: mturk
Date: Fri Jun 24 12:25:18 2011
New Revision: 1139267

URL: http://svn.apache.org/viewvc?rev=1139267&view=rev
Log:
Add java part of unix selector

Added:
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/UnixSelector.java   (with props)
Modified:
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/LocalSelectorFactory.java
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SocketSelectorFactory.java
    commons/sandbox/runtime/trunk/src/main/native/include/acr/netdefs.h
    commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c
    commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h
    commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c

Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/LocalSelectorFactory.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/LocalSelectorFactory.java?rev=1139267&r1=1139266&r2=1139267&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/LocalSelectorFactory.java (original)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/LocalSelectorFactory.java Fri Jun 24 12:25:18 2011
@@ -60,6 +60,8 @@ final class LocalSelectorFactory
         switch (type) {
             case 1:
                 return new PollSelector(size);
+            case 2:
+                return new UnixSelector(size);
             default:
                 throw new RuntimeException(Local.sm.get("selector.ETYPE"));
         }

Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SocketSelectorFactory.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SocketSelectorFactory.java?rev=1139267&r1=1139266&r2=1139267&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SocketSelectorFactory.java (original)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SocketSelectorFactory.java Fri Jun 24 12:25:18 2011
@@ -61,9 +61,8 @@ final class SocketSelectorFactory
             case 0:     // Posix select
             case 1:     // Posix poll
                 return new PollSelector(size);
-            case 2:     // Linux epoll
-            case 3:     // Solaris /dev/poll
-            case 4:     // BSD Kqueue
+            case 2:     // Platform optimal selector
+                return new UnixSelector(size);
             default:
                 throw new RuntimeException(Local.sm.get("selector.ETYPE"));
         }

Added: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/UnixSelector.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/UnixSelector.java?rev=1139267&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/UnixSelector.java (added)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/UnixSelector.java Fri Jun 24 12:25:18 2011
@@ -0,0 +1,189 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.runtime.net;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashSet;
+import org.apache.commons.runtime.io.ClosedDescriptorException;
+import org.apache.commons.runtime.io.Descriptor;
+import org.apache.commons.runtime.AlreadyExistsException;
+import org.apache.commons.runtime.InvalidArgumentException;
+import org.apache.commons.runtime.NoSuchObjectException;
+import org.apache.commons.runtime.OperationNotImplementedException;
+import org.apache.commons.runtime.OverflowException;
+import org.apache.commons.runtime.SystemException;
+import org.apache.commons.runtime.Errno;
+import org.apache.commons.runtime.Status;
+
+/**
+ * Unix socket selector implementation class.
+ * UnixSelector uses platform optimal poll function for waiting on I/O events.
+ *
+ * @since Runtime 1.0
+ */
+final class UnixSelector extends AbstractSelector
+{
+
+    private short[]                     revents;
+    private SelectionKeyImpl[]          keyset;
+    private long                        pollset;
+    private ArrayList<SelectionKey>     selected;
+
+    private static native long  create0(int size)
+        throws OutOfMemoryError,
+               SystemException;
+    private static native void  wakeup0(long pollset);
+    private static native int   destroy0(long pollset);
+    private static native int   size0(long pollset);
+    private static native int   add0(long pollset, SelectionKeyImpl key, long fd, int events, int ttl);
+    private static native int   del0(long pollset, SelectionKeyImpl key, long fd);
+    private static native int   clr0(long pollset, SelectionKeyImpl[] set);
+    private static native int   wait0(long pollset, SelectionKeyImpl[] set, short[] events,
+                                      int timeout, boolean autocancel);
+
+    /*
+     * Created from native
+     */
+    public UnixSelector(int size)
+    {
+        super(size);
+        pollset  = create0(size);
+        revents  = new short[size];
+        keyset   = new SelectionKeyImpl[size];
+        selected = new ArrayList<SelectionKey>();
+    }
+
+    private void ensureValid()
+        throws ClosedSelectorException
+    {
+        if (pollset == 0L)
+            throw new ClosedSelectorException();
+    }
+
+    @Override
+    public int size()
+        throws ClosedSelectorException
+    {
+        ensureValid();
+        return size0(pollset);
+    }
+
+    @Override
+    public void interrupt()
+        throws ClosedSelectorException
+    {
+        ensureValid();
+        wakeup0(pollset);
+    }
+
+    @Override
+    public SelectionKey queue(SelectionKeyImpl key, Descriptor sd)
+        throws ClosedSelectorException,
+               OverflowException,
+               IOException
+    {
+        ensureValid();
+        key.selected = true;
+        int rc = add0(pollset, key, sd.fd(), key.ievents, key.timeout());
+        if (rc != 0) {
+            // Add failed
+            key.selected = false;
+            if (rc != Errno.EALREADY) {
+                if (rc == Errno.EOVERFLOW)
+                    throw new OverflowException();
+                else
+                    throw new IOException(Status.describe(rc));
+            }
+        }
+        return key;
+    }
+
+    @Override
+    public List<SelectionKey> select(int timeout)
+        throws ClosedSelectorException,
+               IOException
+    {
+        ensureValid();
+        selected.clear();
+        int ns = wait0(pollset, keyset, revents, timeout, autoCancel);
+        selected.ensureCapacity(ns);
+        for (int i = 0; i < ns; i++) {
+            selected.add(keyset[i]);
+            keyset[i].revents  = revents[i];
+            keyset[i].selected = false;
+        }
+        return selected;
+    }
+
+    @Override
+    public List<SelectionKey> clear()
+        throws ClosedSelectorException,
+               IOException
+    {
+        ensureValid();
+        selected.clear();
+        int ns = clr0(pollset, keyset);
+        selected.ensureCapacity(ns);
+        for (int i = 0; i < ns; i++) {
+            selected.add(keyset[i]);
+            keyset[i].revents  = 0;
+            keyset[i].selected = false;
+        }
+        return selected;
+    }
+
+    @Override
+    protected void cancel(SelectionKey key)
+        throws ClosedSelectorException,
+               IllegalSelectorException
+    {
+        ensureValid();
+        SelectionKeyImpl skey = (SelectionKeyImpl)key;
+        if (skey.selector != this)
+            throw new IllegalSelectorException();
+        Descriptor sd = skey.endpoint.descriptor();
+        if (sd.valid()) {
+            // Remove the given selection key
+            del0(pollset, skey, sd.fd());
+        }
+        skey.revents  = 0;
+        skey.selected = false;
+    }
+
+    @Override
+    public boolean isOpen()
+    {
+        return pollset != 0L;
+    }
+
+    @Override
+    public void close()
+        throws IOException
+    {
+        if (pollset != 0L) {
+            long ps = pollset;
+            pollset = 0L;
+            int rc  = destroy0(ps);
+            if (rc != 0)
+                throw new IOException(Status.describe(rc));
+        }
+    }
+
+}
+

Propchange: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/UnixSelector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: commons/sandbox/runtime/trunk/src/main/native/include/acr/netdefs.h
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/include/acr/netdefs.h?rev=1139267&r1=1139266&r2=1139267&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/include/acr/netdefs.h (original)
+++ commons/sandbox/runtime/trunk/src/main/native/include/acr/netdefs.h Fri Jun 24 12:25:18 2011
@@ -63,9 +63,9 @@
 /** Selector implementation types */
 #define ACR_PS_TYPE_SELECT         0
 #define ACR_PS_TYPE_POLL           1
-#define ACR_PS_TYPE_EPOLL          2
-#define ACR_PS_TYPE_DEVPOLL        3
-#define ACR_PS_TYPE_KQUEUE         4
+#define ACR_PS_TYPE_UNIX           2
+#define ACR_PS_TYPE_WINDOWS        3
+#define ACR_PS_TYPE_WINPIPE        4
 
 
 #endif /* _ACR_IODEFS_H */

Modified: commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c?rev=1139267&r1=1139266&r2=1139267&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c Fri Jun 24 12:25:18 2011
@@ -59,7 +59,6 @@ typedef struct acr_pollset_t {
      */
     ACR_RING_HEAD(pfd_dead_ring_t, pfd_elem_t) dead_ring;
     struct epoll_event *epset;
-    acr_pollfd_t       *ooset;
     int                 epfd;
     int                 used;
     int                 size;
@@ -105,7 +104,7 @@ static short reventt(short event)
     return rv;
 }
 
-ACR_NET_EXPORT(jlong, LinuxSelector, create0)(JNI_STDARGS, jint size)
+ACR_NET_EXPORT(jlong, UnixSelector, create0)(JNI_STDARGS, jint size)
 {
     int rc;
     acr_pollset_t *ps;
@@ -168,7 +167,7 @@ ACR_NET_EXPORT(jlong, LinuxSelector, cre
     epipe.data.ptr = pe;
     epipe.events   = EPOLLIN;
     ACR_RING_INSERT_TAIL(&ps->eset_ring, pe, pfd_elem_t, link);
-    if (epoll_ctl(ps->epfd, EPOLL_CTL_ADD, ps->ooset[0].fd, &epipe) == -1) {
+    if (epoll_ctl(ps->epfd, EPOLL_CTL_ADD, pe->pfd.fd, &epipe) == -1) {
         /* Failed adding pipe to the pollset
          */
         ACR_THROW_NET_ERRNO();
@@ -195,11 +194,12 @@ cleanup:
     return 0;
 }
 
-ACR_NET_EXPORT(jint, LinuxSelector, clr0)(JNI_STDARGS, jlong pollset,
+ACR_NET_EXPORT(jint, UnixSelector, clr0)(JNI_STDARGS, jlong pollset,
                                           jobjectArray rs)
 {
     int cnt = 0;
-    pfd_elem_t    *pe;
+    struct epoll_event eevent = { 0 };
+    pfd_elem_t *np, *pe;
     acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
 
     pthread_mutex_lock(&ps->mutex);
@@ -224,10 +224,9 @@ ACR_NET_EXPORT(jint, LinuxSelector, clr0
             return 0;
         }
     }
-    ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
+    ACR_RING_FOREACH_SAFE(pe, np, &ps->eset_ring, pfd_elem_t, link) {
         if (ps->wpipe[0] != pe->pfd.fd) {
-            /* Duplicate descriptor
-             */
+            epoll_ctl(ps->epfd, EPOLL_CTL_DEL, pe->pfd.fd, &eevent);
             (*env)->SetObjectArrayElement(env, rs, cnt++, pe->pfd.obj);
             /* Unref the container. */
             (*env)->DeleteGlobalRef(env, pe->pfd.obj);
@@ -241,7 +240,7 @@ ACR_NET_EXPORT(jint, LinuxSelector, clr0
     return cnt;
 }
 
-ACR_NET_EXPORT(void, LinuxSelector, wakeup0)(JNI_STDARGS, jlong pollset)
+ACR_NET_EXPORT(void, UnixSelector, wakeup0)(JNI_STDARGS, jlong pollset)
 {
     acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
 
@@ -254,7 +253,7 @@ ACR_NET_EXPORT(void, LinuxSelector, wake
     pthread_mutex_unlock(&ps->mutex);
 }
 
-ACR_NET_EXPORT(jint, LinuxSelector, size0)(JNI_STDARGS, jlong pollset)
+ACR_NET_EXPORT(jint, UnixSelector, size0)(JNI_STDARGS, jlong pollset)
 {
     int rv;
     acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
@@ -265,7 +264,7 @@ ACR_NET_EXPORT(jint, LinuxSelector, size
     return rv;
 }
 
-ACR_NET_EXPORT(jint, LinuxSelector, add0)(JNI_STDARGS, jlong pollset, jobject fo,
+ACR_NET_EXPORT(jint, UnixSelector, add0)(JNI_STDARGS, jlong pollset, jobject fo,
                                           jlong fp, jint events, jint ttlms)
 {
     int rc = 0;
@@ -342,7 +341,7 @@ cleanup:
     return rc;
 }
 
-ACR_NET_EXPORT(jint, LinuxSelector, del0)(JNI_STDARGS, jlong pollset,
+ACR_NET_EXPORT(jint, UnixSelector, del0)(JNI_STDARGS, jlong pollset,
                                           jobject fo, jlong fp)
 {
     int rc = ACR_EOF;
@@ -375,7 +374,7 @@ cleanup:
     return rc;
 }
 
-ACR_NET_EXPORT(int, LinuxSelector, destroy0)(JNI_STDARGS, jlong pollset)
+ACR_NET_EXPORT(int, UnixSelector, destroy0)(JNI_STDARGS, jlong pollset)
 {
     int rc = 0;
     pfd_elem_t *np, *pe = 0;
@@ -428,7 +427,7 @@ ACR_NET_EXPORT(int, LinuxSelector, destr
     return rc;
 }
 
-ACR_NET_EXPORT(jint, LinuxSelector, wait0)(JNI_STDARGS, jlong pollset,
+ACR_NET_EXPORT(jint, UnixSelector, wait0)(JNI_STDARGS, jlong pollset,
                                            jobjectArray rs, jshortArray revents,
                                            jint timeout, jboolean autocancel)
 {
@@ -497,7 +496,8 @@ ACR_NET_EXPORT(jint, LinuxSelector, wait
     }
     if (ns == 0) {
         /* Timeout occured */
-        pevents = JARRAY_CRITICAL(jshort, revents);
+        ps->state = PSS_WAIT;
+        pevents   = JARRAY_CRITICAL(jshort, revents);
         goto cleanup;
     }
     if (ps->state == PSS_WAKEUP) {

Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h?rev=1139267&r1=1139266&r2=1139267&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h Fri Jun 24 12:25:18 2011
@@ -137,9 +137,10 @@ typedef struct stat         struct_stat_
 
 #endif /* F_DUPFD */
 
-#if 1
-# define PS_DEFAULT_TYPE        ACR_PS_TYPE_POLL
+#if defined(LINUX)
+# define PS_DEFAULT_TYPE        ACR_PS_TYPE_UNIX
 #else
+# define PS_DEFAULT_TYPE        ACR_PS_TYPE_POLL
 #endif
 
 /**

Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c?rev=1139267&r1=1139266&r2=1139267&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c Fri Jun 24 12:25:18 2011
@@ -307,7 +307,8 @@ ACR_NET_EXPORT(jint, PollSelector, wait0
         return 0;
     }
     if (ns == 0) {
-        pevents = JARRAY_CRITICAL(jshort, revents);
+        ps->state = PSS_WAIT;
+        pevents   = JARRAY_CRITICAL(jshort, revents);
         for (i = 1; i < ps->used; i++) {
             if (ps->ooset[i].ttl > 0) {
                 if (now == 0)
@@ -323,13 +324,10 @@ ACR_NET_EXPORT(jint, PollSelector, wait0
         goto cleanup;
     }
     if (ps->state == PSS_WAKEUP) {
-        /* Interrupted by wakeup0 */
-        if (ps->fdset[0].revents != 0) {
-            /* Drain the wakeup pipe.
-             * Wakeup pipe is always at index zero.
-             */
-            AcrDrainPipe(ps->wpipe[0]);
-        }
+        /* Drain the wakeup pipe.
+         * Wakeup pipe is always at index zero.
+         */
+        AcrDrainPipe(ps->wpipe[0]);
         ps->state = 0;
         pthread_cond_broadcast(&ps->wakeup);
         pthread_mutex_unlock(&ps->mutex);