You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by mt...@apache.org on 2011/06/27 07:28:50 UTC
svn commit: r1140022 - in /commons/sandbox/runtime/trunk/src/main/native:
Makefile.msc.in os/win32/pollset.c os/win32/selectset.c
Author: mturk
Date: Mon Jun 27 05:28:50 2011
New Revision: 1140022
URL: http://svn.apache.org/viewvc?rev=1140022&view=rev
Log:
Add win32 native poll implementation. Usable with Vista+
Added:
commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c (with props)
commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c (with props)
Modified:
commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in
Modified: commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in?rev=1140022&r1=1140021&r2=1140022&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in (original)
+++ commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in Mon Jun 27 05:28:50 2011
@@ -85,10 +85,12 @@ WIN32_SOURCES=\
$(TOPDIR)\os\win32\path.c \
$(TOPDIR)\os\win32\platform.c \
$(TOPDIR)\os\win32\poll.c \
+ $(TOPDIR)\os\win32\pollset.c \
$(TOPDIR)\os\win32\posix.c \
$(TOPDIR)\os\win32\procmutex.c \
$(TOPDIR)\os\win32\registry.c \
$(TOPDIR)\os\win32\security.c \
+ $(TOPDIR)\os\win32\selectset.c \
$(TOPDIR)\os\win32\semaphore.c \
$(TOPDIR)\os\win32\shmem.c \
$(TOPDIR)\os\win32\time.c \
Added: commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c?rev=1140022&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c (added)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c Mon Jun 27 05:28:50 2011
@@ -0,0 +1,512 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "acr/clazz.h"
+#include "acr/memory.h"
+#include "acr/jniapi.h"
+#include "acr/port.h"
+#include "acr/time.h"
+#include "acr/iodefs.h"
+#include "acr/netapi.h"
+#include "arch_opts.h"
+#include "arch_sync.h"
+
+/* pollset operation states */
+#define PSS_DESTROY 1
+#define PSS_POLL 2
+#define PSS_WAIT 3
+#define PSS_WAKEUP 4
+
+#define WAKEUP_IF_POLL() \
+ if (AcrAtomic32Equ(&ps->state, PSS_POLL)) { \
+ char ch = 1; \
+ AcrAtomic32Set(&ps->state, PSS_WAKEUP); \
+ send(ps->wpipe[1], &ch, 1, 0); \
+ } else (void)0
+
+typedef struct acr_pollfd_t {
+ jobject obj;
+ acr_time_t ttl;
+ acr_time_t exp;
+} acr_pollfd_t;
+
+typedef struct acr_pollset_t {
+ WSAPOLLFD *fdset;
+ acr_pollfd_t *ooset;
+ int used;
+ int size;
+ volatile acr_atomic32_t state;
+ volatile acr_atomic32_t waiters;
+ SOCKET wpipe[2];
+ CRITICAL_SECTION mutex;
+ HANDLE wakeup;
+} acr_pollset_t;
+
+static short ieventt(int event)
+{
+ short rv = 0;
+
+ if (event & ACR_OP_INP)
+ rv |= POLLIN;
+ if (event & ACR_OP_OUT)
+ rv |= POLLOUT;
+ if (event & ACR_OP_PRI)
+ rv |= POLLPRI;
+ /* POLLERR, POLLHUP, and POLLNVAL aren't valid as requested events
+ */
+ return rv;
+}
+
+static short reventt(short event)
+{
+ short rv = 0;
+
+ if (event & POLLIN)
+ rv |= ACR_OP_INP;
+ if (event & POLLOUT)
+ rv |= ACR_OP_OUT;
+ if (event & POLLPRI)
+ rv |= ACR_OP_PRI;
+ if (event & POLLERR)
+ rv |= ACR_OP_ERROR;
+ if (event & POLLHUP)
+ rv |= ACR_OP_HANGUP;
+#if defined(POLLRDHUP)
+ if (event & POLLRDHUP)
+ rv |= ACR_OP_RDHUP;
+#endif
+ if (event & POLLNVAL)
+ rv |= ACR_OP_NVAL;
+ return rv;
+}
+
+static int wwait(acr_pollset_t *ps)
+{
+ DWORD ws;
+
+ AcrAtomic32Inc(&ps->waiters);
+ LeaveCriticalSection(&ps->mutex);
+ ws = WaitForSingleObject(ps->wakeup, INFINITE);
+ EnterCriticalSection(&ps->mutex);
+ if (AcrAtomic32Dec(&ps->waiters) <= 0) {
+ ResetEvent(ps->wakeup);
+ AcrAtomic32Set(&ps->waiters, 0);
+ }
+ if (ws == WAIT_FAILED)
+ return GetLastError();
+ else
+ return 0;
+}
+
+
+ACR_NET_EXPORT(jlong, PollSelector, create0)(JNI_STDARGS, jint size)
+{
+ int rc;
+ acr_pollset_t *ps;
+
+ if (!ACR_HAVE_LATE_DLL_FUNC(WSAPoll)) {
+ ACR_THROW_NET_ERROR(ACR_ENOTIMPL);
+ return 0;
+ }
+ ps = ACR_TALLOC(acr_pollset_t);
+ if (ps == 0)
+ return 0;
+ ps->wpipe[0] = -1;
+ ps->wpipe[1] = -1;
+ ps->size = size + 1;
+ ps->used = 1;
+
+ ps->fdset = ACR_MALLOC(WSAPOLLFD, ps->size);
+ if (ps->fdset == 0)
+ return 0;
+ ps->ooset = ACR_MALLOC(acr_pollfd_t, ps->size);
+ if (ps->fdset == 0) {
+ AcrFree(ps->fdset);
+ return 0;
+ }
+ if ((rc = AcrSocketPair(&ps->wpipe[0], &ps->wpipe[1], 0)) != 0) {
+ ACR_THROW_NET_ERROR(rc);
+ goto cleanup;
+ }
+ /* Add the wakeup pipe to the pset
+ */
+ ps->fdset[0].fd = ps->wpipe[0];
+ ps->fdset[0].events = POLLIN;
+ ps->fdset[0].revents = 0;
+ ps->ooset[0].obj = 0;
+ ps->ooset[0].ttl = ACR_INFINITE;
+ ps->ooset[0].exp = ACR_INFINITE;
+
+ if (!InitializeCriticalSectionAndSpinCount(&ps->mutex, 4000)) {
+ ACR_THROW_NET_ERRNO();
+ goto cleanup;
+ }
+ if ((ps->wakeup = CreateEvent(0, FALSE, FALSE, 0)) == 0) {
+ ACR_THROW_NET_ERRNO();
+ DeleteCriticalSection(&ps->mutex);
+ goto cleanup;
+ }
+ return P2J(ps);
+
+cleanup:
+ AcrFree(ps->fdset);
+ AcrFree(ps->ooset);
+ AcrFree(ps);
+ return 0;
+}
+
+ACR_NET_EXPORT(int, PollSelector, destroy0)(JNI_STDARGS, jlong pollset)
+{
+ int i;
+ int rc = 0;
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+ EnterCriticalSection(&ps->mutex);
+ if (!AcrAtomic32Equ(&ps->state, 0)) {
+ int state = AcrAtomic32Set(&ps->state, PSS_DESTROY);
+ if (state == PSS_POLL) {
+ char ch = 1;
+ send(ps->wpipe[1], &ch, 1, 0);
+ }
+ /* Wait until the wait0 call breaks.
+ * Since we set the state to DESTROY
+ * wait0 will return 0.
+ */
+ if ((rc = wwait(ps)) != 0) {
+ LeaveCriticalSection(&ps->mutex);
+ return rc;
+ }
+ }
+ AcrAtomic32Set(&ps->state, PSS_DESTROY);
+ for (i = 1; i < ps->used; i++) {
+ AcrSelectionKeyReset(env, ps->ooset[i].obj);
+ /* Invalidate the container. */
+ (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+ }
+ ps->used = 0;
+ LeaveCriticalSection(&ps->mutex);
+ closesocket(ps->wpipe[0]);
+ closesocket(ps->wpipe[1]);
+ CloseHandle(ps->wakeup);
+ DeleteCriticalSection(&ps->mutex);
+ AcrFree(ps->fdset);
+ AcrFree(ps->ooset);
+ AcrFree(ps);
+ return rc;
+}
+
+ACR_NET_EXPORT(jint, PollSelector, clr0)(JNI_STDARGS, jlong pollset,
+ jobjectArray rs)
+{
+ int i;
+ int cnt = 0;
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+ EnterCriticalSection(&ps->mutex);
+ while (!AcrAtomic32Equ(&ps->state, 0)) {
+ if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+ /* Interrupted by destroy0 */
+ LeaveCriticalSection(&ps->mutex);
+ return 0;
+ }
+ WAKEUP_IF_POLL();
+ /* Wait until the wait0 call breaks.
+ * Since we set the state to DESTROY
+ * wait0 will return 0.
+ */
+ if (wwait(ps) != 0) {
+ LeaveCriticalSection(&ps->mutex);
+ ACR_THROW(ACR_EX_EILLEGAL, 0);
+ return 0;
+ }
+ }
+ for (i = 1; i < ps->used; i++) {
+ (*env)->SetObjectArrayElement(env, rs, cnt++, ps->ooset[i].obj);
+ /* Unref the container. */
+ (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+ }
+ ps->used = 1;
+ LeaveCriticalSection(&ps->mutex);
+ return cnt;
+}
+
+ACR_NET_EXPORT(void, PollSelector, wakeup0)(JNI_STDARGS, jlong pollset)
+{
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+ EnterCriticalSection(&ps->mutex);
+ WAKEUP_IF_POLL();
+ LeaveCriticalSection(&ps->mutex);
+}
+
+ACR_NET_EXPORT(jint, PollSelector, size0)(JNI_STDARGS, jlong pollset)
+{
+ int rv;
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+ EnterCriticalSection(&ps->mutex);
+ rv = ps->used - 1;
+ LeaveCriticalSection(&ps->mutex);
+ return rv;
+}
+
+ACR_NET_EXPORT(jint, PollSelector, wait0)(JNI_STDARGS, jlong pollset,
+ jobjectArray rs, jshortArray revents,
+ jint timeout, jboolean autocancel)
+{
+ int i, ns, rc = 0;
+ int rv = 0;
+ acr_time_t now = 0;
+ jshort *pevents;
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+ EnterCriticalSection(&ps->mutex);
+ if (!AcrAtomic32Equ(&ps->state, 0)) {
+ /* Note that this should never happen if api is correctly used.
+ * wait cannot be run from multiple threads and cannot be run
+ * after destroy.
+ */
+ LeaveCriticalSection(&ps->mutex);
+ ACR_THROW(ACR_EX_EILLEGAL, 0);
+ return 0;
+ }
+ if (ps->used == 1) {
+ /* We only have the wakeup pipe in the pollset
+ * so there is no point to wait.
+ */
+ LeaveCriticalSection(&ps->mutex);
+ return 0;
+ }
+
+ AcrAtomic32Set(&ps->state, PSS_POLL);
+ LeaveCriticalSection(&ps->mutex);
+ ns = WSAPoll(ps->fdset, ps->used, timeout);
+ if (ns == SOCKET_ERROR)
+ rc = ACR_GET_OS_ERROR();
+ EnterCriticalSection(&ps->mutex);
+ if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+ /* Interrupted by destroy0 */
+ if (!AcrAtomic32Equ(&ps->waiters, 0))
+ SetEvent(ps->wakeup);
+ LeaveCriticalSection(&ps->mutex);
+ return 0;
+ }
+ if (rc != 0) {
+ /* Error during poll */
+ AcrAtomic32Set(&ps->state, 0);
+ if (!AcrAtomic32Equ(&ps->waiters, 0))
+ SetEvent(ps->wakeup);
+ LeaveCriticalSection(&ps->mutex);
+ ACR_THROW_NET_ERROR(rc);
+ return 0;
+ }
+ if (ns == 0) {
+ ps->state = PSS_WAIT;
+ pevents = JARRAY_CRITICAL(jshort, revents);
+ for (i = 1; i < ps->used; i++) {
+ if (ps->ooset[i].ttl > 0) {
+ if (now == 0)
+ now = AcrTimeNow();
+ if (now > ps->ooset[i].exp) {
+ /* Expired descriptor */
+ ps->fdset[i].revents = POLLHUP;
+ pevents[rv] = ACR_OP_TIMEOUT;
+ (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
+ }
+ }
+ }
+ goto cleanup;
+ }
+ if (ps->state == PSS_WAKEUP) {
+ /* Drain the wakeup pipe.
+ * Wakeup pipe is always at index zero.
+ */
+ AcrDrainSocket(ps->wpipe[0]);
+ AcrAtomic32Set(&ps->state, 0);
+ if (!AcrAtomic32Equ(&ps->waiters, 0))
+ SetEvent(ps->wakeup);
+ LeaveCriticalSection(&ps->mutex);
+ return 0;
+ }
+ ps->state = PSS_WAIT;
+ pevents = JARRAY_CRITICAL(jshort, revents);
+ /* Cycle trough the descriptors */
+ for (i = 0; i < ps->used; i++) {
+ if (ps->fdset[i].revents != 0) {
+ if (i == 0) {
+ /* Drain the wakeup pipe.
+ * Wakeup pipe is always at index zero.
+ */
+ AcrDrainSocket(ps->wpipe[0]);
+ continue;
+ }
+ else {
+ pevents[rv] = reventt(ps->fdset[i].revents);
+ (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
+ if (ps->ooset[i].ttl > 0) {
+ /* Reset TTL
+ */
+ if (now == 0)
+ now = AcrTimeNow();
+ ps->ooset[i].exp = now + ps->ooset[i].ttl;
+ }
+ }
+ }
+ else {
+ /* Check for the expired descriptors.
+ */
+ if (ps->ooset[i].ttl > 0) {
+ if (now == 0)
+ now = AcrTimeNow();
+ if (now > ps->ooset[i].exp) {
+ /* Expired descriptor */
+ ps->fdset[i].revents = POLLHUP;
+ pevents[rv] = ACR_OP_TIMEOUT;
+ (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
+ }
+ }
+ }
+ }
+cleanup:
+ RELEASE_CRITICAL(revents, pevents);
+ if (autocancel == JNI_TRUE && rv > 0) {
+ /* Remove all descriptors with revents set except
+ * the wakeup pipe at index zero.
+ */
+ for (i = 1; i < ps->used; i++) {
+ if (ps->fdset[i].revents != 0) {
+ int dest = i;
+ int used = ps->used;
+ ps->used--;
+ /* Unref descriptor */
+ (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+ for (++i; i < used; i++) {
+ if (ps->fdset[i].revents != 0) {
+ /* Skip signaled descriptor */
+ (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+ ps->used--;
+ }
+ else {
+ ps->fdset[dest] = ps->fdset[i];
+ ps->ooset[dest] = ps->ooset[i];
+ dest++;
+ }
+ }
+ }
+ }
+ }
+ AcrAtomic32Set(&ps->state, 0);
+ if (!AcrAtomic32Equ(&ps->waiters, 0))
+ SetEvent(ps->wakeup);
+ LeaveCriticalSection(&ps->mutex);
+ return rv;
+}
+
+ACR_NET_EXPORT(jint, PollSelector, add0)(JNI_STDARGS, jlong pollset, jobject fo,
+ jlong fp, jint events, jint ttlms)
+{
+ int i, rc = 0;
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+ acr_fd_t *fd = J2P(fp, acr_fd_t *);
+
+ EnterCriticalSection(&ps->mutex);
+ while (!AcrAtomic32Equ(&ps->state, 0)) {
+ if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+ rc = 0;
+ goto cleanup;
+ }
+ WAKEUP_IF_POLL();
+ if ((rc = wwait(ps)) != 0)
+ goto cleanup;
+ }
+ if (ps->used == ps->size) {
+ /* Overflow
+ */
+ rc = ACR_EOVERFLOW;
+ goto cleanup;
+ }
+ for (i = 1; i < ps->used; i++) {
+ if (ps->fdset[i].fd == fd->u.s) {
+ /* Duplicate descriptor
+ */
+ rc = ACR_EALREADY;
+ goto cleanup;
+ }
+ }
+ ps->fdset[ps->used].fd = fd->u.s;
+ ps->fdset[ps->used].events = ieventt(events);
+ ps->fdset[ps->used].revents = 0;
+ ps->ooset[ps->used].obj = (*env)->NewGlobalRef(env, fo);
+ if (ps->ooset[ps->used].obj == 0) {
+ /* In case the NewGlobalRef fails,
+ * OutOfMemoryError should be thrown already by the JVM.
+ */
+ rc = ACR_ENOMEM;
+ goto cleanup;
+ }
+ if (ttlms > 0) {
+ ps->ooset[ps->used].ttl = AcrTimeFromMsec(ttlms);
+ ps->ooset[ps->used].exp = AcrTimeNow() + ps->ooset[ps->used].ttl;
+ }
+ else {
+ ps->ooset[ps->used].ttl = ACR_INFINITE;
+ ps->ooset[ps->used].exp = ACR_INFINITE;
+ }
+ ps->used++;
+cleanup:
+ LeaveCriticalSection(&ps->mutex);
+ return rc;
+}
+
+ACR_NET_EXPORT(jint, PollSelector, del0)(JNI_STDARGS, jlong pollset,
+ jobject fo, jlong fp)
+{
+ int i, rc = ACR_EOF;
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+ EnterCriticalSection(&ps->mutex);
+ if (ps->used < 2)
+ goto cleanup;
+ while (!AcrAtomic32Equ(&ps->state, 0)) {
+ if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+ rc = 0;
+ goto cleanup;
+ }
+ WAKEUP_IF_POLL();
+ if ((rc = wwait(ps)) != 0)
+ goto cleanup;
+ }
+
+ for (i = 1; i < ps->used; i++) {
+ if ((*env)->IsSameObject(env, ps->ooset[i].obj, fo) == JNI_TRUE) {
+ int dest = i;
+ int used = ps->used;
+ ps->used--;
+ /* Unref descriptor */
+ (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+ for (++i; i < used; i++) {
+ ps->fdset[dest] = ps->fdset[i];
+ ps->ooset[dest] = ps->ooset[i];
+ dest++;
+ }
+ rc = 0;
+ }
+ }
+
+cleanup:
+ LeaveCriticalSection(&ps->mutex);
+ return rc;
+}
Propchange: commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c
------------------------------------------------------------------------------
svn:eol-style = native
Added: commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c?rev=1140022&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c (added)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c Mon Jun 27 05:28:50 2011
@@ -0,0 +1,50 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "acr/clazz.h"
+#include "acr/memory.h"
+#include "acr/jniapi.h"
+#include "acr/port.h"
+#include "acr/time.h"
+#include "acr/iodefs.h"
+#include "acr/netapi.h"
+#include "arch_opts.h"
+
+ACR_NET_EXPORT(jint, SocketSelectorFactory, type0)(JNI_STDARGS)
+{
+ if (ACR_HAVE_LATE_DLL_FUNC(WSAPoll))
+ return ACR_PS_TYPE_POLL;
+ else
+ return ACR_PS_TYPE_SELECT;
+}
+
+ACR_NET_EXPORT(jint, LocalSelectorFactory, type0)(JNI_STDARGS)
+{
+ return ACR_PS_TYPE_WINPIPE;
+}
+
+ACR_NET_EXPORT(jint, SocketSelectorFactory, size0)(JNI_STDARGS)
+{
+ if (ACR_HAVE_LATE_DLL_FUNC(WSAPoll))
+ return 65535;
+ else
+ return FD_SETSIZE - 1;
+}
+
+ACR_NET_EXPORT(jint, LocalSelectorFactory, size0)(JNI_STDARGS)
+{
+ return MAXIMUM_WAIT_OBJECTS - 1;
+}
Propchange: commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c
------------------------------------------------------------------------------
svn:eol-style = native