You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by mt...@apache.org on 2011/06/27 07:28:50 UTC

svn commit: r1140022 - in /commons/sandbox/runtime/trunk/src/main/native: Makefile.msc.in os/win32/pollset.c os/win32/selectset.c

Author: mturk
Date: Mon Jun 27 05:28:50 2011
New Revision: 1140022

URL: http://svn.apache.org/viewvc?rev=1140022&view=rev
Log:
Add win32 native poll implementation. Usable with Vista+

Added:
    commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c   (with props)
    commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c   (with props)
Modified:
    commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in

Modified: commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in?rev=1140022&r1=1140021&r2=1140022&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in (original)
+++ commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in Mon Jun 27 05:28:50 2011
@@ -85,10 +85,12 @@ WIN32_SOURCES=\
 	$(TOPDIR)\os\win32\path.c \
 	$(TOPDIR)\os\win32\platform.c \
 	$(TOPDIR)\os\win32\poll.c \
+	$(TOPDIR)\os\win32\pollset.c \
 	$(TOPDIR)\os\win32\posix.c \
 	$(TOPDIR)\os\win32\procmutex.c \
 	$(TOPDIR)\os\win32\registry.c \
 	$(TOPDIR)\os\win32\security.c \
+	$(TOPDIR)\os\win32\selectset.c \
 	$(TOPDIR)\os\win32\semaphore.c \
 	$(TOPDIR)\os\win32\shmem.c \
 	$(TOPDIR)\os\win32\time.c \

