Posted to cvs@httpd.apache.org by pq...@apache.org on 2008/12/01 03:55:15 UTC

svn commit: r721952 - in /httpd/httpd/trunk: ./ modules/ modules/cluster/

Author: pquerna
Date: Sun Nov 30 18:55:14 2008
New Revision: 721952

URL: http://svn.apache.org/viewvc?rev=721952&view=rev
Log:
Add two new modules, originally written at Joost, to handle load balancing across
multiple apache servers within the same datacenter.

mod_heartbeat generates multicast status messages with the current number of
clients connected, but the format can easily be extended to include other things.

mod_heartmonitor collects these messages into a static file, which other
modules can then use to make load balancing decisions.

Added:
    httpd/httpd/trunk/modules/cluster/   (with props)
    httpd/httpd/trunk/modules/cluster/Makefile.in   (with props)
    httpd/httpd/trunk/modules/cluster/README.heartbeat
    httpd/httpd/trunk/modules/cluster/README.heartmonitor
    httpd/httpd/trunk/modules/cluster/config.m4
    httpd/httpd/trunk/modules/cluster/mod_heartbeat.c   (with props)
    httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c   (with props)
Modified:
    httpd/httpd/trunk/CHANGES
    httpd/httpd/trunk/modules/README

Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=721952&r1=721951&r2=721952&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Sun Nov 30 18:55:14 2008
@@ -2,6 +2,12 @@
 Changes with Apache 2.3.0
 [ When backported to 2.2.x, remove entry from this file ]
 
+  *) mod_heartmonitor: New module to collect heartbeats, and write out a file
+     so that other modules can load balance traffic as needed. [Paul Querna]
+
+  *) mod_heartbeat: New module to generate multicast heartbeats to know if a
+     server is online. [Paul Querna]
+
   *) core: Error responses set by filters were being coerced into 500 errors,
      sometimes appended to the original error response. Log entry of:
      'Handler for (null) returned invalid result code -3' 

Modified: httpd/httpd/trunk/modules/README
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/README?rev=721952&r1=721951&r2=721952&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/README (original)
+++ httpd/httpd/trunk/modules/README Sun Nov 30 18:55:14 2008
@@ -13,6 +13,9 @@
 database/
   The apache DBD framework manages connections to SQL backends efficiently.
 
+cluster/
+  Modules for working with multiple servers.
+
 dav/
   This directory houses modules that implement WebDAV functionality.
 

Propchange: httpd/httpd/trunk/modules/cluster/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sun Nov 30 18:55:14 2008
@@ -0,0 +1,5 @@
+.libs
+.deps
+*.slo
+Makefile
+modules.mk

Added: httpd/httpd/trunk/modules/cluster/Makefile.in
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/Makefile.in?rev=721952&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/cluster/Makefile.in (added)
+++ httpd/httpd/trunk/modules/cluster/Makefile.in Sun Nov 30 18:55:14 2008
@@ -0,0 +1,3 @@
+# a modules Makefile has no explicit targets -- they will be defined by
+# whatever modules are enabled. just grab special.mk to deal with this.
+include $(top_srcdir)/build/special.mk

Propchange: httpd/httpd/trunk/modules/cluster/Makefile.in
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpd/httpd/trunk/modules/cluster/Makefile.in
------------------------------------------------------------------------------
    svn:keywords = Date Revision Author HeadURL Id

Added: httpd/httpd/trunk/modules/cluster/README.heartbeat
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/README.heartbeat?rev=721952&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/cluster/README.heartbeat (added)
+++ httpd/httpd/trunk/modules/cluster/README.heartbeat Sun Nov 30 18:55:14 2008
@@ -0,0 +1,33 @@
+mod_heartbeat
+
+Broadcasts the current Apache Connection status over multicast.
+
+Example Configuration:
+  HeartbeatAddress 239.0.0.1:27999
+
+Dependencies:
+  mod_status must be either a static module, or if a dynamic module, it must be 
+  loaded before mod_heartbeat.
+
+
+Consuming:
+  Every 1 second, this module generates a single multicast UDP packet,
+  containing the number of busy and idle workers.
+  
+  The packet is a simple ASCII format, similar to GET query parameters, sent over UDP.
+  
+  An Example packet:
+    v=1&ready=75&busy=0
+
+  Consumers should tolerate new variables besides busy and ready, separated by
+  '&', being added in the future.
+  
+Misc:
+  The interval of 1 second is controlled by the HEARTBEAT_INTERVAL
+  compile time define.  This is not currently tunable at run time. To change
+  how often this module sends the status packet, add the define to the CFLAGS
+  used to compile the module. For example:
+    -DHEARTBEAT_INTERVAL=3
+  would cause the broadcasts to be sent every 3 seconds.
+
+

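The packet format described in README.heartbeat can be consumed with a small tolerant parser. The following is a minimal sketch in plain C, not code from this commit: `struct hb_packet` and `hb_parse_packet` are hypothetical names, and the 256-byte scratch buffer is an assumption chosen to match the sender's buffer size. Unknown keys are skipped, as the README asks of consumers.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical container for the fields a consumer cares about. */
struct hb_packet {
    unsigned version;
    unsigned ready;
    unsigned busy;
};

/*
 * Parse a heartbeat payload such as "v=1&ready=75&busy=0".
 * Keys other than v, ready, and busy are ignored so that variables
 * added in the future do not break the consumer.
 * Returns 0 on success, -1 if the payload is too long or a required
 * key is missing.
 */
int hb_parse_packet(const char *payload, struct hb_packet *out)
{
    char copy[256];
    char *cursor = copy;
    unsigned seen = 0;          /* bit 0: v, bit 1: ready, bit 2: busy */

    if (strlen(payload) >= sizeof(copy)) {
        return -1;
    }
    strcpy(copy, payload);

    while (cursor != NULL && *cursor != '\0') {
        char *next = strchr(cursor, '&');
        char *eq;

        if (next != NULL) {
            *next = '\0';       /* terminate the current key=value pair */
            next++;
        }

        eq = strchr(cursor, '=');
        if (eq != NULL) {
            unsigned long value;

            *eq = '\0';         /* split key from value */
            value = strtoul(eq + 1, NULL, 10);

            if (strcmp(cursor, "v") == 0) {
                out->version = (unsigned) value;
                seen |= 1u;
            }
            else if (strcmp(cursor, "ready") == 0) {
                out->ready = (unsigned) value;
                seen |= 2u;
            }
            else if (strcmp(cursor, "busy") == 0) {
                out->busy = (unsigned) value;
                seen |= 4u;
            }
        }
        cursor = next;
    }

    return seen == 7u ? 0 : -1;
}
```

A consumer would call `hb_parse_packet` on each received datagram after NUL-terminating it, and discard packets for which it returns -1.
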
Added: httpd/httpd/trunk/modules/cluster/README.heartmonitor
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/README.heartmonitor?rev=721952&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/cluster/README.heartmonitor (added)
+++ httpd/httpd/trunk/modules/cluster/README.heartmonitor Sun Nov 30 18:55:14 2008
@@ -0,0 +1,30 @@
+mod_heartmonitor
+
+Collects the Apache Connection status data over multicast.
+
+Example Configuration:
+  # Multicast address and port to listen on
+  HeartbeatListen 239.0.0.1:27999
+  # Absolute path, or relative path to ServerRoot
+  HeartbeatStorage logs/hb.dat
+
+Dependencies:
+  Due to a bug in APR's apr_socket_recvfrom, version 1.2.12 or newer must be
+  used:
+    <http://svn.apache.org/viewvc?view=rev&revision=467600>
+
+Consuming:
+  This module atomically writes a list of servers, along with metadata about
+  them, to the configured path.
+  
+  Included data about each server:
+    - IP Address
+    - Busy Slots
+    - Open Slots
+    - Last Seen
+
+  Every 5 seconds, this file will be updated with the current status of the 
+  cluster.
+
+  
+

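A module or external tool that reads the storage file could parse it line by line. The sketch below assumes the line layout produced by the apr_file_printf call in mod_heartmonitor.c in this commit ("IP &ready=N&busy=N&lastseen=N"). The names `struct hm_entry`, `hm_parse_line`, and `hm_pick_server`, the `max_age` parameter, and the "most ready slots wins" policy are all hypothetical; the commit does not prescribe how consumers should choose a server.

```c
#include <stdio.h>

/* Hypothetical in-memory form of one line of the storage file. */
struct hm_entry {
    char ip[64];
    unsigned ready;
    unsigned busy;
    unsigned lastseen;          /* seconds since the last heartbeat */
};

/*
 * Parse one line of the form "IP &ready=N&busy=N&lastseen=N".
 * Returns 0 on success, -1 if the line does not match.
 */
int hm_parse_line(const char *line, struct hm_entry *out)
{
    int n = sscanf(line, "%63s &ready=%u&busy=%u&lastseen=%u",
                   out->ip, &out->ready, &out->busy, &out->lastseen);

    return n == 4 ? 0 : -1;
}

/*
 * Assumed policy: among entries seen within max_age seconds, pick the
 * one with the most ready slots. Returns its index, or -1 if no entry
 * is recent enough.
 */
int hm_pick_server(const struct hm_entry *entries, int count,
                   unsigned max_age)
{
    int best = -1;
    int i;

    for (i = 0; i < count; i++) {
        if (entries[i].lastseen > max_age) {
            continue;           /* stale entry, skip it */
        }
        if (best < 0 || entries[i].ready > entries[best].ready) {
            best = i;
        }
    }

    return best;
}
```

Because the file is replaced by a rename, a reader that opens it and reads to the end should see one complete snapshot rather than a partially written one.
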
Added: httpd/httpd/trunk/modules/cluster/config.m4
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/config.m4?rev=721952&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/cluster/config.m4 (added)
+++ httpd/httpd/trunk/modules/cluster/config.m4 Sun Nov 30 18:55:14 2008
@@ -0,0 +1,7 @@
+
+APACHE_MODPATH_INIT(cluster)
+
+APACHE_MODULE(heartbeat, Generates Heartbeats, , , most)
+APACHE_MODULE(heartmonitor, Collects Heartbeats, , , most)
+
+APACHE_MODPATH_FINISH

Added: httpd/httpd/trunk/modules/cluster/mod_heartbeat.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/mod_heartbeat.c?rev=721952&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/cluster/mod_heartbeat.c (added)
+++ httpd/httpd/trunk/modules/cluster/mod_heartbeat.c Sun Nov 30 18:55:14 2008
@@ -0,0 +1,354 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "httpd.h"
+#include "http_config.h"
+#include "http_log.h"
+#include "apr_strings.h"
+
+#include "ap_mpm.h"
+#include "scoreboard.h"
+
+#ifndef HEARTBEAT_INTERVAL
+#define HEARTBEAT_INTERVAL (1)
+#endif
+
+module AP_MODULE_DECLARE_DATA heartbeat_module;
+
+typedef struct hb_ctx_t
+{
+    int active;
+    apr_sockaddr_t *mcast_addr;
+    int server_limit;
+    int thread_limit;
+    int status;
+    int keep_running;
+    apr_proc_mutex_t *mutex;
+    const char *mutex_path;
+    apr_thread_mutex_t *start_mtx;
+    apr_thread_t *thread;
+    apr_file_t *lockf;
+} hb_ctx_t;
+
+static const char *msg_format = "v=%u&ready=%u&busy=%u";
+
+#define MSG_VERSION (1)
+
+static int hb_monitor(hb_ctx_t *ctx, apr_pool_t *p)
+{
+    int i, j;
+    apr_uint32_t ready = 0;
+    apr_uint32_t busy = 0;
+
+    for (i = 0; i < ctx->server_limit; i++) {
+        process_score *ps;
+        ps = ap_get_scoreboard_process(i);
+
+        for (j = 0; j < ctx->thread_limit; j++) {
+            worker_score *ws = NULL;
+
+            ws = &ap_scoreboard_image->servers[i][j];
+
+            int res = ws->status;
+
+            if (res == SERVER_READY && ps->generation == ap_my_generation) {
+                ready++;
+            }
+            else if (res != SERVER_DEAD &&
+                     res != SERVER_STARTING && res != SERVER_IDLE_KILL) {
+                busy++;
+            }
+        }
+    }
+
+    char buf[256];
+    apr_size_t len =
+        apr_snprintf(buf, sizeof(buf), msg_format, MSG_VERSION, ready, busy);
+
+    apr_socket_t *sock = NULL;
+    do {
+        apr_status_t rv;
+        rv = apr_socket_create(&sock, ctx->mcast_addr->family,
+                               SOCK_DGRAM, APR_PROTO_UDP, p);
+        if (rv) {
+            ap_log_error(APLOG_MARK, APLOG_WARNING, rv,
+                         NULL, "Heartbeat: apr_socket_create failed");
+            break;
+        }
+
+        rv = apr_mcast_loopback(sock, 1);
+        if (rv) {
+            ap_log_error(APLOG_MARK, APLOG_WARNING, rv,
+                         NULL, "Heartbeat: apr_mcast_loopback failed");
+            break;
+        }
+
+        rv = apr_socket_sendto(sock, ctx->mcast_addr, 0, buf, &len);
+        if (rv) {
+            ap_log_error(APLOG_MARK, APLOG_WARNING, rv,
+                         NULL, "Heartbeat: apr_socket_sendto failed");
+            break;
+        }
+    } while (0);
+
+    if (sock) {
+        apr_socket_close(sock);
+    }
+
+    return OK;
+}
+
+#ifndef apr_time_from_msec
+#define apr_time_from_msec(x) ((x) * 1000)
+#endif
+
+static void *hb_worker(apr_thread_t *thd, void *data)
+{
+    hb_ctx_t *ctx = (hb_ctx_t *) data;
+    apr_status_t rv;
+
+    apr_pool_t *pool = apr_thread_pool_get(thd);
+    apr_pool_tag(pool, "heartbeat_worker");
+    ctx->status = 0;
+    ctx->keep_running = 1;
+    apr_thread_mutex_unlock(ctx->start_mtx);
+
+    while (ctx->keep_running) {
+        rv = apr_proc_mutex_trylock(ctx->mutex);
+        if (rv == APR_SUCCESS) {
+            break;
+        }
+        apr_sleep(apr_time_from_msec(200));
+    }
+
+    while (ctx->keep_running) {
+        int mpm_state = 0;
+        rv = ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state);
+
+        if (rv != APR_SUCCESS) {
+            break;
+        }
+
+        if (mpm_state == AP_MPMQ_STOPPING) {
+            ctx->keep_running = 0;
+            break;
+        }
+
+        apr_pool_t *tpool;
+        apr_pool_create(&tpool, pool);
+        apr_pool_tag(tpool, "heartbeat_worker_temp");
+        hb_monitor(ctx, tpool);
+        apr_pool_destroy(tpool);
+        apr_sleep(apr_time_from_sec(HEARTBEAT_INTERVAL));
+    }
+
+    apr_proc_mutex_unlock(ctx->mutex);
+    apr_thread_exit(ctx->thread, APR_SUCCESS);
+
+    return NULL;
+}
+
+static apr_status_t hb_pool_cleanup(void *baton)
+{
+    apr_status_t rv;
+    hb_ctx_t *ctx = (hb_ctx_t *) baton;
+
+    ctx->keep_running = 0;
+
+    apr_thread_join(&rv, ctx->thread);
+
+    return rv;
+}
+
+static void start_hb_worker(apr_pool_t *p, hb_ctx_t *ctx)
+{
+    apr_status_t rv;
+
+    rv = apr_thread_mutex_create(&ctx->start_mtx, APR_THREAD_MUTEX_UNNESTED,
+                                 p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartbeat: apr_thread_mutex_create failed");
+        ctx->status = rv;
+        return;
+    }
+
+    apr_thread_mutex_lock(ctx->start_mtx);
+
+    apr_pool_cleanup_register(p, ctx, hb_pool_cleanup, apr_pool_cleanup_null);
+
+    rv = apr_thread_create(&ctx->thread, NULL, hb_worker, ctx, p);
+    if (rv) {
+        apr_pool_cleanup_kill(p, ctx, hb_pool_cleanup);
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartbeat: apr_thread_create failed");
+        ctx->status = rv;
+    }
+
+    apr_thread_mutex_lock(ctx->start_mtx);
+    apr_thread_mutex_unlock(ctx->start_mtx);
+    apr_thread_mutex_destroy(ctx->start_mtx);
+}
+
+static void hb_child_init(apr_pool_t *p, server_rec *s)
+{
+    hb_ctx_t *ctx = ap_get_module_config(s->module_config, &heartbeat_module);
+
+    apr_proc_mutex_child_init(&ctx->mutex, ctx->mutex_path, p);
+
+    ctx->status = -1;
+
+    if (ctx->active) {
+        start_hb_worker(p, ctx);
+        if (ctx->status != 0) {
+            ap_log_error(APLOG_MARK, APLOG_CRIT, 0, s,
+                         "Heartbeat: Failed to start worker thread.");
+            return;
+        }
+    }
+
+    return;
+}
+
+static int hb_init(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp,
+                   server_rec *s)
+{
+    apr_status_t rv;
+    hb_ctx_t *ctx = ap_get_module_config(s->module_config, &heartbeat_module);
+
+    ap_mpm_query(AP_MPMQ_HARD_LIMIT_THREADS, &ctx->thread_limit);
+    ap_mpm_query(AP_MPMQ_HARD_LIMIT_DAEMONS, &ctx->server_limit);
+
+    rv = apr_proc_mutex_create(&ctx->mutex, ctx->mutex_path,
+#if APR_HAS_FCNTL_SERIALIZE
+                               APR_LOCK_FCNTL,
+#else
+#if APR_HAS_FLOCK_SERIALIZE
+                               APR_LOCK_FLOCK,
+#else
+#error port me to a non crap platform.
+#endif
+#endif
+                               p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
+                     "Heartbeat: mutex failed creation at %s (type=%s)",
+                     ctx->mutex_path, apr_proc_mutex_defname());
+        return !OK;
+    }
+
+    return OK;
+}
+
+static void hb_register_hooks(apr_pool_t *p)
+{
+    ap_hook_post_config(hb_init, NULL, NULL, APR_HOOK_MIDDLE);
+    ap_hook_child_init(hb_child_init, NULL, NULL, APR_HOOK_MIDDLE);
+}
+
+static void *hb_create_config(apr_pool_t *p, server_rec *s)
+{
+    hb_ctx_t *cfg = (hb_ctx_t *) apr_palloc(p, sizeof(hb_ctx_t));
+
+    cfg->active = 0;
+    cfg->thread_limit = 0;
+    cfg->server_limit = 0;
+
+    return cfg;
+}
+
+static const char *cmd_hb_address(cmd_parms *cmd,
+                                  void *dconf, const char *addr)
+{
+    apr_status_t rv;
+    char *host_str;
+    char *scope_id;
+    apr_port_t port = 0;
+    apr_pool_t *p = cmd->pool;
+    hb_ctx_t *ctx =
+        (hb_ctx_t *) ap_get_module_config(cmd->server->module_config,
+                                          &heartbeat_module);
+    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
+
+    if (err != NULL) {
+        return err;
+    }
+
+    ctx->active = 1;
+
+    rv = apr_parse_addr_port(&host_str, &scope_id, &port, addr, p);
+
+    if (rv) {
+        return "HeartbeatAddress: Unable to parse address.";
+    }
+
+    if (host_str == NULL) {
+        return "HeartbeatAddress: No host provided in address";
+    }
+
+    if (port == 0) {
+        return "HeartbeatAddress: No port provided in address";
+    }
+
+    rv = apr_sockaddr_info_get(&ctx->mcast_addr, host_str, APR_INET, port, 0,
+                               p);
+
+    if (rv) {
+        return "HeartbeatAddress: apr_sockaddr_info_get failed.";
+    }
+
+    const char *tmpdir = NULL;
+    rv = apr_temp_dir_get(&tmpdir, p);
+    if (rv) {
+        return "HeartbeatAddress: unable to find temp directory.";
+    }
+
+    char *path = apr_pstrcat(p, tmpdir, "/hb-tmp.XXXXXX", NULL);
+
+    rv = apr_file_mktemp(&ctx->lockf, path, 0, p);
+
+    if (rv) {
+        return "HeartbeatAddress: unable to allocate temp file.";
+    }
+
+    rv = apr_file_name_get(&ctx->mutex_path, ctx->lockf);
+
+    if (rv) {
+        return "HeartbeatAddress: unable to get lockf name.";
+    }
+
+    apr_file_close(ctx->lockf);
+
+    return NULL;
+}
+
+static const command_rec hb_cmds[] = {
+    AP_INIT_TAKE1("HeartbeatAddress", cmd_hb_address, NULL, RSRC_CONF,
+                  "Address to send heartbeat requests"),
+    {NULL}
+};
+
+module AP_MODULE_DECLARE_DATA heartbeat_module = {
+    STANDARD20_MODULE_STUFF,
+    NULL,                       /* create per-directory config structure */
+    NULL,                       /* merge per-directory config structures */
+    hb_create_config,           /* create per-server config structure */
+    NULL,                       /* merge per-server config structures */
+    hb_cmds,                    /* command apr_table_t */
+    hb_register_hooks
+};

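The counting and formatting done by hb_monitor in mod_heartbeat.c above can be illustrated without the httpd scoreboard. This is a simplified sketch under stated assumptions: `enum slot_status` and `hb_build_packet` are hypothetical stand-ins for the scoreboard status values and are not part of httpd, and the generation check from the real code is omitted.

```c
#include <stdio.h>

/* Hypothetical stand-ins for the scoreboard worker status values. */
enum slot_status {
    SLOT_DEAD,
    SLOT_STARTING,
    SLOT_READY,
    SLOT_BUSY,
    SLOT_IDLE_KILL
};

/*
 * Count ready and busy slots and format them as a version 1 heartbeat
 * payload. Dead, starting, and idle-kill slots are counted as neither.
 * Returns the payload length, or -1 if buf is too small.
 */
int hb_build_packet(const enum slot_status *slots, int count,
                    char *buf, size_t size)
{
    unsigned ready = 0;
    unsigned busy = 0;
    int i;
    int n;

    for (i = 0; i < count; i++) {
        switch (slots[i]) {
        case SLOT_READY:
            ready++;
            break;
        case SLOT_DEAD:
        case SLOT_STARTING:
        case SLOT_IDLE_KILL:
            break;              /* not available, not doing work */
        default:
            busy++;
            break;
        }
    }

    n = snprintf(buf, size, "v=%u&ready=%u&busy=%u", 1u, ready, busy);
    if (n < 0 || (size_t) n >= size) {
        return -1;              /* output error or truncated payload */
    }

    return n;
}
```

Checking the snprintf result for truncation is a choice made in this sketch; it avoids sending a cut-off payload if more variables are added to the format later.
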
Propchange: httpd/httpd/trunk/modules/cluster/mod_heartbeat.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpd/httpd/trunk/modules/cluster/mod_heartbeat.c
------------------------------------------------------------------------------
    svn:keywords = Date Revision Author HeadURL Id

Propchange: httpd/httpd/trunk/modules/cluster/mod_heartbeat.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c?rev=721952&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c (added)
+++ httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c Sun Nov 30 18:55:14 2008
@@ -0,0 +1,551 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "httpd.h"
+#include "http_config.h"
+#include "http_log.h"
+#include "apr_strings.h"
+#include "apr_hash.h"
+#include "ap_mpm.h"
+#include "scoreboard.h"
+
+module AP_MODULE_DECLARE_DATA heartmonitor_module;
+
+typedef struct hm_server_t
+{
+    const char *ip;
+    int busy;
+    int ready;
+    apr_time_t seen;
+} hm_server_t;
+
+typedef struct hm_ctx_t
+{
+    int active;
+    const char *storage_path;
+    apr_proc_mutex_t *mutex;
+    const char *mutex_path;
+    apr_sockaddr_t *mcast_addr;
+    int status;
+    int keep_running;
+    apr_thread_mutex_t *start_mtx;
+    apr_thread_t *thread;
+    apr_socket_t *sock;
+    apr_pool_t *p;
+    apr_hash_t *servers;
+} hm_ctx_t;
+
+static apr_status_t hm_listen(hm_ctx_t *ctx)
+{
+    apr_status_t rv;
+
+    rv = apr_socket_create(&ctx->sock, ctx->mcast_addr->family,
+                           SOCK_DGRAM, APR_PROTO_UDP, ctx->p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to create listening socket.");
+        return rv;
+    }
+
+    rv = apr_socket_opt_set(ctx->sock, APR_SO_REUSEADDR, 1);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to set APR_SO_REUSEADDR to 1 on socket.");
+        return rv;
+    }
+
+
+    rv = apr_socket_opt_set(ctx->sock, APR_SO_NONBLOCK, 1);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to set APR_SO_NONBLOCK to 1 on socket.");
+        return rv;
+    }
+
+    rv = apr_socket_bind(ctx->sock, ctx->mcast_addr);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to bind on socket.");
+        return rv;
+    }
+
+    rv = apr_mcast_join(ctx->sock, ctx->mcast_addr, NULL, NULL);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to join multicast group");
+        return rv;
+    }
+
+    rv = apr_mcast_loopback(ctx->sock, 1);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to accept localhost multicast on socket.");
+        return rv;
+    }
+
+    ctx->servers = apr_hash_make(ctx->p);
+
+    return APR_SUCCESS;
+}
+
+static void qs_to_table(const char *input, apr_table_t *parms,
+                        apr_pool_t *p)
+{
+    char *key;
+    char *value;
+    char *query_string;
+    char *strtok_state;
+
+    if (input == NULL) {
+        return;
+    }
+
+    query_string = apr_pstrdup(p, input);
+
+    key = apr_strtok(query_string, "&", &strtok_state);
+    while (key) {
+        value = strchr(key, '=');
+        if (value) {
+            *value = '\0';      /* Split the string in two */
+            value++;            /* Skip past the = */
+        }
+        else {
+            value = "1";
+        }
+        ap_unescape_url(key);
+        ap_unescape_url(value);
+        apr_table_set(parms, key, value);
+        /*
+           ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+           "Found query arg: %s = %s", key, value);
+         */
+        key = apr_strtok(NULL, "&", &strtok_state);
+    }
+}
+
+
+#define SEEN_TIMEOUT (30)
+
+static apr_status_t hm_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
+{
+    apr_status_t rv;
+    apr_file_t *fp;
+    char *path = apr_pstrcat(p, ctx->storage_path, ".tmp.XXXXXX", NULL);
+    /* TODO: Update stats file (!) */
+    rv = apr_file_mktemp(&fp, path, APR_CREATE | APR_WRITE, p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Unable to open tmp file: %s", path);
+        return rv;
+    }
+
+    apr_hash_index_t *hi;
+    apr_time_t now = apr_time_now();
+    for (hi = apr_hash_first(p, ctx->servers);
+         hi != NULL; hi = apr_hash_next(hi)) {
+        hm_server_t *s = NULL;
+        apr_hash_this(hi, NULL, NULL, (void **) &s);
+        apr_uint32_t seen = apr_time_sec(now - s->seen);
+        if (seen > SEEN_TIMEOUT) {
+            /*
+             * Skip this entry from the heartbeat file -- when it comes back,
+             * we will reuse the memory...
+             */
+        }
+        else {
+            apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u\n",
+                            s->ip, s->ready, s->busy, seen);
+        }
+    }
+
+    apr_file_close(fp);
+
+    rv = apr_file_perms_set(path,
+                            APR_FPROT_UREAD | APR_FPROT_GREAD |
+                            APR_FPROT_WREAD);
+    if (rv && rv != APR_INCOMPLETE) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Unable to set file permissions on %s",
+                     path);
+        return rv;
+    }
+
+    rv = apr_file_rename(path, ctx->storage_path, p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Unable to move file: %s -> %s", path,
+                     ctx->storage_path);
+        return rv;
+    }
+
+    return APR_SUCCESS;
+}
+
+static hm_server_t *hm_get_server(hm_ctx_t *ctx, const char *ip)
+{
+    hm_server_t *s;
+
+    s = apr_hash_get(ctx->servers, ip, APR_HASH_KEY_STRING);
+
+    if (s == NULL) {
+        s = apr_palloc(ctx->p, sizeof(hm_server_t));
+        s->ip = apr_pstrdup(ctx->p, ip);
+        s->ready = 0;
+        s->busy = 0;
+        s->seen = 0;
+        apr_hash_set(ctx->servers, s->ip, APR_HASH_KEY_STRING, s);
+    }
+
+    return s;
+}
+
+#define MAX_MSG_LEN (1000)
+static apr_status_t hm_recv(hm_ctx_t *ctx, apr_pool_t *p)
+{
+    char buf[MAX_MSG_LEN + 1];
+    apr_sockaddr_t from;
+    from.pool = p;
+    apr_size_t len = MAX_MSG_LEN;
+    apr_status_t rv;
+
+    rv = apr_socket_recvfrom(&from, ctx->sock, 0, buf, &len);
+
+    if (APR_STATUS_IS_EAGAIN(rv)) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: would block");
+        return APR_SUCCESS;
+    }
+    else if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: recvfrom failed");
+        return rv;
+    }
+
+    buf[len] = '\0';
+
+    apr_table_t *tbl;
+
+    tbl = apr_table_make(p, 10);
+
+    qs_to_table(buf, tbl, p);
+
+    if (apr_table_get(tbl, "v") != NULL &&
+        apr_table_get(tbl, "busy") != NULL &&
+        apr_table_get(tbl, "ready") != NULL) {
+        char *ip;
+        /* TODO: REMOVE ME BEFORE PRODUCTION (????) */
+        ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, NULL,
+                     "Heartmonitor: %pI busy=%s ready=%s", &from,
+                     apr_table_get(tbl, "busy"), apr_table_get(tbl, "ready"));
+
+        apr_sockaddr_ip_get(&ip, &from);
+
+        hm_server_t *s = hm_get_server(ctx, ip);
+
+        s->busy = atoi(apr_table_get(tbl, "busy"));
+        s->ready = atoi(apr_table_get(tbl, "ready"));
+        s->seen = apr_time_now();
+    }
+    else {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: malformed multicast message from %pI",
+                     &from);
+    }
+
+    return rv;
+}
+
+
+#ifndef apr_time_from_msec
+#define apr_time_from_msec(x) ((x) * 1000)
+#endif
+
+static void *hm_worker(apr_thread_t *thd, void *data)
+{
+    hm_ctx_t *ctx = (hm_ctx_t *) data;
+    apr_status_t rv;
+
+    ctx->p = apr_thread_pool_get(thd);
+    ctx->status = 0;
+    ctx->keep_running = 1;
+    apr_thread_mutex_unlock(ctx->start_mtx);
+
+    while (ctx->keep_running) {
+        rv = apr_proc_mutex_trylock(ctx->mutex);
+        if (rv == APR_SUCCESS) {
+            break;
+        }
+        apr_sleep(apr_time_from_msec(200));
+    }
+
+    rv = hm_listen(ctx);
+
+    if (rv) {
+        ctx->status = rv;
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Unable to listen for connections!");
+        apr_proc_mutex_unlock(ctx->mutex);
+        apr_thread_exit(ctx->thread, rv);
+        return NULL;
+    }
+
+
+    apr_time_t last = apr_time_now();
+    while (ctx->keep_running) {
+        int n;
+        apr_pool_t *p;
+        apr_pollfd_t pfd;
+        apr_interval_time_t timeout;
+        apr_pool_create(&p, ctx->p);
+
+        apr_time_t now = apr_time_now();
+
+        if (apr_time_sec((now - last)) > 5) {
+            hm_update_stats(ctx, p);
+            apr_pool_clear(p);
+            last = now;
+        }
+
+        pfd.desc_type = APR_POLL_SOCKET;
+        pfd.desc.s = ctx->sock;
+        pfd.p = p;
+        pfd.reqevents = APR_POLLIN;
+
+        timeout = apr_time_from_sec(1);
+
+        rv = apr_poll(&pfd, 1, &n, timeout);
+
+        if (!ctx->keep_running) {
+            break;
+        }
+
+        if (rv) {
+            apr_pool_destroy(p);
+            continue;
+        }
+
+        if (pfd.rtnevents & APR_POLLIN) {
+            hm_recv(ctx, p);
+        }
+
+        apr_pool_destroy(p);
+    }
+
+    apr_proc_mutex_unlock(ctx->mutex);
+    apr_thread_exit(ctx->thread, APR_SUCCESS);
+
+    return NULL;
+}
+
+static apr_status_t hm_pool_cleanup(void *baton)
+{
+    apr_status_t rv;
+    hm_ctx_t *ctx = (hm_ctx_t *) baton;
+
+    ctx->keep_running = 0;
+
+    apr_thread_join(&rv, ctx->thread);
+
+    return rv;
+}
+
+static void start_hm_worker(apr_pool_t *p, hm_ctx_t *ctx)
+{
+    apr_status_t rv;
+
+    rv = apr_thread_mutex_create(&ctx->start_mtx, APR_THREAD_MUTEX_UNNESTED,
+                                 p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: apr_thread_mutex_create failed");
+        ctx->status = rv;
+        return;
+    }
+
+    apr_thread_mutex_lock(ctx->start_mtx);
+
+    apr_pool_cleanup_register(p, ctx, hm_pool_cleanup, apr_pool_cleanup_null);
+
+    rv = apr_thread_create(&ctx->thread, NULL, hm_worker, ctx, p);
+    if (rv) {
+        apr_pool_cleanup_kill(p, ctx, hm_pool_cleanup);
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: apr_thread_create failed");
+        ctx->status = rv;
+    }
+
+    apr_thread_mutex_lock(ctx->start_mtx);
+    apr_thread_mutex_unlock(ctx->start_mtx);
+    apr_thread_mutex_destroy(ctx->start_mtx);
+}
+
+static void hm_child_init(apr_pool_t *p, server_rec *s)
+{
+    hm_ctx_t *ctx =
+        ap_get_module_config(s->module_config, &heartmonitor_module);
+
+    apr_proc_mutex_child_init(&ctx->mutex, ctx->mutex_path, p);
+
+    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s,
+                 "Heartmonitor: Starting Listener Thread. mcast=%pI",
+                 ctx->mcast_addr);
+
+    ctx->status = -1;
+
+    start_hm_worker(p, ctx);
+
+    if (ctx->status != 0) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, 0, s,
+                     "Heartmonitor: Failed to start listener thread.");
+        return;
+    }
+
+    return;
+}
+
+static int hm_post_config(apr_pool_t *p, apr_pool_t *plog,
+                          apr_pool_t *ptemp, server_rec *s)
+{
+    hm_ctx_t *ctx = ap_get_module_config(s->module_config,
+                                         &heartmonitor_module);
+
+    apr_status_t rv = apr_proc_mutex_create(&ctx->mutex,
+                                            ctx->mutex_path,
+#if APR_HAS_FCNTL_SERIALIZE
+
+                                            APR_LOCK_FCNTL,
+#else
+#if APR_HAS_FLOCK_SERIALIZE
+                                            APR_LOCK_FLOCK,
+#else
+#error port me to a non crap platform.
+#endif
+#endif
+                                            p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
+                     "Heartmonitor: Failed to create listener "
+                     "mutex at %s (type=%s)", ctx->mutex_path,
+                     apr_proc_mutex_defname());
+        return !OK;
+    }
+
+    return OK;
+}
+
+static void hm_register_hooks(apr_pool_t *p)
+{
+    ap_hook_post_config(hm_post_config, NULL, NULL, APR_HOOK_MIDDLE);
+    ap_hook_child_init(hm_child_init, NULL, NULL, APR_HOOK_MIDDLE);
+}
+
+static void *hm_create_config(apr_pool_t *p, server_rec *s)
+{
+    hm_ctx_t *ctx = (hm_ctx_t *) apr_palloc(p, sizeof(hm_ctx_t));
+
+    ctx->active = 0;
+    ctx->storage_path = ap_server_root_relative(p, "logs/hb.dat");
+
+    return ctx;
+}
+
+static const char *cmd_hm_storage(cmd_parms *cmd,
+                                  void *dconf, const char *path)
+{
+    apr_pool_t *p = cmd->pool;
+    hm_ctx_t *ctx =
+        (hm_ctx_t *) ap_get_module_config(cmd->server->module_config,
+                                          &heartmonitor_module);
+    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
+
+    if (err != NULL) {
+        return err;
+    }
+
+    ctx->storage_path = ap_server_root_relative(p, path);
+    ctx->mutex_path =
+        ap_server_root_relative(p, apr_pstrcat(p, path, ".hm-lock", NULL));
+
+    return NULL;
+}
+
+static const char *cmd_hm_listen(cmd_parms *cmd,
+                                 void *dconf, const char *mcast_addr)
+{
+    apr_status_t rv;
+    char *host_str;
+    char *scope_id;
+    apr_port_t port = 0;
+    apr_pool_t *p = cmd->pool;
+    hm_ctx_t *ctx =
+        (hm_ctx_t *) ap_get_module_config(cmd->server->module_config,
+                                          &heartmonitor_module);
+    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
+
+    if (err != NULL) {
+        return err;
+    }
+
+    ctx->active = 1;
+
+    rv = apr_parse_addr_port(&host_str, &scope_id, &port, mcast_addr, p);
+
+    if (rv) {
+        return "HeartbeatListen: Unable to parse multicast address.";
+    }
+
+    if (host_str == NULL) {
+        return "HeartbeatListen: No host provided in multicast address";
+    }
+
+    if (port == 0) {
+        return "HeartbeatListen: No port provided in multicast address";
+    }
+
+    rv = apr_sockaddr_info_get(&ctx->mcast_addr, host_str, APR_INET, port, 0,
+                               p);
+
+    if (rv) {
+        return
+            "HeartbeatListen: apr_sockaddr_info_get failed on multicast address";
+    }
+
+    return NULL;
+}
+
+static const command_rec hm_cmds[] = {
+    AP_INIT_TAKE1("HeartbeatListen", cmd_hm_listen, NULL, RSRC_CONF,
+                  "Address to listen for heartbeat requests"),
+    AP_INIT_TAKE1("HeartbeatStorage", cmd_hm_storage, NULL, RSRC_CONF,
+                  "Path to store heartbeat data."),
+    {NULL}
+};
+
+module AP_MODULE_DECLARE_DATA heartmonitor_module = {
+    STANDARD20_MODULE_STUFF,
+    NULL,                       /* create per-directory config structure */
+    NULL,                       /* merge per-directory config structures */
+    hm_create_config,           /* create per-server config structure */
+    NULL,                       /* merge per-server config structures */
+    hm_cmds,                    /* command apr_table_t */
+    hm_register_hooks
+};

Propchange: httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c
------------------------------------------------------------------------------
    svn:keywords = Date Revision Author HeadURL Id

Propchange: httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Re: svn commit: r721952 - in /httpd/httpd/trunk: ./ modules/ modules/cluster/

Posted by Rainer Jung <ra...@kippdata.de>.
Jim Jagielski schrieb:
> 
> On Dec 5, 2008, at 4:21 AM, Paul Querna wrote:
> 
>>> Is there any reason why we must use either APR_LOCK_FCNTL or
>>> APR_LOCK_FLOCK,
>>> wouldn't the default mutex work?
>>
>> The default lock mech on OSX is sysvsem.  I couldn't get it to work
>> properly after forking at all.
>>
>> Maybe I was doing something wrong, but switching it to flock/fcntl
>> works pretty much everywhere, and pretty consistently works even if a
>> child crashes.
> 
> This brings up something else I'm hoping to add in: the ability to
> change what the "default" mutex is. Many times the compile-time default
> isn't right and there are loads of places we simply use the default.
> Having a way within httpd/apr which overrules the default would be
> quite useful...

+1

Re: svn commit: r721952 - in /httpd/httpd/trunk: ./ modules/ modules/cluster/

Posted by Jim Jagielski <ji...@jaguNET.com>.
On Dec 5, 2008, at 4:21 AM, Paul Querna wrote:

>> Is there any reason why we must use either APR_LOCK_FCNTL or  
>> APR_LOCK_FLOCK,
>> wouldn't the default mutex work?
>
> The default lock mech on OSX is sysvsem.  I couldn't get it to work  
> properly after forking at all.
>
> Maybe I was doing something wrong, but switching it to flock/fcntl  
> works pretty much everywhere, and pretty consistently works even if  
> a child crashes.

This brings up something else I'm hoping to add in: the ability to
change what the "default" mutex is. Many times the compile-time default
isn't right and there are loads of places we simply use the default.
Having a way within httpd/apr which overrules the default would be
quite useful...

It's on my TODO list.

Re: svn commit: r721952 - in /httpd/httpd/trunk: ./ modules/ modules/cluster/

Posted by Paul Querna <ch...@force-elite.com>.
Ruediger Pluem wrote:
....
>> +typedef struct hb_ctx_t
>> +{
>> +    int active;
>> +    apr_sockaddr_t *mcast_addr;
>> +    int server_limit;
>> +    int thread_limit;
>> +    int status;
>> +    int keep_running;
> 
> Shouldn't this be volatile?

Changed, r723660.

....
>> +            if (res == SERVER_READY && ps->generation == ap_my_generation) {
>> +                ready++;
>> +            }
>> +            else if (res != SERVER_DEAD &&
>> +                     res != SERVER_STARTING && res != SERVER_IDLE_KILL) {
>> +                busy++;
> 
> Is this correct even if ps->generation != ap_my_generation?

Nope, this would over-report 'busy' servers.  Fixed in r723661.
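
For illustration, a minimal standalone sketch of the counting with the
generation check applied to both branches, so that slots from an older
generation are counted as neither ready nor busy. This is not the change
committed in r723661, which is not shown in this thread; the slot_t type,
the ST_* constants and count_slots() are hypothetical stand-ins for the
scoreboard structures.

```c
#include <stddef.h>

/* Hypothetical stand-ins for the scoreboard worker states. */
enum { ST_DEAD, ST_STARTING, ST_READY, ST_BUSY, ST_IDLE_KILL };

typedef struct {
    int status;      /* one of the ST_* values */
    int generation;  /* generation of the owning process */
} slot_t;

/* Count ready and busy slots, skipping slots from other generations. */
static void count_slots(const slot_t *slots, size_t n, int my_generation,
                        unsigned *ready, unsigned *busy)
{
    size_t i;

    *ready = 0;
    *busy = 0;

    for (i = 0; i < n; i++) {
        if (slots[i].generation != my_generation) {
            continue;   /* old generation: neither ready nor busy */
        }
        if (slots[i].status == ST_READY) {
            (*ready)++;
        }
        else if (slots[i].status != ST_DEAD &&
                 slots[i].status != ST_STARTING &&
                 slots[i].status != ST_IDLE_KILL) {
            (*busy)++;
        }
    }
}
```

With the check hoisted to the top of the loop, a worker that is still
finishing a request for an old generation no longer inflates 'busy'.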
....
>> +
>> +        apr_pool_t *tpool;
>> +        apr_pool_create(&tpool, pool);
>> +        apr_pool_tag(tpool, "heartbeat_worker_temp");
>> +        hb_monitor(ctx, tpool);
>> +        apr_pool_destroy(tpool);
> 
> Why create / destroy and not simply create once and call apr_pool_clear
> in the loop?

As this pool is around forever, but this is only run every second, I 
don't think there is much advantage to clearing it, at the small risk 
that it keeps growing.

....
>> +static void start_hb_worker(apr_pool_t *p, hb_ctx_t *ctx)
>> +{
>> +    apr_status_t rv;
>> +
>> +    rv = apr_thread_mutex_create(&ctx->start_mtx, APR_THREAD_MUTEX_UNNESTED,
>> +                                 p);
>> +
>> +    if (rv) {
>> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
>> +                     "Heartbeat: apr_thread_cond_create failed");
> 
> You create a thread mutex above, not a thread cond.

Yeah, some old code used a thread cond for this, fixed in r723663.

....
>> +    rv = apr_thread_create(&ctx->thread, NULL, hb_worker, ctx, p);
>> +    if (rv) {
>> +        apr_pool_cleanup_kill(p, ctx, hb_pool_cleanup);
>> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
>> +                     "Heartbeat: apr_thread_create failed");
>> +        ctx->status = rv;
>> +    }
>> +
>> +    apr_thread_mutex_lock(ctx->start_mtx);
>> +    apr_thread_mutex_unlock(ctx->start_mtx);
> 
> This may deserve some comment. As far as I understand the desire is to wait until the
> hb_worker thread is up.
> But to be honest I do not understand the need for the start_mutex at all.

Added a comment in r723665.
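
For illustration, a standalone sketch of "wait until the worker thread is
up". It deliberately swaps in a different technique from the quoted code:
a plain pthreads condition variable plus a flag, rather than the
locked-mutex handoff on start_mtx, and it is not APR code. The startup_t
type, worker() and start_and_wait() are hypothetical names.

```c
#include <pthread.h>

typedef struct {
    pthread_mutex_t mtx;
    pthread_cond_t cond;
    int started;        /* set by the worker once it is running */
} startup_t;

static void *worker(void *data)
{
    startup_t *st = data;

    /* Announce that the thread is up, then carry on with real work. */
    pthread_mutex_lock(&st->mtx);
    st->started = 1;
    pthread_cond_signal(&st->cond);
    pthread_mutex_unlock(&st->mtx);

    /* ... the periodic work would go here ... */
    return NULL;
}

/* Start the worker and block until it reports that it is up.
 * Returns 0 on success, or the pthread_create error code. */
static int start_and_wait(startup_t *st, pthread_t *thd)
{
    int rv;

    pthread_mutex_init(&st->mtx, NULL);
    pthread_cond_init(&st->cond, NULL);
    st->started = 0;

    rv = pthread_create(thd, NULL, worker, st);
    if (rv != 0) {
        return rv;      /* no thread exists, so there is nothing to wait for */
    }

    pthread_mutex_lock(&st->mtx);
    while (!st->started) {
        pthread_cond_wait(&st->cond, &st->mtx);
    }
    pthread_mutex_unlock(&st->mtx);

    return 0;
}
```

The early return on a failed pthread_create matters: waiting for a
startup signal from a thread that was never created would block forever.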

>> +    rv = apr_proc_mutex_create(&ctx->mutex, ctx->mutex_path,
>> +#if APR_HAS_FCNTL_SERIALIZE
>> +                               APR_LOCK_FCNTL,
>> +#else
>> +#if APR_HAS_FLOCK_SERIALIZE
>> +                               APR_LOCK_FLOCK,
>> +#else
>> +#error port me to a non crap platform.
>> +#endif
>> +#endif
>> +                               p);
> 
> Is there any reason why we must use either APR_LOCK_FCNTL or APR_LOCK_FLOCK,
> wouldn't the default mutex work?

The default lock mech on OSX is sysvsem.  I couldn't get it to work 
properly after forking at all.

Maybe I was doing something wrong, but switching it to flock/fcntl works 
pretty much everywhere, and pretty consistently works even if a child 
crashes.
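
For illustration of the "works even if a child crashes" point: with fcntl
record locks, the kernel drops a lock when the process holding it
terminates, so a dead child cannot leave a stale lock behind. A minimal
POSIX sketch, not APR code; try_lock(), parent_locks_after_child_dies()
and the lock file path are hypothetical.

```c
#include <fcntl.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Try to take an exclusive fcntl lock on the whole file without blocking.
 * Returns 0 on success, -1 if the lock is held elsewhere or on error. */
static int try_lock(int fd)
{
    struct flock fl;

    fl.l_type = F_WRLCK;
    fl.l_whence = SEEK_SET;
    fl.l_start = 0;
    fl.l_len = 0;           /* 0 means "through end of file" */
    fl.l_pid = 0;

    return fcntl(fd, F_SETLK, &fl) == -1 ? -1 : 0;
}

/* Fork a child that takes the lock and exits without ever unlocking.
 * Returns 0 if the parent can take the lock afterwards, -1 otherwise. */
static int parent_locks_after_child_dies(const char *path)
{
    int fd, status, rv;
    pid_t pid;

    fd = open(path, O_RDWR | O_CREAT, 0600);
    if (fd < 0) {
        return -1;
    }

    pid = fork();
    if (pid < 0) {
        close(fd);
        unlink(path);
        return -1;
    }
    if (pid == 0) {
        /* Child: grab the lock and die while still holding it. */
        _exit(try_lock(fd) == 0 ? 0 : 1);
    }

    if (waitpid(pid, &status, 0) != pid ||
        !WIFEXITED(status) || WEXITSTATUS(status) != 0) {
        rv = -1;            /* child never got the lock */
    }
    else {
        rv = try_lock(fd);  /* succeeds: the child's lock died with it */
    }

    close(fd);
    unlink(path);
    return rv;
}
```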

>> +
>> +    if (rv) {
>> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
>> +                     "Heartbeat: mutex failed creation at %s (type=%s)",
>> +                     ctx->mutex_path, apr_proc_mutex_defname());
> 
> And how do you know that apr_proc_mutex_defname is either APR_LOCK_FCNTL
> or APR_LOCK_FLOCK? Maybe the default mutex on this platform is something
> different.

Fixed with r723666.

>> Added: httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c
.....
>> +typedef struct hm_ctx_t
>> +{
>> +    int active;
>> +    const char *storage_path;
>> +    apr_proc_mutex_t *mutex;
>> +    const char *mutex_path;
>> +    apr_sockaddr_t *mcast_addr;
>> +    int status;
>> +    int keep_running;
> 
> Shouldn't this be volatile?

Fixed in r723669.


>> +    apr_time_t last = apr_time_now();
>> +    while (ctx->keep_running) {
>> +        int n;
>> +        apr_pool_t *p;
>> +        apr_pollfd_t pfd;
>> +        apr_interval_time_t timeout;
>> +        apr_pool_create(&p, ctx->p);
>> +
>> +        apr_time_t now = apr_time_now();
>> +
>> +        if (apr_time_sec((now - last)) > 5) {
> 
> Hardcoded 5 seconds? Bah!!

Moved to a compile time define in r723672.
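
For illustration, a sketch of what such a compile-time define could look
like, following the same #ifndef pattern as HEARTBEAT_INTERVAL in
mod_heartbeat.c. The macro name HM_UPDATE_SEC and the helper
stats_update_due() are hypothetical; r723672 may use different names.

```c
#include <stdint.h>

/* Hypothetical name; override at build time, e.g. -DHM_UPDATE_SEC=10. */
#ifndef HM_UPDATE_SEC
#define HM_UPDATE_SEC (5)
#endif

/* Timestamps are in microseconds, as with apr_time_t.
 * Returns 1 when more than HM_UPDATE_SEC whole seconds have elapsed. */
static int stats_update_due(int64_t now_usec, int64_t last_usec)
{
    int64_t elapsed_sec = (now_usec - last_usec) / 1000000;

    return elapsed_sec > HM_UPDATE_SEC;
}
```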

....
>> +
>> +        apr_pool_destroy(p);
> 
> Why not just clear the pool?

Because I don't trust pools ? :P

....
>> +    rv = apr_thread_mutex_create(&ctx->start_mtx, APR_THREAD_MUTEX_UNNESTED,
>> +                                 p);
>> +
>> +    if (rv) {
>> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
>> +                     "Heartmonitor: apr_thread_cond_create failed");
> 
> You create a thread mutex above, not a thread cond.

Fixed in r723674.

....
>> +
>> +    apr_thread_mutex_lock(ctx->start_mtx);
>> +    apr_thread_mutex_unlock(ctx->start_mtx);
> 
> This may deserve some comment. As far as I understand, the desire is to wait until the
> hm_worker thread is up.
> But to be honest I do not understand the need for the start_mutex at all.

Commented in r723675.

...
>> +#if APR_HAS_FCNTL_SERIALIZE
>> +
>> +                                            APR_LOCK_FCNTL,
>> +#else
>> +#if APR_HAS_FLOCK_SERIALIZE
>> +                                            APR_LOCK_FLOCK,
>> +#else
>> +#error port me to a non crap platform.
>> +#endif
>> +#endif
>> +                                            p);
> 
> Is there any reason why we must use either APR_LOCK_FCNTL or APR_LOCK_FLOCK,
> wouldn't the default mutex work?

See comments above from mod_heartbeat.

>> +
>> +    if (rv) {
>> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
>> +                     "Heartmonitor: Failed to create listener "
>> +                     "mutex at %s (type=%s)", ctx->mutex_path,
>> +                     apr_proc_mutex_defname());
> 
> And how do you know that apr_proc_mutex_defname is either APR_LOCK_FCNTL
> or APR_LOCK_FLOCK? Maybe the default mutex on this platform is something
> different.
> 

Fixed in r723677.

...
>> +static void *hm_create_config(apr_pool_t *p, server_rec *s)
>> +{
>> +    hm_ctx_t *ctx = (hm_ctx_t *) apr_palloc(p, sizeof(hm_ctx_t));
>> +
>> +    ctx->active = 0;
>> +    ctx->storage_path = ap_server_root_relative(p, "logs/hb.dat");
> 
> Why doesn't ctx->mutex_path get initialized here?

Fixed in r723679

...
> 
> 
> Regards

THANK YOU very much for reviewing the modules, it is very appreciated!

-Paul

Re: svn commit: r721952 - in /httpd/httpd/trunk: ./ modules/ modules/cluster/

Posted by Ruediger Pluem <rp...@apache.org>.

On 12/01/2008 03:55 AM, pquerna@apache.org wrote:
> Author: pquerna
> Date: Sun Nov 30 18:55:14 2008
> New Revision: 721952
> 
> URL: http://svn.apache.org/viewvc?rev=721952&view=rev
> Log:
> Add two new modules, originally written at Joost, to handle load balancing across
> multiple apache servers within the same datacenter.
> 
> mod_heartbeat generates multicast status messages with the current number of 
> clients connected, but the formated can easily be extended to include other things.
> 
> mod_heartmonitor collects these messages into a static file, which then can be 
> used for other modules to make load balancing decisions on.
> 
> Added:
>     httpd/httpd/trunk/modules/cluster/   (with props)
>     httpd/httpd/trunk/modules/cluster/Makefile.in   (with props)
>     httpd/httpd/trunk/modules/cluster/README.heartbeat
>     httpd/httpd/trunk/modules/cluster/README.heartmonitor
>     httpd/httpd/trunk/modules/cluster/config.m4
>     httpd/httpd/trunk/modules/cluster/mod_heartbeat.c   (with props)
>     httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c   (with props)
> Modified:
>     httpd/httpd/trunk/CHANGES
>     httpd/httpd/trunk/modules/README
> 
> Modified: httpd/httpd/trunk/CHANGES
> URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=721952&r1=721951&r2=721952&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/CHANGES [utf-8] (original)
> +++ httpd/httpd/trunk/CHANGES [utf-8] Sun Nov 30 18:55:14 2008
> @@ -2,6 +2,12 @@
>  Changes with Apache 2.3.0
>  [ When backported to 2.2.x, remove entry from this file ]
>  
> +  *) mod_heartmonitor: New module to collect heartbeats, and write out a file
> +     so that other modules can load balance traffic as needed. [Paul Querna]
> +
> +  *) mod_heartbeat: New module to genarate multicast heartbeats to konw if a 
> +     server is online. [Paul Querna]
> +

s/konw/know/

In addition to the later adjusted svn log message, I would propose that you add
Sander's and Justin's names to this change entry as well.


> Added: httpd/httpd/trunk/modules/cluster/mod_heartbeat.c
> URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/mod_heartbeat.c?rev=721952&view=auto
> ==============================================================================
> --- httpd/httpd/trunk/modules/cluster/mod_heartbeat.c (added)
> +++ httpd/httpd/trunk/modules/cluster/mod_heartbeat.c Sun Nov 30 18:55:14 2008
> @@ -0,0 +1,354 @@
> +/* Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include "httpd.h"
> +#include "http_config.h"
> +#include "http_log.h"
> +#include "apr_strings.h"
> +
> +#include "ap_mpm.h"
> +#include "scoreboard.h"
> +
> +#ifndef HEARTBEAT_INTERVAL
> +#define HEARTBEAT_INTERVAL (1)
> +#endif
> +
> +module AP_MODULE_DECLARE_DATA heartbeat_module;
> +
> +typedef struct hb_ctx_t
> +{
> +    int active;
> +    apr_sockaddr_t *mcast_addr;
> +    int server_limit;
> +    int thread_limit;
> +    int status;
> +    int keep_running;

Shouldn't this be volatile?

> +    apr_proc_mutex_t *mutex;
> +    const char *mutex_path;
> +    apr_thread_mutex_t *start_mtx;
> +    apr_thread_t *thread;
> +    apr_file_t *lockf;
> +} hb_ctx_t;
> +
> +static const char *msg_format = "v=%u&ready=%u&busy=%u";
> +
> +#define MSG_VERSION (1)
> +
> +static int hb_monitor(hb_ctx_t *ctx, apr_pool_t *p)
> +{
> +    int i, j;
> +    apr_uint32_t ready = 0;
> +    apr_uint32_t busy = 0;
> +
> +    for (i = 0; i < ctx->server_limit; i++) {
> +        process_score *ps;
> +        ps = ap_get_scoreboard_process(i);
> +
> +        for (j = 0; j < ctx->thread_limit; j++) {
> +            worker_score *ws = NULL;
> +
> +            ws = &ap_scoreboard_image->servers[i][j];
> +
> +            int res = ws->status;
> +
> +            if (res == SERVER_READY && ps->generation == ap_my_generation) {
> +                ready++;
> +            }
> +            else if (res != SERVER_DEAD &&
> +                     res != SERVER_STARTING && res != SERVER_IDLE_KILL) {
> +                busy++;

Is this correct even if ps->generation != ap_my_generation?

> +            }
> +        }
> +    }
> +
> +    char buf[256];
> +    apr_size_t len =
> +        apr_snprintf(buf, sizeof(buf), msg_format, MSG_VERSION, ready, busy);
> +
> +    apr_socket_t *sock = NULL;
> +    do {
> +        apr_status_t rv;
> +        rv = apr_socket_create(&sock, ctx->mcast_addr->family,
> +                               SOCK_DGRAM, APR_PROTO_UDP, p);
> +        if (rv) {
> +            ap_log_error(APLOG_MARK, APLOG_WARNING, rv,
> +                         NULL, "Heartbeat: apr_socket_create failed");
> +            break;
> +        }
> +
> +        rv = apr_mcast_loopback(sock, 1);
> +        if (rv) {
> +            ap_log_error(APLOG_MARK, APLOG_WARNING, rv,
> +                         NULL, "Heartbeat: apr_mcast_loopback failed");
> +            break;
> +        }
> +
> +        rv = apr_socket_sendto(sock, ctx->mcast_addr, 0, buf, &len);
> +        if (rv) {
> +            ap_log_error(APLOG_MARK, APLOG_WARNING, rv,
> +                         NULL, "Heartbeat: apr_socket_sendto failed");
> +            break;
> +        }
> +    } while (0);
> +
> +    if (sock) {
> +        apr_socket_close(sock);
> +    }
> +
> +    return OK;
> +}
> +
> +#ifndef apr_time_from_msec
> +#define apr_time_from_msec(x) (x * 1000)
> +#endif
> +
> +static void *hb_worker(apr_thread_t *thd, void *data)
> +{
> +    hb_ctx_t *ctx = (hb_ctx_t *) data;
> +    apr_status_t rv;
> +
> +    apr_pool_t *pool = apr_thread_pool_get(thd);
> +    apr_pool_tag(pool, "heartbeat_worker");
> +    ctx->status = 0;
> +    ctx->keep_running = 1;
> +    apr_thread_mutex_unlock(ctx->start_mtx);
> +
> +    while (ctx->keep_running) {
> +        rv = apr_proc_mutex_trylock(ctx->mutex);
> +        if (rv == APR_SUCCESS) {
> +            break;
> +        }
> +        apr_sleep(apr_time_from_msec(200));
> +    }
> +
> +    while (ctx->keep_running) {
> +        int mpm_state = 0;
> +        rv = ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state);
> +
> +        if (rv != APR_SUCCESS) {
> +            break;
> +        }
> +
> +        if (mpm_state == AP_MPMQ_STOPPING) {
> +            ctx->keep_running = 0;
> +            break;
> +        }
> +
> +        apr_pool_t *tpool;
> +        apr_pool_create(&tpool, pool);
> +        apr_pool_tag(tpool, "heartbeat_worker_temp");
> +        hb_monitor(ctx, tpool);
> +        apr_pool_destroy(tpool);

Why create / destroy and not simply create once and call apr_pool_clear
in the loop?

> +        apr_sleep(apr_time_from_sec(HEARTBEAT_INTERVAL));
> +    }
> +
> +    apr_proc_mutex_unlock(ctx->mutex);
> +    apr_thread_exit(ctx->thread, APR_SUCCESS);
> +
> +    return NULL;
> +}
> +
> +static apr_status_t hb_pool_cleanup(void *baton)
> +{
> +    apr_status_t rv;
> +    hb_ctx_t *ctx = (hb_ctx_t *) baton;
> +
> +    ctx->keep_running = 0;
> +
> +    apr_thread_join(&rv, ctx->thread);
> +
> +    return rv;
> +}
> +
> +static void start_hb_worker(apr_pool_t *p, hb_ctx_t *ctx)
> +{
> +    apr_status_t rv;
> +
> +    rv = apr_thread_mutex_create(&ctx->start_mtx, APR_THREAD_MUTEX_UNNESTED,
> +                                 p);
> +
> +    if (rv) {
> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
> +                     "Heartbeat: apr_thread_cond_create failed");

You create a thread mutex above, not a thread cond.

> +        ctx->status = rv;
> +        return;
> +    }
> +
> +    apr_thread_mutex_lock(ctx->start_mtx);
> +
> +    apr_pool_cleanup_register(p, ctx, hb_pool_cleanup, apr_pool_cleanup_null);
> +
> +    rv = apr_thread_create(&ctx->thread, NULL, hb_worker, ctx, p);
> +    if (rv) {
> +        apr_pool_cleanup_kill(p, ctx, hb_pool_cleanup);
> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
> +                     "Heartbeat: apr_thread_create failed");
> +        ctx->status = rv;
> +    }
> +
> +    apr_thread_mutex_lock(ctx->start_mtx);
> +    apr_thread_mutex_unlock(ctx->start_mtx);

This may deserve some comment. As far as I understand the desire is to wait until the
hb_worker thread is up.
But to be honest I do not understand the need for the start_mutex at all.

> +    apr_thread_mutex_destroy(ctx->start_mtx);
> +}
> +
> +static void hb_child_init(apr_pool_t *p, server_rec *s)
> +{
> +    hb_ctx_t *ctx = ap_get_module_config(s->module_config, &heartbeat_module);
> +
> +    apr_proc_mutex_child_init(&ctx->mutex, ctx->mutex_path, p);
> +
> +    ctx->status = -1;
> +
> +    if (ctx->active) {
> +        start_hb_worker(p, ctx);
> +        if (ctx->status != 0) {
> +            ap_log_error(APLOG_MARK, APLOG_CRIT, 0, s,
> +                         "Heartbeat: Failed to start worker thread.");
> +            return;
> +        }
> +    }
> +
> +    return;
> +}
> +
> +static int hb_init(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp,
> +                   server_rec *s)
> +{
> +    apr_status_t rv;
> +    hb_ctx_t *ctx = ap_get_module_config(s->module_config, &heartbeat_module);
> +
> +    ap_mpm_query(AP_MPMQ_HARD_LIMIT_THREADS, &ctx->thread_limit);
> +    ap_mpm_query(AP_MPMQ_HARD_LIMIT_DAEMONS, &ctx->server_limit);
> +
> +    rv = apr_proc_mutex_create(&ctx->mutex, ctx->mutex_path,
> +#if APR_HAS_FCNTL_SERIALIZE
> +                               APR_LOCK_FCNTL,
> +#else
> +#if APR_HAS_FLOCK_SERIALIZE
> +                               APR_LOCK_FLOCK,
> +#else
> +#error port me to a non crap platform.
> +#endif
> +#endif
> +                               p);

Is there any reason why we must use either APR_LOCK_FCNTL or APR_LOCK_FLOCK,
wouldn't the default mutex work?

> +
> +    if (rv) {
> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
> +                     "Heartbeat: mutex failed creation at %s (type=%s)",
> +                     ctx->mutex_path, apr_proc_mutex_defname());

And how do you know that apr_proc_mutex_defname is either APR_LOCK_FCNTL
or APR_LOCK_FLOCK? Maybe the default mutex on this platform is something
different.

> 
> Added: httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c
> URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c?rev=721952&view=auto
> ==============================================================================
> --- httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c (added)
> +++ httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c Sun Nov 30 18:55:14 2008
> @@ -0,0 +1,551 @@
> +/* Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include "httpd.h"
> +#include "http_config.h"
> +#include "http_log.h"
> +#include "apr_strings.h"
> +#include "apr_hash.h"
> +#include "ap_mpm.h"
> +#include "scoreboard.h"
> +
> +module AP_MODULE_DECLARE_DATA heartmonitor_module;
> +
> +typedef struct hm_server_t
> +{
> +    const char *ip;
> +    int busy;
> +    int ready;
> +    apr_time_t seen;
> +} hm_server_t;
> +
> +typedef struct hm_ctx_t
> +{
> +    int active;
> +    const char *storage_path;
> +    apr_proc_mutex_t *mutex;
> +    const char *mutex_path;
> +    apr_sockaddr_t *mcast_addr;
> +    int status;
> +    int keep_running;

Shouldn't this be volatile?

> +    apr_thread_mutex_t *start_mtx;
> +    apr_thread_t *thread;
> +    apr_socket_t *sock;
> +    apr_pool_t *p;
> +    apr_hash_t *servers;
> +} hm_ctx_t;
> +

> +
> +static void *hm_worker(apr_thread_t *thd, void *data)
> +{
> +    hm_ctx_t *ctx = (hm_ctx_t *) data;
> +    apr_status_t rv;
> +
> +    ctx->p = apr_thread_pool_get(thd);
> +    ctx->status = 0;
> +    ctx->keep_running = 1;
> +    apr_thread_mutex_unlock(ctx->start_mtx);
> +
> +    while (ctx->keep_running) {
> +        rv = apr_proc_mutex_trylock(ctx->mutex);
> +        if (rv == APR_SUCCESS) {
> +            break;
> +        }
> +        apr_sleep(apr_time_from_msec(200));
> +    }
> +
> +    rv = hm_listen(ctx);
> +
> +    if (rv) {
> +        ctx->status = rv;
> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
> +                     "Heartmonitor: Unable to listen for connections!");
> +        apr_proc_mutex_unlock(ctx->mutex);
> +        apr_thread_exit(ctx->thread, rv);
> +        return NULL;
> +    }
> +
> +
> +    apr_time_t last = apr_time_now();
> +    while (ctx->keep_running) {
> +        int n;
> +        apr_pool_t *p;
> +        apr_pollfd_t pfd;
> +        apr_interval_time_t timeout;
> +        apr_pool_create(&p, ctx->p);
> +
> +        apr_time_t now = apr_time_now();
> +
> +        if (apr_time_sec((now - last)) > 5) {

Hardcoded 5 seconds? Bah!!

> +            hm_update_stats(ctx, p);
> +            apr_pool_clear(p);
> +            last = now;
> +        }
> +
> +        pfd.desc_type = APR_POLL_SOCKET;
> +        pfd.desc.s = ctx->sock;
> +        pfd.p = p;
> +        pfd.reqevents = APR_POLLIN;
> +
> +        timeout = apr_time_from_sec(1);
> +
> +        rv = apr_poll(&pfd, 1, &n, timeout);
> +
> +        if (!ctx->keep_running) {
> +            break;
> +        }
> +
> +        if (rv) {
> +            apr_pool_destroy(p);
> +            continue;
> +        }
> +
> +        if (pfd.rtnevents & APR_POLLIN) {
> +            hm_recv(ctx, p);
> +        }
> +
> +        apr_pool_destroy(p);

Why not just clear the pool?

> +    }
> +
> +    apr_proc_mutex_unlock(ctx->mutex);
> +    apr_thread_exit(ctx->thread, APR_SUCCESS);
> +
> +    return NULL;
> +}
> +
> +static apr_status_t hm_pool_cleanup(void *baton)
> +{
> +    apr_status_t rv;
> +    hm_ctx_t *ctx = (hm_ctx_t *) baton;
> +
> +    ctx->keep_running = 0;
> +
> +    apr_thread_join(&rv, ctx->thread);
> +
> +    return rv;
> +}
> +
> +static void start_hm_worker(apr_pool_t *p, hm_ctx_t *ctx)
> +{
> +    apr_status_t rv;
> +
> +    rv = apr_thread_mutex_create(&ctx->start_mtx, APR_THREAD_MUTEX_UNNESTED,
> +                                 p);
> +
> +    if (rv) {
> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
> +                     "Heartmonitor: apr_thread_cond_create failed");

You create a thread mutex above, not a thread cond.

> +        ctx->status = rv;
> +        return;
> +    }
> +
> +    apr_thread_mutex_lock(ctx->start_mtx);
> +
> +    apr_pool_cleanup_register(p, ctx, hm_pool_cleanup, apr_pool_cleanup_null);
> +
> +    rv = apr_thread_create(&ctx->thread, NULL, hm_worker, ctx, p);
> +    if (rv) {
> +        apr_pool_cleanup_kill(p, ctx, hm_pool_cleanup);
> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
> +                     "Heartmonitor: apr_thread_create failed");
> +        ctx->status = rv;
> +    }
> +
> +    apr_thread_mutex_lock(ctx->start_mtx);
> +    apr_thread_mutex_unlock(ctx->start_mtx);

This may deserve some comment. As far as I understand, the desire is to wait until the
hm_worker thread is up.
But to be honest I do not understand the need for the start_mutex at all.


> +    apr_thread_mutex_destroy(ctx->start_mtx);
> +}
> +
> +static void hm_child_init(apr_pool_t *p, server_rec *s)
> +{
> +    hm_ctx_t *ctx =
> +        ap_get_module_config(s->module_config, &heartmonitor_module);
> +
> +    apr_proc_mutex_child_init(&ctx->mutex, ctx->mutex_path, p);
> +
> +    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s,
> +                 "Heartmonitor: Starting Listener Thread. mcast=%pI",
> +                 ctx->mcast_addr);
> +
> +    ctx->status = -1;
> +
> +    start_hm_worker(p, ctx);
> +
> +    if (ctx->status != 0) {
> +        ap_log_error(APLOG_MARK, APLOG_CRIT, 0, s,
> +                     "Heartmonitor: Failed to start listener thread.");
> +        return;
> +    }
> +
> +    return;
> +}
> +
> +static int hm_post_config(apr_pool_t *p, apr_pool_t *plog,
> +                          apr_pool_t *ptemp, server_rec *s)
> +{
> +    hm_ctx_t *ctx = ap_get_module_config(s->module_config,
> +                                         &heartmonitor_module);
> +
> +    apr_status_t rv = apr_proc_mutex_create(&ctx->mutex,
> +                                            ctx->mutex_path,
> +#if APR_HAS_FCNTL_SERIALIZE
> +
> +                                            APR_LOCK_FCNTL,
> +#else
> +#if APR_HAS_FLOCK_SERIALIZE
> +                                            APR_LOCK_FLOCK,
> +#else
> +#error port me to a non crap platform.
> +#endif
> +#endif
> +                                            p);

Is there any reason why we must use either APR_LOCK_FCNTL or APR_LOCK_FLOCK,
wouldn't the default mutex work?

> +
> +    if (rv) {
> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
> +                     "Heartmonitor: Failed to create listener "
> +                     "mutex at %s (type=%s)", ctx->mutex_path,
> +                     apr_proc_mutex_defname());

And how do you know that apr_proc_mutex_defname is either APR_LOCK_FCNTL
or APR_LOCK_FLOCK? Maybe the default mutex on this platform is something
different.

> +        return !OK;
> +    }
> +
> +    return OK;
> +}
> +
> +static void hm_register_hooks(apr_pool_t *p)
> +{
> +    ap_hook_post_config(hm_post_config, NULL, NULL, APR_HOOK_MIDDLE);
> +    ap_hook_child_init(hm_child_init, NULL, NULL, APR_HOOK_MIDDLE);
> +}
> +
> +static void *hm_create_config(apr_pool_t *p, server_rec *s)
> +{
> +    hm_ctx_t *ctx = (hm_ctx_t *) apr_palloc(p, sizeof(hm_ctx_t));
> +
> +    ctx->active = 0;
> +    ctx->storage_path = ap_server_root_relative(p, "logs/hb.dat");

Why doesn't ctx->mutex_path get initialized here?

> +
> +    return ctx;
> +}
> +


Regards

Rüdiger


Re: svn commit: r721952 - in /httpd/httpd/trunk: ./ modules/ modules/cluster/

Posted by Paul Querna <ch...@force-elite.com>.
Takashi Sato wrote:
> On Mon, 01 Dec 2008 02:55:15 -0000
> pquerna@apache.org wrote:
> 
>> Author: pquerna
>> Date: Sun Nov 30 18:55:14 2008
>> New Revision: 721952
....
>> +
>> +static void *hb_worker(apr_thread_t *thd, void *data)
> 
> Doesn't this need to be APR_THREAD_FUNC?
> 

Fixed in r724090.
>> +{
>> +    hb_ctx_t *ctx = (hb_ctx_t *) data;
>> +    apr_status_t rv;
>> +
>> +    apr_pool_t *pool = apr_thread_pool_get(thd);
>> +    apr_pool_tag(pool, "heartbeat_worker");
>> +    ctx->status = 0;
> 
> The meaning of "status zero" is unclear.

Fixed all of the ctx->status things to use apr_status_t values in r724091.

> [cut]
>> +static const char *cmd_hb_address(cmd_parms *cmd,
>> +                                  void *dconf, const char *addr)
>> +{
>> +    apr_status_t rv;
>> +    char *host_str;
>> +    char *scope_id;
>> +    apr_port_t port = 0;
>> +    apr_pool_t *p = cmd->pool;
>> +    hb_ctx_t *ctx =
>> +        (hb_ctx_t *) ap_get_module_config(cmd->server->module_config,
>> +                                          &heartbeat_module);
>> +    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
>> +
>> +    if (err != NULL) {
>> +        return err;
>> +    }
>> +
>> +    ctx->active = 1;
>> +
>> +    rv = apr_parse_addr_port(&host_str, &scope_id, &port, addr, p);
> 
> cmd->temp_pool is better than cmd->pool.


Fixed up the config code to use the temp pool in r724092.

Thanks,

Paul



Re: svn commit: r721952 - in /httpd/httpd/trunk: ./ modules/ modules/cluster/

Posted by Takashi Sato <ta...@lans-tv.com>.
On Mon, 01 Dec 2008 02:55:15 -0000
pquerna@apache.org wrote:

> Author: pquerna
> Date: Sun Nov 30 18:55:14 2008
> New Revision: 721952
> 
> URL: http://svn.apache.org/viewvc?rev=721952&view=rev
> Log:
> Add two new modules, originally written at Joost, to handle load balancing across
> multiple apache servers within the same datacenter.
> 
> mod_heartbeat generates multicast status messages with the current number of 
> clients connected, but the format can easily be extended to include other things.
> 
> mod_heartmonitor collects these messages into a static file, which then can be 
> used for other modules to make load balancing decisions on.
> 
> Added:
>     httpd/httpd/trunk/modules/cluster/   (with props)
>     httpd/httpd/trunk/modules/cluster/Makefile.in   (with props)
>     httpd/httpd/trunk/modules/cluster/README.heartbeat
>     httpd/httpd/trunk/modules/cluster/README.heartmonitor
>     httpd/httpd/trunk/modules/cluster/config.m4
>     httpd/httpd/trunk/modules/cluster/mod_heartbeat.c   (with props)
>     httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c   (with props)
> Modified:
>     httpd/httpd/trunk/CHANGES
>     httpd/httpd/trunk/modules/README
> 
[cut]
> Added: httpd/httpd/trunk/modules/cluster/mod_heartbeat.c
> URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/mod_heartbeat.c?rev=721952&view=auto
> ==============================================================================
> --- httpd/httpd/trunk/modules/cluster/mod_heartbeat.c (added)
> +++ httpd/httpd/trunk/modules/cluster/mod_heartbeat.c Sun Nov 30 18:55:14 2008
> @@ -0,0 +1,354 @@
> +/* Licensed to the Apache Software Foundation (ASF) under one or more
[cut]
> +typedef struct hb_ctx_t
> +{
> +    int active;
> +    apr_sockaddr_t *mcast_addr;
> +    int server_limit;
> +    int thread_limit;
> +    int status;

[cut]
> +
> +static void *hb_worker(apr_thread_t *thd, void *data)

Doesn't this need to be APR_THREAD_FUNC?

> +{
> +    hb_ctx_t *ctx = (hb_ctx_t *) data;
> +    apr_status_t rv;
> +
> +    apr_pool_t *pool = apr_thread_pool_get(thd);
> +    apr_pool_tag(pool, "heartbeat_worker");
> +    ctx->status = 0;

The meaning of "status zero" is unclear.

[cut]
> +static void start_hb_worker(apr_pool_t *p, hb_ctx_t *ctx)
> +{
> +    apr_status_t rv;
> +
> +    rv = apr_thread_mutex_create(&ctx->start_mtx, APR_THREAD_MUTEX_UNNESTED,
> +                                 p);
> +
> +    if (rv) {
> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
> +                     "Heartbeat: apr_thread_cond_create failed");
> +        ctx->status = rv;
> +        return;
> +    }

Is status an apr_status_t?

> +
> +    apr_thread_mutex_lock(ctx->start_mtx);
> +
> +    apr_pool_cleanup_register(p, ctx, hb_pool_cleanup, apr_pool_cleanup_null);
> +
> +    rv = apr_thread_create(&ctx->thread, NULL, hb_worker, ctx, p);
> +    if (rv) {
> +        apr_pool_cleanup_kill(p, ctx, hb_pool_cleanup);
> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
> +                     "Heartbeat: apr_thread_create failed");
> +        ctx->status = rv;
> +    }

Same as above.

> +
> +    apr_thread_mutex_lock(ctx->start_mtx);
> +    apr_thread_mutex_unlock(ctx->start_mtx);
> +    apr_thread_mutex_destroy(ctx->start_mtx);
> +}
> +
> +static void hb_child_init(apr_pool_t *p, server_rec *s)
> +{
> +    hb_ctx_t *ctx = ap_get_module_config(s->module_config, &heartbeat_module);
> +
> +    apr_proc_mutex_child_init(&ctx->mutex, ctx->mutex_path, p);
> +
> +    ctx->status = -1;

I don't like this. "status -1" is unclear.

> +
> +    if (ctx->active) {
> +        start_hb_worker(p, ctx);
> +        if (ctx->status != 0) {

Same as above.
Why not change the type of hb_ctx_t::status to apr_status_t?

[cut]
> +static const char *cmd_hb_address(cmd_parms *cmd,
> +                                  void *dconf, const char *addr)
> +{
> +    apr_status_t rv;
> +    char *host_str;
> +    char *scope_id;
> +    apr_port_t port = 0;
> +    apr_pool_t *p = cmd->pool;
> +    hb_ctx_t *ctx =
> +        (hb_ctx_t *) ap_get_module_config(cmd->server->module_config,
> +                                          &heartbeat_module);
> +    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
> +
> +    if (err != NULL) {
> +        return err;
> +    }
> +
> +    ctx->active = 1;
> +
> +    rv = apr_parse_addr_port(&host_str, &scope_id, &port, addr, p);

cmd->temp_pool is better than cmd->pool.

> +
> +    if (rv) {
> +        return "HeartbeatAddress: Unable to parse address.";
> +    }
> +
> +    if (host_str == NULL) {
> +        return "HeartbeatAddress: No host provided in address";
> +    }
> +
> +    if (port == 0) {
> +        return "HeartbeatAddress: No port provided in address";
> +    }
> +
> +    rv = apr_sockaddr_info_get(&ctx->mcast_addr, host_str, APR_INET, port, 0,
> +                               p);
> +
> +    if (rv) {
> +        return "HeartbeatAddress: apr_sockaddr_info_get failed.";
> +    }
> +
> +    const char *tmpdir = NULL;
> +    rv = apr_temp_dir_get(&tmpdir, p);

Same as above.

> +    if (rv) {
> +        return "HeartbeatAddress: unable to find temp directory.";
> +    }
> +
> +    char *path = apr_pstrcat(p, tmpdir, "/hb-tmp.XXXXXX", NULL);

Same as above.

[cut]
> Added: httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c
> URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c?rev=721952&view=auto
> ==============================================================================
> --- httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c (added)
> +++ httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c Sun Nov 30 18:55:14 2008
> @@ -0,0 +1,551 @@
> +/* Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with

[cut]

> +typedef struct hm_ctx_t
> +{
> +    int active;
> +    const char *storage_path;
> +    apr_proc_mutex_t *mutex;
> +    const char *mutex_path;
> +    apr_sockaddr_t *mcast_addr;
> +    int status;

Similar to mod_heartbeat.c

> +    int keep_running;
> +    apr_thread_mutex_t *start_mtx;
> +    apr_thread_t *thread;
> +    apr_socket_t *sock;
> +    apr_pool_t *p;
> +    apr_hash_t *servers;
> +} hm_ctx_t;
> +
[cut]
> +static void *hm_worker(apr_thread_t *thd, void *data)

Doesn't this need to be APR_THREAD_FUNC?

> +{
> +    hm_ctx_t *ctx = (hm_ctx_t *) data;
> +    apr_status_t rv;
> +
> +    ctx->p = apr_thread_pool_get(thd);
> +    ctx->status = 0;
> +    ctx->keep_running = 1;
> +    apr_thread_mutex_unlock(ctx->start_mtx);
> +
> +    while (ctx->keep_running) {
> +        rv = apr_proc_mutex_trylock(ctx->mutex);
> +        if (rv == APR_SUCCESS) {
> +            break;
> +        }
> +        apr_sleep(apr_time_from_msec(200));
> +    }
> +
> +    rv = hm_listen(ctx);
> +
> +    if (rv) {
> +        ctx->status = rv;
> +        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
> +                     "Heartmonitor: Unable to listen for connections!");
> +        apr_proc_mutex_unlock(ctx->mutex);
> +        apr_thread_exit(ctx->thread, rv);
> +        return NULL;
> +    }
> +
> +
> +    apr_time_t last = apr_time_now();
> +    while (ctx->keep_running) {
> +        int n;
> +        apr_pool_t *p;
> +        apr_pollfd_t pfd;
> +        apr_interval_time_t timeout;
> +        apr_pool_create(&p, ctx->p);
> +
> +        apr_time_t now = apr_time_now();
> +
> +        if (apr_time_sec((now - last)) > 5) {
> +            hm_update_stats(ctx, p);
> +            apr_pool_clear(p);
> +            last = now;
> +        }
> +
> +        pfd.desc_type = APR_POLL_SOCKET;
> +        pfd.desc.s = ctx->sock;
> +        pfd.p = p;
> +        pfd.reqevents = APR_POLLIN;
> +
> +        timeout = apr_time_from_sec(1);
> +
> +        rv = apr_poll(&pfd, 1, &n, timeout);
> +
> +        if (!ctx->keep_running) {
> +            break;
> +        }

The parent of p is ctx->p, which is the child process's pool,
so this is not a memory leak.
But for clarity, IMHO, it should call apr_pool_destroy.
Or is that excessive care?
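
To make the suggestion concrete, here is a minimal sketch of a worker-style loop that destroys its per-iteration scratch pool on every exit path, including the early break. The rest of the real loop is cut from the quote above, so this does not claim to match it. The sketch uses a hypothetical miniature pool (`pool_t`, `pool_create`, `pool_alloc`, `pool_clear`, `pool_destroy`) instead of APR so that it is self-contained, and the `live_pools` counter exists only to make the lifecycle observable.

```c
#include <stdlib.h>

/* Hypothetical miniature pool: remembers its allocations so they can be
 * released together. It is a stand-in for apr_pool_t, not APR itself. */
typedef struct pool {
    void **blocks;
    size_t count;
    size_t cap;
} pool_t;

static int live_pools = 0;  /* pools created but not yet destroyed */

static pool_t *pool_create(void)
{
    pool_t *p = calloc(1, sizeof(*p));
    if (p != NULL) {
        live_pools++;
    }
    return p;
}

static void *pool_alloc(pool_t *p, size_t n)
{
    void *m;

    if (p->count == p->cap) {
        size_t ncap = p->cap ? p->cap * 2 : 8;
        void **nb = realloc(p->blocks, ncap * sizeof(*nb));
        if (nb == NULL) {
            return NULL;
        }
        p->blocks = nb;
        p->cap = ncap;
    }
    m = malloc(n);
    if (m != NULL) {
        p->blocks[p->count++] = m;
    }
    return m;
}

/* Free every allocation but keep the pool usable. */
static void pool_clear(pool_t *p)
{
    for (size_t i = 0; i < p->count; i++) {
        free(p->blocks[i]);
    }
    p->count = 0;
}

/* Free every allocation and the pool itself. */
static void pool_destroy(pool_t *p)
{
    pool_clear(p);
    free(p->blocks);
    free(p);
    live_pools--;
}

/* One scratch pool per iteration, destroyed on every exit path.
 * stop_at simulates keep_running dropping to 0 during that iteration.
 * Returns the number of iterations that ran to completion. */
static int run_iterations(int iterations, int stop_at)
{
    int done = 0;

    for (int i = 0; i < iterations; i++) {
        pool_t *p = pool_create();
        if (p == NULL) {
            break;
        }
        (void) pool_alloc(p, 64);   /* per-iteration scratch data */
        if (i == stop_at) {
            pool_destroy(p);        /* clean up before the early break */
            break;
        }
        done++;
        pool_destroy(p);
    }
    return done;
}
```

An alternative with the same effect is to create one subpool before the loop, clear it at the end of each iteration, and destroy it once after the loop.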

[cut]
> +static const char *cmd_hm_storage(cmd_parms *cmd,
> +                                  void *dconf, const char *path)
> +{
> +    apr_pool_t *p = cmd->pool;
> +    hm_ctx_t *ctx =
> +        (hm_ctx_t *) ap_get_module_config(cmd->server->module_config,
> +                                          &heartmonitor_module);
> +    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
> +
> +    if (err != NULL) {
> +        return err;
> +    }
> +
> +    ctx->storage_path = ap_server_root_relative(p, path);
> +    ctx->mutex_path =
> +        ap_server_root_relative(p, apr_pstrcat(p, path, ".hm-lock", NULL));

cmd->temp_pool is better than "p" for apr_pstrcat.

> +
> +    return NULL;
> +}
> +
> +static const char *cmd_hm_listen(cmd_parms *cmd,
> +                                 void *dconf, const char *mcast_addr)
> +{
> +    apr_status_t rv;
> +    char *host_str;
> +    char *scope_id;
> +    apr_port_t port = 0;
> +    apr_pool_t *p = cmd->pool;
> +    hm_ctx_t *ctx =
> +        (hm_ctx_t *) ap_get_module_config(cmd->server->module_config,
> +                                          &heartmonitor_module);
> +    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
> +
> +    if (err != NULL) {
> +        return err;
> +    }
> +
> +    ctx->active = 1;
> +
> +    rv = apr_parse_addr_port(&host_str, &scope_id, &port, mcast_addr, p);

cmd->temp_pool is better than "p".


Regards,
Takashi
-- 
Takashi Sato <ta...@lans-tv.com>