You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by gd...@apache.org on 2004/07/20 02:01:54 UTC

cvs commit: incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster ClusterHBReceiver.java

gdamour     2004/07/19 17:01:54

  Modified:    sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster
                        ClusterHBReceiver.java
  Log:
  o queue the operations (add/remove node) to be performed on a cluster, so that they no longer run on the
  heartbeat listener thread and the timestamps associated with node heartbeats stay accurate;
  o prevent the underlying ClockDaemon thread from failing on a reconfiguration problem (such as a node
  failing right in the middle of a topology reconfiguration).
  
  Revision  Changes    Path
  1.2       +49 -7     incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster/ClusterHBReceiver.java
  
  Index: ClusterHBReceiver.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster/ClusterHBReceiver.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ClusterHBReceiver.java	17 Jul 2004 03:44:18 -0000	1.1
  +++ ClusterHBReceiver.java	20 Jul 2004 00:01:54 -0000	1.2
  @@ -40,13 +40,14 @@
   import org.apache.geronimo.pool.ClockPool;
   
   import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
  +import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
   
   /**
  - * Heartbeats listeners.
  + * Heartbeats listener.
    * <BR>
    * It joins the multicast group associated to the bound cluster and monitors
    * node heartbeats. When an heartbeat is received for the very first time, it
  - * adds it the underlying cluster. Conversely, when a configurable number of
  + * adds it to the underlying cluster. Conversely, when a configurable number of
    * heartbeats have been missed, it removes it from the underlying cluster.
    * <BR>
    * This service must be executed by a single node of the cluster. If the node
  @@ -84,6 +85,11 @@
        */
       private final Map trackers;
   
  +    /**
  +     * Queue the cluster operations (add or remove node).
  +     */
  +    private QueuedExecutor queuedExecutor;
  +    
       private MulticastSocket socket;
   
       /**
  @@ -122,6 +128,7 @@
           socket.joinGroup(info.getAddress());
           running = true;
           new Thread(new HearbeatListener()).start();
  +        queuedExecutor = new QueuedExecutor();
       }
   
       public void doStop() throws WaitingException, Exception {
  @@ -130,6 +137,7 @@
           stopTrackers();
           socket.leaveGroup(info.getAddress());
           socket.close();
  +        queuedExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
       }
   
       public void doFail() {
  @@ -142,6 +150,7 @@
               log.error("Can not leave group", e);
           }
           socket.close();
  +        queuedExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
       }
   
       /**
  @@ -181,20 +190,27 @@
                           log.error(e);
                           break;
                       }
  +                    long timestamp = System.currentTimeMillis();
                       ByteArrayInputStream memIn =
                           new ByteArrayInputStream(buf, 0, packet.getLength());
                       ObjectInputStream in = new ObjectInputStream(memIn);
                       NodeInfo nodeInfo = (NodeInfo) in.readObject();
  -                    long tempo = in.readLong();
                       HeartbeatTracker tracker;
                       synchronized(trackers) {
                           tracker = (HeartbeatTracker) trackers.get(nodeInfo);
                           if ( null == tracker ) {
  +                            long tempo = in.readLong();
                               tracker = new HeartbeatTracker(nodeInfo, tempo);
  -                            tracker.start();
  +                            trackers.put(nodeInfo, tracker);
  +                            // Does not start the tracker in this thread
  +                            // as one wants this loop to reflect "correct"
  +                            // timestamps. When a tracker is started, it
  +                            // adds its associated node to the cluster, which
  +                            // can takes some time.
  +                            queuedExecutor.execute(new StartTracker(tracker));
                           }
                       }
  -                    tracker.lastTimestamp = System.currentTimeMillis();
  +                    tracker.lastTimestamp = timestamp;
                   } catch (Exception e) {
                       log.error("Error while listening heartbeat", e);
                   }
  @@ -203,6 +219,26 @@
       }
       
       /**
  +     * Starts an heartbeat tracker.
  +     */
  +    private class StartTracker implements Runnable {
  +        private final HeartbeatTracker tracker;
  +        private StartTracker(HeartbeatTracker aTracker) {
  +            tracker = aTracker;
  +        }
  +        public void run() {
  +            try {
  +                tracker.start();
  +            } catch (NodeException e) {
  +                synchronized(trackers) {
  +                    trackers.remove(tracker.node);
  +                }
  +                log.error("Can not start tracker", e);
  +            }
  +        }
  +    }
  +    
  +    /**
        * Tracks the heartbeat of a given node.
        */
       private class HeartbeatTracker implements Runnable {
  @@ -237,6 +273,13 @@
                       stop();
                   } catch (NodeException e) {
                       log.error(e);
  +                } catch (Throwable e) {
  +                    // Ensures that the underlying thread of ClockDaemon
  +                    // is not interrupted.
  +                    // TODO as a matter of fact, this happen if a node
  +                    // fails during a reconfiguration. There is a bug in
  +                    // the way exceptions are unwrapped via GBeans. 
  +                    log.error(e);
                   }
               }
           }
  @@ -244,7 +287,6 @@
               cluster.addMember(node);
               ticket = clockPool.getClockDaemon().
                   executePeriodically(delay, this, false);
  -            trackers.put(node, this);
           }
           public void stop() throws NodeException {
               synchronized(trackers) {