Added: commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c?rev=1140022&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c (added)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c Mon Jun 27 05:28:50 2011
@@ -0,0 +1,512 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "acr/clazz.h"
+#include "acr/memory.h"
+#include "acr/jniapi.h"
+#include "acr/port.h"
+#include "acr/time.h"
+#include "acr/iodefs.h"
+#include "acr/netapi.h"
+#include "arch_opts.h"
+#include "arch_sync.h"
+
+/* pollset operation states */
+#define PSS_DESTROY     1
+#define PSS_POLL        2
+#define PSS_WAIT        3
+#define PSS_WAKEUP      4
+
+#define WAKEUP_IF_POLL()                                \
+    if (AcrAtomic32Equ(&ps->state, PSS_POLL)) {         \
+        char ch = 1;                                    \
+        AcrAtomic32Set(&ps->state, PSS_WAKEUP);         \
+        send(ps->wpipe[1], &ch, 1, 0);                  \
+    } else (void)0
+
+typedef struct acr_pollfd_t {
+    jobject        obj;
+    acr_time_t     ttl;
+    acr_time_t     exp;
+} acr_pollfd_t;
+
+typedef struct acr_pollset_t {
+    WSAPOLLFD                  *fdset;
+    acr_pollfd_t               *ooset;
+    int                         used;
+    int                         size;
+    volatile acr_atomic32_t     state;
+    volatile acr_atomic32_t     waiters;
+    SOCKET                      wpipe[2];
+    CRITICAL_SECTION            mutex;
+    HANDLE                      wakeup;
+} acr_pollset_t;
+
+static short ieventt(int event)
+{
+    short rv = 0;
+
+    if (event & ACR_OP_INP)
+        rv |= POLLIN;
+    if (event & ACR_OP_OUT)
+        rv |= POLLOUT;
+    if (event & ACR_OP_PRI)
+        rv |= POLLPRI;
+    /* POLLERR, POLLHUP, and POLLNVAL aren't valid as requested events
+     */
+    return rv;
+}
+
+static short reventt(short event)
+{
+    short rv = 0;
+
+    if (event & POLLIN)
+        rv |= ACR_OP_INP;
+    if (event & POLLOUT)
+        rv |= ACR_OP_OUT;
+    if (event & POLLPRI)
+        rv |= ACR_OP_PRI;
+    if (event & POLLERR)
+        rv |= ACR_OP_ERROR;
+    if (event & POLLHUP)
+        rv |= ACR_OP_HANGUP;
+#if defined(POLLRDHUP)
+    if (event & POLLRDHUP)
+        rv |= ACR_OP_RDHUP;
+#endif
+    if (event & POLLNVAL)
+        rv |= ACR_OP_NVAL;
+    return rv;
+}
+
+static int wwait(acr_pollset_t *ps)
+{
+    DWORD ws;
+
+    AcrAtomic32Inc(&ps->waiters);
+    LeaveCriticalSection(&ps->mutex);
+    ws = WaitForSingleObject(ps->wakeup, INFINITE);
+    EnterCriticalSection(&ps->mutex);
+    if (AcrAtomic32Dec(&ps->waiters) <= 0) {
+        ResetEvent(ps->wakeup);
+        AcrAtomic32Set(&ps->waiters, 0);
+    }
+    if (ws == WAIT_FAILED)
+        return GetLastError();
+    else
+        return 0;
+}
+
+
+ACR_NET_EXPORT(jlong, PollSelector, create0)(JNI_STDARGS, jint size)
+{
+    int rc;
+    acr_pollset_t *ps;
+
+    if (!ACR_HAVE_LATE_DLL_FUNC(WSAPoll)) {
+        ACR_THROW_NET_ERROR(ACR_ENOTIMPL);
+        return 0;
+    }
+    ps = ACR_TALLOC(acr_pollset_t);
+    if (ps == 0)
+        return 0;
+    ps->wpipe[0] = -1;
+    ps->wpipe[1] = -1;
+    ps->size     = size + 1;
+    ps->used     = 1;
+
+    ps->fdset    = ACR_MALLOC(WSAPOLLFD, ps->size);
+    if (ps->fdset == 0)
+        return 0;
+    ps->ooset    = ACR_MALLOC(acr_pollfd_t,  ps->size);
+    if (ps->fdset == 0) {
+        AcrFree(ps->fdset);
+        return 0;
+    }
+    if ((rc = AcrSocketPair(&ps->wpipe[0], &ps->wpipe[1], 0)) != 0) {
+        ACR_THROW_NET_ERROR(rc);
+        goto cleanup;
+    }
+    /* Add the wakeup pipe to the pset
+     */
+    ps->fdset[0].fd      = ps->wpipe[0];
+    ps->fdset[0].events  = POLLIN;
+    ps->fdset[0].revents = 0;
+    ps->ooset[0].obj     = 0;
+    ps->ooset[0].ttl     = ACR_INFINITE;
+    ps->ooset[0].exp     = ACR_INFINITE;
+
+    if (!InitializeCriticalSectionAndSpinCount(&ps->mutex, 4000)) {
+        ACR_THROW_NET_ERRNO();
+        goto cleanup;
+    }
+    if ((ps->wakeup = CreateEvent(0, FALSE, FALSE, 0)) == 0) {
+        ACR_THROW_NET_ERRNO();
+        DeleteCriticalSection(&ps->mutex);
+        goto cleanup;
+    }
+    return P2J(ps);
+
+cleanup:
+    AcrFree(ps->fdset);
+    AcrFree(ps->ooset);
+    AcrFree(ps);
+    return 0;
+}
+
+ACR_NET_EXPORT(int, PollSelector, destroy0)(JNI_STDARGS, jlong pollset)
+{
+    int i;
+    int rc = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    if (!AcrAtomic32Equ(&ps->state, 0)) {
+        int  state = AcrAtomic32Set(&ps->state, PSS_DESTROY);
+        if (state == PSS_POLL) {
+            char ch   = 1;
+            send(ps->wpipe[1], &ch, 1, 0);
+        }
+        /* Wait until the wait0 call breaks.
+         * Since we set the state to DESTROY
+         * wait0 will return 0.
+         */
+        if ((rc = wwait(ps)) != 0) {
+            LeaveCriticalSection(&ps->mutex);
+            return rc;
+        }
+    }
+    AcrAtomic32Set(&ps->state, PSS_DESTROY);
+    for (i = 1; i < ps->used; i++) {
+        AcrSelectionKeyReset(env, ps->ooset[i].obj);
+        /* Invalidate the container. */
+        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+    }
+    ps->used = 0;
+    LeaveCriticalSection(&ps->mutex);
+    closesocket(ps->wpipe[0]);
+    closesocket(ps->wpipe[1]);
+    CloseHandle(ps->wakeup);
+    DeleteCriticalSection(&ps->mutex);
+    AcrFree(ps->fdset);
+    AcrFree(ps->ooset);
+    AcrFree(ps);
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, PollSelector, clr0)(JNI_STDARGS, jlong pollset,
+                                         jobjectArray rs)
+{
+    int i;
+    int cnt = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    while (!AcrAtomic32Equ(&ps->state, 0)) {
+        if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+            /* Interrupted by destroy0 */
+            LeaveCriticalSection(&ps->mutex);
+            return 0;
+        }
+        WAKEUP_IF_POLL();
+        /* Wait until the wait0 call breaks.
+         * Since we set the state to DESTROY
+         * wait0 will return 0.
+         */
+        if (wwait(ps) != 0) {
+            LeaveCriticalSection(&ps->mutex);
+            ACR_THROW(ACR_EX_EILLEGAL, 0);
+            return 0;
+        }
+    }
+    for (i = 1; i < ps->used; i++) {
+        (*env)->SetObjectArrayElement(env, rs, cnt++, ps->ooset[i].obj);
+        /* Unref the container. */
+        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+    }
+    ps->used = 1;
+    LeaveCriticalSection(&ps->mutex);
+    return cnt;
+}
+
+ACR_NET_EXPORT(void, PollSelector, wakeup0)(JNI_STDARGS, jlong pollset)
+{
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    WAKEUP_IF_POLL();
+    LeaveCriticalSection(&ps->mutex);
+}
+
+ACR_NET_EXPORT(jint, PollSelector, size0)(JNI_STDARGS, jlong pollset)
+{
+    int rv;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    rv = ps->used - 1;
+    LeaveCriticalSection(&ps->mutex);
+    return rv;
+}
+
+ACR_NET_EXPORT(jint, PollSelector, wait0)(JNI_STDARGS, jlong pollset,
+                                          jobjectArray rs, jshortArray revents,
+                                          jint timeout, jboolean autocancel)
+{
+    int i, ns, rc = 0;
+    int rv = 0;
+    acr_time_t now = 0;
+    jshort *pevents;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    if (!AcrAtomic32Equ(&ps->state, 0)) {
+        /* Note that this should never happen if api is correctly used.
+         * wait cannot be run from multiple threads and cannot be run
+         * after destroy.
+         */
+        LeaveCriticalSection(&ps->mutex);
+        ACR_THROW(ACR_EX_EILLEGAL, 0);
+        return 0;
+    }
+    if (ps->used == 1) {
+        /* We only have the wakeup pipe in the pollset
+         * so there is no point to wait.
+         */
+        LeaveCriticalSection(&ps->mutex);
+        return 0;
+    }
+
+    AcrAtomic32Set(&ps->state, PSS_POLL);
+    LeaveCriticalSection(&ps->mutex);
+    ns = WSAPoll(ps->fdset, ps->used, timeout);
+    if (ns == SOCKET_ERROR)
+        rc = ACR_GET_OS_ERROR();
+    EnterCriticalSection(&ps->mutex);
+    if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+        /* Interrupted by destroy0 */
+        if (!AcrAtomic32Equ(&ps->waiters, 0))
+            SetEvent(ps->wakeup);
+        LeaveCriticalSection(&ps->mutex);
+        return 0;
+    }
+    if (rc != 0) {
+        /* Error during poll */
+        AcrAtomic32Set(&ps->state, 0);
+        if (!AcrAtomic32Equ(&ps->waiters, 0))
+            SetEvent(ps->wakeup);
+        LeaveCriticalSection(&ps->mutex);
+        ACR_THROW_NET_ERROR(rc);
+        return 0;
+    }
+    if (ns == 0) {
+        ps->state = PSS_WAIT;
+        pevents   = JARRAY_CRITICAL(jshort, revents);
+        for (i = 1; i < ps->used; i++) {
+            if (ps->ooset[i].ttl > 0) {
+                if (now == 0)
+                    now = AcrTimeNow();
+                if (now > ps->ooset[i].exp) {
+                    /* Expired descriptor */
+                    ps->fdset[i].revents = POLLHUP;
+                    pevents[rv] = ACR_OP_TIMEOUT;
+                    (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
+                }
+            }
+        }
+        goto cleanup;
+    }
+    if (ps->state == PSS_WAKEUP) {
+        /* Drain the wakeup pipe.
+         * Wakeup pipe is always at index zero.
+         */
+        AcrDrainSocket(ps->wpipe[0]);
+        AcrAtomic32Set(&ps->state, 0);
+        if (!AcrAtomic32Equ(&ps->waiters, 0))
+            SetEvent(ps->wakeup);
+        LeaveCriticalSection(&ps->mutex);
+        return 0;
+    }
+    ps->state = PSS_WAIT;
+    pevents   = JARRAY_CRITICAL(jshort, revents);
+    /* Cycle trough the descriptors */
+    for (i = 0; i < ps->used; i++) {
+        if (ps->fdset[i].revents != 0) {
+            if (i == 0) {
+                /* Drain the wakeup pipe.
+                 * Wakeup pipe is always at index zero.
+                 */
+                AcrDrainSocket(ps->wpipe[0]);
+                continue;
+            }
+            else {
+                pevents[rv] = reventt(ps->fdset[i].revents);
+                (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
+                if (ps->ooset[i].ttl > 0) {
+                    /* Reset TTL
+                     */
+                    if (now == 0)
+                        now = AcrTimeNow();
+                    ps->ooset[i].exp = now + ps->ooset[i].ttl;
+                }
+            }
+        }
+        else {
+            /* Check for the expired descriptors.
+             */
+            if (ps->ooset[i].ttl > 0) {
+                if (now == 0)
+                    now = AcrTimeNow();
+                if (now > ps->ooset[i].exp) {
+                    /* Expired descriptor */
+                    ps->fdset[i].revents = POLLHUP;
+                    pevents[rv] = ACR_OP_TIMEOUT;
+                    (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
+                }
+            }
+        }
+    }
+cleanup:
+    RELEASE_CRITICAL(revents, pevents);
+    if (autocancel == JNI_TRUE && rv > 0) {
+        /* Remove all descriptors with revents set except
+         * the wakeup pipe at index zero.
+         */
+        for (i = 1; i < ps->used; i++) {
+            if (ps->fdset[i].revents != 0) {
+                int dest = i;
+                int used = ps->used;
+                ps->used--;
+                /* Unref descriptor */
+                (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+                for (++i; i < used; i++) {
+                    if (ps->fdset[i].revents != 0) {
+                        /* Skip signaled descriptor */
+                        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+                        ps->used--;
+                    }
+                    else {
+                        ps->fdset[dest] = ps->fdset[i];
+                        ps->ooset[dest] = ps->ooset[i];
+                        dest++;
+                    }
+                }
+            }
+        }
+    }
+    AcrAtomic32Set(&ps->state, 0);
+    if (!AcrAtomic32Equ(&ps->waiters, 0))
+        SetEvent(ps->wakeup);
+    LeaveCriticalSection(&ps->mutex);
+    return rv;
+}
+
+ACR_NET_EXPORT(jint, PollSelector, add0)(JNI_STDARGS, jlong pollset, jobject fo,
+                                         jlong fp, jint events, jint ttlms)
+{
+    int i, rc = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+    acr_fd_t *fd      = J2P(fp, acr_fd_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    while (!AcrAtomic32Equ(&ps->state, 0)) {
+        if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+            rc = 0;
+            goto cleanup;
+        }
+        WAKEUP_IF_POLL();
+        if ((rc = wwait(ps)) != 0)
+            goto cleanup;
+    }
+    if (ps->used == ps->size) {
+        /* Overflow
+         */
+        rc = ACR_EOVERFLOW;
+        goto cleanup;
+    }
+    for (i = 1; i < ps->used; i++) {
+        if (ps->fdset[i].fd == fd->u.s) {
+            /* Duplicate descriptor
+             */
+            rc = ACR_EALREADY;
+            goto cleanup;
+        }
+    }
+    ps->fdset[ps->used].fd      = fd->u.s;
+    ps->fdset[ps->used].events  = ieventt(events);
+    ps->fdset[ps->used].revents = 0;
+    ps->ooset[ps->used].obj     = (*env)->NewGlobalRef(env, fo);
+    if (ps->ooset[ps->used].obj == 0) {
+        /* In case the NewGlobalRef fails,
+         * OutOfMemoryError should be thrown already by the JVM.
+         */
+        rc = ACR_ENOMEM;
+        goto cleanup;
+    }
+    if (ttlms > 0) {
+        ps->ooset[ps->used].ttl = AcrTimeFromMsec(ttlms);
+        ps->ooset[ps->used].exp = AcrTimeNow() + ps->ooset[ps->used].ttl;
+    }
+    else {
+        ps->ooset[ps->used].ttl = ACR_INFINITE;
+        ps->ooset[ps->used].exp = ACR_INFINITE;
+    }
+    ps->used++;
+cleanup:
+    LeaveCriticalSection(&ps->mutex);
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, PollSelector, del0)(JNI_STDARGS, jlong pollset,
+                                          jobject fo, jlong fp)
+{
+    int i, rc = ACR_EOF;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    if (ps->used < 2)
+        goto cleanup;
+    while (!AcrAtomic32Equ(&ps->state, 0)) {
+        if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+            rc = 0;
+            goto cleanup;
+        }
+        WAKEUP_IF_POLL();
+        if ((rc = wwait(ps)) != 0)
+            goto cleanup;
+    }
+
+    for (i = 1; i < ps->used; i++) {
+        if ((*env)->IsSameObject(env, ps->ooset[i].obj, fo) == JNI_TRUE) {
+            int dest = i;
+            int used = ps->used;
+            ps->used--;
+            /* Unref descriptor */
+            (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+            for (++i; i < used; i++) {
+                ps->fdset[dest] = ps->fdset[i];
+                ps->ooset[dest] = ps->ooset[i];
+                dest++;
+            }
+            rc = 0;
+        }
+    }
+
+cleanup:
+    LeaveCriticalSection(&ps->mutex);
+    return rc;
+}

Propchange: commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c
------------------------------------------------------------------------------
    svn:eol-style = native

Added: commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c?rev=1140022&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c (added)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c Mon Jun 27 05:28:50 2011
@@ -0,0 +1,50 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "acr/clazz.h"
+#include "acr/memory.h"
+#include "acr/jniapi.h"
+#include "acr/port.h"
+#include "acr/time.h"
+#include "acr/iodefs.h"
+#include "acr/netapi.h"
+#include "arch_opts.h"
+
+ACR_NET_EXPORT(jint, SocketSelectorFactory, type0)(JNI_STDARGS)
+{
+    if (ACR_HAVE_LATE_DLL_FUNC(WSAPoll))
+        return ACR_PS_TYPE_POLL;
+    else
+        return ACR_PS_TYPE_SELECT;
+}
+
+ACR_NET_EXPORT(jint, LocalSelectorFactory, type0)(JNI_STDARGS)
+{
+    return ACR_PS_TYPE_WINPIPE;
+}
+
+ACR_NET_EXPORT(jint, SocketSelectorFactory, size0)(JNI_STDARGS)
+{
+    if (ACR_HAVE_LATE_DLL_FUNC(WSAPoll))
+        return 65535;
+    else
+        return FD_SETSIZE - 1;
+}
+
+ACR_NET_EXPORT(jint, LocalSelectorFactory, size0)(JNI_STDARGS)
+{
+    return MAXIMUM_WAIT_OBJECTS - 1;
+}

Propchange: commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c
------------------------------------------------------------------------------
    svn:eol-style = native