You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by mt...@apache.org on 2005/04/19 18:32:11 UTC
cvs commit: jakarta-tomcat-connectors/jni/native/src error.c poll.c
mturk 2005/04/19 09:32:11
Modified: jni/java/org/apache/tomcat/jni Poll.java Status.java
jni/native/src error.c poll.c
Log:
Add maintain for Poll for polling timed out sockets. Remove
thread safety flags, because this is responsibility of Java client.
Revision Changes Path
1.8 +22 -2 jakarta-tomcat-connectors/jni/java/org/apache/tomcat/jni/Poll.java
Index: Poll.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-connectors/jni/java/org/apache/tomcat/jni/Poll.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- Poll.java 18 Apr 2005 15:24:01 -0000 1.7
+++ Poll.java 19 Apr 2005 16:32:11 -0000 1.8
@@ -104,11 +104,31 @@
* descriptors[n + 2] -> client data
* descriptors[n + 2] -> reserved
* </PRE>
+ * @param remove Remove signaled descriptors from pollset
* @return Number of signalled descriptors (output parameter)
* or negative APR error code.
*/
public static native int poll(long pollset, long timeout,
- long [] descriptors);
+ long [] descriptors, boolean remove);
+
+ /**
+ * Maintain on the descriptor(s) in a pollset
+ * @param pollset The pollset to use
+ * @param descriptors Array of signalled descriptors (output parameter)
+ * The desctiptor array must be four times the size of pollset.
+ * and are populated as follows:
+ * <PRE>
+ * descriptors[n + 0] -> returned events
+ * descriptors[n + 1] -> socket
+ * descriptors[n + 2] -> client data
+ * descriptors[n + 2] -> reserved
+ * </PRE>
+ * @param remove Remove signaled descriptors from pollset
+ * @return Number of signalled descriptors (output parameter)
+ * or negative APR error code.
+ */
+ public static native int maintain(long pollset, long [] descriptors,
+ boolean remove);
/**
* Set the socket time to live.
1.6 +2 -1 jakarta-tomcat-connectors/jni/java/org/apache/tomcat/jni/Status.java
Index: Status.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-connectors/jni/java/org/apache/tomcat/jni/Status.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- Status.java 15 Apr 2005 17:21:23 -0000 1.5
+++ Status.java 19 Apr 2005 16:32:11 -0000 1.6
@@ -253,5 +253,6 @@
public static final boolean APR_STATUS_IS_EINPROGRESS(int s) { return is(s, 94); }
public static final boolean APR_STATUS_IS_EINTR(int s) { return is(s, 95); }
public static final boolean APR_STATUS_IS_ENOTSOCK(int s) { return is(s, 96); }
+ public static final boolean APR_STATUS_IS_EINVAL(int s) { return is(s, 97); }
}
1.5 +1 -0 jakarta-tomcat-connectors/jni/native/src/error.c
Index: error.c
===================================================================
RCS file: /home/cvs/jakarta-tomcat-connectors/jni/native/src/error.c,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- error.c 15 Apr 2005 17:21:23 -0000 1.4
+++ error.c 19 Apr 2005 16:32:11 -0000 1.5
@@ -186,6 +186,7 @@
APR_IS(94, APR_STATUS_IS_EINPROGRESS);
APR_IS(95, APR_STATUS_IS_EINTR);
APR_IS(96, APR_STATUS_IS_ENOTSOCK);
+ APR_IS(97, APR_STATUS_IS_EINVAL);
}
return JNI_FALSE;
}
1.10 +71 -107 jakarta-tomcat-connectors/jni/native/src/poll.c
Index: poll.c
===================================================================
RCS file: /home/cvs/jakarta-tomcat-connectors/jni/native/src/poll.c,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- poll.c 18 Apr 2005 15:43:35 -0000 1.9
+++ poll.c 19 Apr 2005 16:32:11 -0000 1.10
@@ -16,7 +16,6 @@
#include "apr.h"
#include "apr_pools.h"
#include "apr_poll.h"
-#include "apr_thread_mutex.h"
#include "tcn.h"
/* Internal poll structure for queryset
@@ -25,13 +24,10 @@
typedef struct tcn_pollset {
apr_pool_t *pool;
apr_int32_t nelts;
- apr_int32_t nadds;
apr_int32_t nalloc;
apr_pollset_t *pollset;
- apr_thread_mutex_t *mutex;
- apr_pollfd_t *query_set;
- apr_pollfd_t *query_add;
- apr_time_t *query_ttl;
+ apr_pollfd_t *socket_set;
+ apr_interval_time_t *socket_ttl;
apr_interval_time_t max_ttl;
} tcn_pollset_t;
@@ -42,14 +38,10 @@
apr_pool_t *p = J2P(pool, apr_pool_t *);
apr_pollset_t *pollset = NULL;
tcn_pollset_t *tps = NULL;
- apr_thread_mutex_t *mutex = NULL;
apr_uint32_t f = (apr_uint32_t)flags;
UNREFERENCED(o);
TCN_ASSERT(pool != 0);
- TCN_THROW_IF_ERR(apr_thread_mutex_create(&mutex,
- APR_THREAD_MUTEX_DEFAULT, p), mutex);
-
if (f & APR_POLLSET_THREADSAFE) {
apr_status_t rv = apr_pollset_create(&pollset, (apr_uint32_t)size, p, f);
if (rv == APR_ENOTIMPL)
@@ -63,23 +55,16 @@
TCN_THROW_IF_ERR(apr_pollset_create(&pollset,
(apr_uint32_t)size, p, f), pollset);
}
-
tps = apr_palloc(p, sizeof(tcn_pollset_t));
tps->pollset = pollset;
- tps->mutex = mutex;
- tps->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
- tps->query_add = apr_palloc(p, size * sizeof(apr_pollfd_t));
- tps->query_ttl = apr_palloc(p, size * sizeof(apr_time_t));
+ tps->socket_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
+ tps->socket_ttl = apr_palloc(p, size * sizeof(apr_interval_time_t));
tps->nelts = 0;
- tps->nadds = 0;
tps->nalloc = size;
tps->pool = p;
tps->max_ttl = J2T(ttl);
- return P2J(tps);
cleanup:
- if (mutex)
- apr_thread_mutex_destroy(mutex);
- return 0;
+ return P2J(tps);
}
TCN_IMPLEMENT_CALL(jint, Poll, destroy)(TCN_STDARGS, jlong pollset)
@@ -88,7 +73,6 @@
UNREFERENCED_STDARGS;
TCN_ASSERT(pollset != 0);
- apr_thread_mutex_destroy(p->mutex);
return (jint)apr_pollset_destroy(p->pollset);
}
@@ -98,124 +82,80 @@
{
tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *);
apr_pollfd_t fd;
- apr_status_t rv;
UNREFERENCED_STDARGS;
TCN_ASSERT(socket != 0);
- if (p->nadds == p->nalloc)
+ if (p->nelts == p->nalloc)
return APR_ENOMEM;
- if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS)
- return rv;
+
memset(&fd, 0, sizeof(apr_pollfd_t));
fd.desc_type = APR_POLL_SOCKET;
fd.reqevents = (apr_int16_t)reqevents;
fd.desc.s = J2P(socket, apr_socket_t *);
fd.client_data = J2P(data, void *);
- p->query_add[p->nadds] = fd;
- p->nadds++;
- apr_thread_mutex_unlock(p->mutex);
- return (jint)rv;
+ p->socket_set[p->nelts++] = fd;
+ return (jint)apr_pollset_add(p->pollset, &fd);
}
-TCN_IMPLEMENT_CALL(jint, Poll, remove)(TCN_STDARGS, jlong pollset,
- jlong socket)
+static apr_status_t do_remove(tcn_pollset_t *p, const apr_pollfd_t *fd)
{
- tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *);
- apr_pollfd_t fd;
apr_int32_t i;
- apr_status_t rv;
- UNREFERENCED_STDARGS;
- TCN_ASSERT(socket != 0);
-
- memset(&fd, 0, sizeof(apr_pollfd_t));
- fd.desc_type = APR_POLL_SOCKET;
- fd.desc.s = J2P(socket, apr_socket_t *);
-
- if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS)
- return (jint)rv;
for (i = 0; i < p->nelts; i++) {
- if (fd.desc.s == p->query_set[i].desc.s) {
+ if (fd->desc.s == p->socket_set[i].desc.s) {
/* Found an instance of the fd: remove this and any other copies */
apr_int32_t dst = i;
apr_int32_t old_nelts = p->nelts;
p->nelts--;
for (i++; i < old_nelts; i++) {
- if (fd.desc.s == p->query_set[i].desc.s) {
+ if (fd->desc.s == p->socket_set[i].desc.s) {
p->nelts--;
}
else {
- p->query_set[dst] = p->query_set[i];
- dst++;
- }
- }
- break;
- }
- }
- /* Remove from add queue if present
- * This is unlikely to happen, but do it anyway.
- */
- for (i = 0; i < p->nadds; i++) {
- if (fd.desc.s == p->query_add[i].desc.s) {
- /* Found an instance of the fd: remove this and any other copies */
- apr_int32_t dst = i;
- apr_int32_t old_nelts = p->nadds;
- p->nadds--;
- for (i++; i < old_nelts; i++) {
- if (fd.desc.s == p->query_add[i].desc.s) {
- p->nadds--;
- }
- else {
- p->query_add[dst] = p->query_add[i];
+ p->socket_set[dst] = p->socket_set[i];
dst++;
}
}
break;
}
}
+ return apr_pollset_remove(p->pollset, fd);
+}
+
+TCN_IMPLEMENT_CALL(jint, Poll, remove)(TCN_STDARGS, jlong pollset,
+ jlong socket)
+{
+ tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *);
+ apr_pollfd_t fd;
- rv = apr_pollset_remove(p->pollset, &fd);
- apr_thread_mutex_unlock(p->mutex);
- return (jint)rv;
+ UNREFERENCED_STDARGS;
+ TCN_ASSERT(socket != 0);
+
+ memset(&fd, 0, sizeof(apr_pollfd_t));
+ fd.desc_type = APR_POLL_SOCKET;
+ fd.desc.s = J2P(socket, apr_socket_t *);
+
+ return (jint)do_remove(p, &fd);
}
+
TCN_IMPLEMENT_CALL(jint, Poll, poll)(TCN_STDARGS, jlong pollset,
- jlong timeout, jlongArray set)
+ jlong timeout, jlongArray set,
+ jboolean remove)
{
const apr_pollfd_t *fd = NULL;
tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *);
jlong *pset = (*e)->GetLongArrayElements(e, set, NULL);
- apr_int32_t n, i = 0, num = 0;
+ apr_int32_t i, num = 0;
apr_status_t rv = APR_SUCCESS;
UNREFERENCED(o);
TCN_ASSERT(pollset != 0);
- if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS)
- return (jint)(-rv);
- /* Add what is present in add queue */
- for (n = 0; n < p->nadds; n++) {
- apr_pollfd_t pf = p->query_add[n];
- if (p->nelts == p->nalloc) {
- /* In case the add queue is too large
- * skip adding to pollset
- */
- break;
- }
- if ((rv = apr_pollset_add(p->pollset, &pf)) != APR_SUCCESS)
- break;
- p->query_ttl[p->nelts] = apr_time_now();
- p->query_set[p->nelts] = pf;
- p->nelts++;
- }
- p->nadds = 0;
- apr_thread_mutex_unlock(p->mutex);
if (rv != APR_SUCCESS)
return (jint)(-rv);
- rv = apr_pollset_poll(p->pollset, J2T(timeout), &num, &fd);
- apr_thread_mutex_lock(p->mutex);
- if (rv != APR_SUCCESS)
+ if (apr_pollset_poll(p->pollset, J2T(timeout), &num, &fd) != APR_SUCCESS)
num = 0;
if (num > 0) {
@@ -223,33 +163,57 @@
pset[i*4+0] = (jlong)(fd->rtnevents);
pset[i*4+1] = P2J(fd->desc.s);
pset[i*4+2] = P2J(fd->client_data);
+ if (remove)
+ do_remove(p, fd);
fd ++;
}
+ (*e)->ReleaseLongArrayElements(e, set, pset, 0);
}
- /* In any case check for timeout sockets */
+ else
+ (*e)->ReleaseLongArrayElements(e, set, pset, JNI_ABORT);
+
+ return (jint)num;
+}
+
+TCN_IMPLEMENT_CALL(jint, Poll, maintain)(TCN_STDARGS, jlong pollset,
+ jlongArray set, jboolean remove)
+{
+ tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *);
+ jlong *pset = (*e)->GetLongArrayElements(e, set, NULL);
+ apr_int32_t i = 0, num = 0;
+ apr_time_t now = apr_time_now();
+ apr_pollfd_t fd;
+
+ UNREFERENCED(o);
+ TCN_ASSERT(pollset != 0);
+
+ /* Check for timeout sockets */
if (p->max_ttl > 0) {
- apr_time_t now = apr_time_now();
- /* TODO: Add thread mutex protection
- * or make sure the Java part is synchronized.
- */
- for (n = 0; n < p->nelts; n++) {
- if ((now - p->query_ttl[n]) > p->max_ttl) {
- p->query_set[n].rtnevents = APR_POLLHUP | APR_POLLIN;
+ for (i = 0; i < p->nelts; i++) {
+ if ((now - p->socket_ttl[i]) > p->max_ttl) {
+ p->socket_set[i].rtnevents = APR_POLLHUP | APR_POLLIN;
if (num < p->nelts) {
- pset[num*4+0] = (jlong)(p->query_set[n].rtnevents);
- pset[num*4+1] = P2J(p->query_set[n].desc.s);
- pset[num*4+2] = P2J(p->query_set[n].client_data);
+ fd = p->socket_set[i];
+ pset[num*4+0] = (jlong)(fd.rtnevents);
+ pset[num*4+1] = P2J(fd.desc.s);
+ pset[num*4+2] = P2J(fd.client_data);
num++;
}
}
}
+ if (remove && num) {
+ memset(&fd, 0, sizeof(apr_pollfd_t));
+ for (i = 0; i < num; i++) {
+ fd.desc_type = APR_POLL_SOCKET;
+ fd.desc.s = (apr_socket_t *)pset[i*4+1];
+ do_remove(p, &fd);
+ }
+ }
}
- apr_thread_mutex_unlock(p->mutex);
if (num)
(*e)->ReleaseLongArrayElements(e, set, pset, 0);
else
(*e)->ReleaseLongArrayElements(e, set, pset, JNI_ABORT);
-
return (jint)num;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org