You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by gd...@apache.org on 2004/07/20 02:01:54 UTC
cvs commit: incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster ClusterHBReceiver.java
gdamour 2004/07/19 17:01:54
Modified: sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster
ClusterHBReceiver.java
Log:
o queue the operations (add/remove node) to be performed on a cluster in order to have accurate timestamps
associated with node heartbeats;
o prevent the underlying thread of ClockDaemon from failing in case of a reconfiguration problem (such as a node
failing right in the middle of a topology reconfiguration).
Revision Changes Path
1.2 +49 -7 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster/ClusterHBReceiver.java
Index: ClusterHBReceiver.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster/ClusterHBReceiver.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ClusterHBReceiver.java 17 Jul 2004 03:44:18 -0000 1.1
+++ ClusterHBReceiver.java 20 Jul 2004 00:01:54 -0000 1.2
@@ -40,13 +40,14 @@
import org.apache.geronimo.pool.ClockPool;
import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
- * Heartbeats listeners.
+ * Heartbeats listener.
* <BR>
* It joins the multicast group associated to the bound cluster and monitors
* node heartbeats. When an heartbeat is received for the very first time, it
- * adds it the underlying cluster. Conversely, when a configurable number of
+ * adds it to the underlying cluster. Conversely, when a configurable number of
* heartbeats have been missed, it removes it from the underlying cluster.
* <BR>
* This service must be executed by a single node of the cluster. If the node
@@ -84,6 +85,11 @@
*/
private final Map trackers;
+ /**
+ * Queue the cluster operations (add or remove node).
+ */
+ private QueuedExecutor queuedExecutor;
+
private MulticastSocket socket;
/**
@@ -122,6 +128,7 @@
socket.joinGroup(info.getAddress());
running = true;
new Thread(new HearbeatListener()).start();
+ queuedExecutor = new QueuedExecutor();
}
public void doStop() throws WaitingException, Exception {
@@ -130,6 +137,7 @@
stopTrackers();
socket.leaveGroup(info.getAddress());
socket.close();
+ queuedExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
}
public void doFail() {
@@ -142,6 +150,7 @@
log.error("Can not leave group", e);
}
socket.close();
+ queuedExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
}
/**
@@ -181,20 +190,27 @@
log.error(e);
break;
}
+ long timestamp = System.currentTimeMillis();
ByteArrayInputStream memIn =
new ByteArrayInputStream(buf, 0, packet.getLength());
ObjectInputStream in = new ObjectInputStream(memIn);
NodeInfo nodeInfo = (NodeInfo) in.readObject();
- long tempo = in.readLong();
HeartbeatTracker tracker;
synchronized(trackers) {
tracker = (HeartbeatTracker) trackers.get(nodeInfo);
if ( null == tracker ) {
+ long tempo = in.readLong();
tracker = new HeartbeatTracker(nodeInfo, tempo);
- tracker.start();
+ trackers.put(nodeInfo, tracker);
+ // Do not start the tracker in this thread,
+ // as this loop must record "correct"
+ // timestamps. When a tracker is started, it
+ // adds its associated node to the cluster, which
+ // can take some time.
+ queuedExecutor.execute(new StartTracker(tracker));
}
}
- tracker.lastTimestamp = System.currentTimeMillis();
+ tracker.lastTimestamp = timestamp;
} catch (Exception e) {
log.error("Error while listening heartbeat", e);
}
@@ -203,6 +219,26 @@
}
/**
+ * Starts a heartbeat tracker.
+ */
+ private class StartTracker implements Runnable {
+ private final HeartbeatTracker tracker;
+ private StartTracker(HeartbeatTracker aTracker) {
+ tracker = aTracker;
+ }
+ public void run() {
+ try {
+ tracker.start();
+ } catch (NodeException e) {
+ synchronized(trackers) {
+ trackers.remove(tracker.node);
+ }
+ log.error("Can not start tracker", e);
+ }
+ }
+ }
+
+ /**
* Tracks the heartbeat of a given node.
*/
private class HeartbeatTracker implements Runnable {
@@ -237,6 +273,13 @@
stop();
} catch (NodeException e) {
log.error(e);
+ } catch (Throwable e) {
+ // Ensures that the underlying thread of ClockDaemon
+ // is not interrupted.
+ // TODO as a matter of fact, this happens if a node
+ // fails during a reconfiguration. There is a bug in
+ // the way exceptions are unwrapped via GBeans.
+ log.error(e);
}
}
}
@@ -244,7 +287,6 @@
cluster.addMember(node);
ticket = clockPool.getClockDaemon().
executePeriodically(delay, this, false);
- trackers.put(node, this);
}
public void stop() throws NodeException {
synchronized(trackers) {