You are viewing a plain text version of this content. The canonical link for it is here.
Posted to awf-commits@incubator.apache.org by jm...@apache.org on 2012/02/13 23:11:19 UTC

svn commit: r1243732 - in /incubator/deft/trunk/awf-example-gossip: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/awf/ src/main/java/org/apache/awf/example/ src/main/java/org/apache/awf/example/g...

Author: jmeehan
Date: Mon Feb 13 23:11:18 2012
New Revision: 1243732

URL: http://svn.apache.org/viewvc?rev=1243732&view=rev
Log:
DEFT-198 - New Project: Apache AWF Gossip Server Example

Added:
    incubator/deft/trunk/awf-example-gossip/
    incubator/deft/trunk/awf-example-gossip/README
    incubator/deft/trunk/awf-example-gossip/pom.xml
    incubator/deft/trunk/awf-example-gossip/src/
    incubator/deft/trunk/awf-example-gossip/src/main/
    incubator/deft/trunk/awf-example-gossip/src/main/java/
    incubator/deft/trunk/awf-example-gossip/src/main/java/org/
    incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/
    incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/
    incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/
    incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/
    incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/FailureDetector.java
    incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/Gossiper.java
    incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/MessagingService.java
    incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/NodeStateChangeListener.java
    incubator/deft/trunk/awf-example-gossip/src/main/resources/
    incubator/deft/trunk/awf-example-gossip/src/test/
    incubator/deft/trunk/awf-example-gossip/src/test/java/
    incubator/deft/trunk/awf-example-gossip/src/test/resources/

Added: incubator/deft/trunk/awf-example-gossip/README
URL: http://svn.apache.org/viewvc/incubator/deft/trunk/awf-example-gossip/README?rev=1243732&view=auto
==============================================================================
--- incubator/deft/trunk/awf-example-gossip/README (added)
+++ incubator/deft/trunk/awf-example-gossip/README Mon Feb 13 23:11:18 2012
@@ -0,0 +1,27 @@
+A "demo framework" for building a decentralized gossip based distributed system with accrual failure 
+detection. It's also an implementation of: The Phi Accrual Failure Detector[1] and Amazon's paper on Gossip[2].
+
+[1] http://ddg.jaist.ac.jp/pub/HDY+04.pdf
+[2] http://www.cs.cornell.edu/home/rvr/papers/flowgossip.pdf
+
+External Dependencies needed for this example, not included in the AWF distribution:
+json-simple ("A simple Java toolkit for JSON") http://code.google.com/p/json-simple/ (Apache License 2.0)
+
+Usage:
+
+There are two configuration points (per node), 'seed' and 'address', that you need to change before you are ready
+to start your cluster.
+They are located in the MessagingService class. The seed should be a stable node that new nodes will use for bootstrapping
+into the existing cluster. The address is your local IP and port that other nodes in the cluster will connect to.
+Your seed will have seed == address (that is how a seed is defined).
+
+Example 1.
+A two node cluster, running on the same machine
+Node A (seed node). seed = 127.0.0.1:14922, address = 127.0.0.1:14922 
+Node B. seed = 127.0.0.1:14922, address = 127.0.0.1:14923
+
+Example 2
+A three node cluster, running on three different machines
+Node A (seed node). seed = 192.168.0.1:14922, address = 192.168.0.1:14922 
+Node B. seed = 192.168.0.1:14922, address = 192.168.0.2:14923
+Node C. seed = 192.168.0.1:14922, address = 192.168.0.3:14924

Added: incubator/deft/trunk/awf-example-gossip/pom.xml
URL: http://svn.apache.org/viewvc/incubator/deft/trunk/awf-example-gossip/pom.xml?rev=1243732&view=auto
==============================================================================
--- incubator/deft/trunk/awf-example-gossip/pom.xml (added)
+++ incubator/deft/trunk/awf-example-gossip/pom.xml Mon Feb 13 23:11:18 2012
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+	<!--
+	     		Licensed to the Apache Software Foundation (ASF) under one or more
+		contributor license agreements. See the NOTICE file distributed with
+		this work for additional information regarding copyright ownership.
+		The ASF licenses this file to You under the Apache License, Version
+		2.0 (the "License"); you may not use this file except in compliance
+		with the License. You may obtain a copy of the License at
+
+		http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+		applicable law or agreed to in writing, software distributed under the
+		License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+		CONDITIONS OF ANY KIND, either express or implied. See the License for
+		the specific language governing permissions and limitations under the
+		License.
+	-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.awf</groupId>
+		<artifactId>awf-parent</artifactId>
+		<version>0.4.0-SNAPSHOT</version>
+		<relativePath>../awf-parent</relativePath>
+	</parent>
+	<artifactId>awf-example-gossip</artifactId>
+
+	<name>Apache AWF Gossip Server Example</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.awf</groupId>
+			<artifactId>awf-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+</project>
+

Added: incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/FailureDetector.java?rev=1243732&view=auto
==============================================================================
--- incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/FailureDetector.java (added)
+++ incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/FailureDetector.java Mon Feb 13 23:11:18 2012
@@ -0,0 +1,111 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.awf.example.gossip;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+/**
+ *  Java implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al.
+ *
+ *  Failure detection is the process of determining which nodes in a distributed fault-tolerant system have failed.
+ *  Original Phi Accrual Failure Detection paper: http://ddg.jaist.ac.jp/pub/HDY+04.pdf
+ *
+ *  A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event of a real crash.
+ *  Conversely, a high threshold generates fewer mistakes but needs more time to detect actual crashes.
+ *
+ *  This class performs no synchronization of its own.
+ *
+ *  (Original (python) version by Brandon Williams (github.com/driftx), modified by Roger Schildmeijer
+ *  (github.com/rchildmeijer))
+ */
+public class FailureDetector {
+
+	/** Maximum number of heartbeat intervals kept per host; the oldest sample is evicted first. */
+	private static final int MAX_SAMPLE_SIZE = 1000;
+
+	/**
+	 * Suspicion level (phi) at or above which a host is reported as dead.
+	 * 1 = 10% error rate, 2 = 1%, 3 = 0.1%, ... (e.g. threshold=3 with roughly one heartbeat per second:
+	 * no heartbeat for >6s => node marked as dead).
+	 */
+	private static final int THRESHOLD = 3;
+
+	/** Key under which the mean heartbeat interval (ms) is stored in a host's statistics map. */
+	private static final String MEAN = "mean";
+
+	/** Stand-in for a probability that underflowed to zero, so that log10 stays finite. */
+	private static final double MIN_PROBABILITY = 1E-128;
+
+	/** Observed intervals (ms) between consecutive heartbeats, per host, oldest first. */
+	private final Map<String, List<Long>> intervals = Maps.newHashMap();
+	/** Derived statistics per host; empty until at least two intervals have been recorded. */
+	private final Map<String, Map<String, Long>> hosts = Maps.newHashMap();
+	/** Time (ms since epoch) of the most recent heartbeat, per host. */
+	private final Map<String, Long> timestamps = Maps.newHashMap();
+
+	/**
+	 * Called when host has indicated being alive (aka heartbeat).
+	 * The first heartbeat only registers the host; later ones record the interval since the previous
+	 * heartbeat and refresh the mean once more than one interval is available.
+	 *
+	 * @param host format = "ip:port", eg. "192.168.0.1:14922"
+	 */
+	public void heartbeat(String host) {
+		long now = System.currentTimeMillis();
+		Long previous = timestamps.put(host, now);
+		if (previous == null) {
+			// first time we hear from this host: nothing to measure yet
+			intervals.put(host, new LinkedList<Long>());
+			hosts.put(host, new HashMap<String, Long>());
+			return;
+		}
+
+		List<Long> samples = intervals.get(host);
+		samples.add(now - previous);
+		if (samples.size() > MAX_SAMPLE_SIZE) {
+			// drop the OLDEST sample (index 0) so the window keeps sliding; removing the last
+			// element would discard the interval that was just recorded
+			samples.remove(0);
+		}
+		if (samples.size() > 1) {
+			double mean = (double) sum(samples) / samples.size();
+			hosts.get(host).put(MEAN, (long) mean);
+		}
+	}
+
+	/**
+	 * Probability that a heartbeat is still to come after elapsedMillis of silence, i.e.
+	 * e^(-elapsed / mean). Requires that a mean has already been computed for host.
+	 */
+	private double probability(String host, long elapsedMillis) {
+		double exponent = -1.0 * elapsedMillis / hosts.get(host).get(MEAN);
+		return Math.exp(exponent);
+	}
+
+	/**
+	 * Computes the current suspicion level of host.
+	 *
+	 * @param host format = "ip:port"
+	 * @return -log10 of the probability that host is still alive; 0.0 when host is unknown or too few
+	 *         heartbeats have been seen to compute a mean
+	 */
+	public double phi(String host) {
+		Map<String, Long> stats = hosts.get(host);
+		if (stats == null || stats.isEmpty()) {
+			return 0.0;
+		}
+		long elapsed = System.currentTimeMillis() - timestamps.get(host);
+		double prob = probability(host, elapsed);
+		if (Double.isNaN(prob)) {
+			// 0/0: mean interval and elapsed time are both zero, i.e. we heard from the host just now
+			return 0.0;
+		}
+		if (prob == 0.0) {
+			// exp() underflowed; substitute a very small number so log10 does not yield infinity
+			prob = MIN_PROBABILITY;
+		}
+		return -1 * Math.log10(prob);
+	}
+
+	/** Returns true while the suspicion level of host is below the threshold. */
+	public boolean isAlive(String host) { return phi(host) < THRESHOLD; }
+
+	/** Returns true once the suspicion level of host has reached the threshold. */
+	public boolean isDead(String host) { return !isAlive(host); }
+
+	/** Adds up all values. */
+	private long sum(List<Long> values) {
+		long total = 0;
+		for (long value : values) {
+			total += value;
+		}
+		return total;
+	}
+
+}

Added: incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/Gossiper.java?rev=1243732&view=auto
==============================================================================
--- incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/Gossiper.java (added)
+++ incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/Gossiper.java Mon Feb 13 23:11:18 2012
@@ -0,0 +1,215 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.awf.example.gossip;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.awf.io.callback.PeriodicCallback;
+import org.apache.awf.web.AsyncCallback;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ *  This module is responsible for Gossiping cluster information. The abstraction
+ *  maintains the list of live and dead nodes. Periodically i.e. every 1 second this module
+ *  chooses a random node and initiates a round of Gossip with it. This module as and when it hears a gossip
+ *  updates the Failure Detector with the liveness information.
+ *
+ *  Amazon paper on Gossip at http://www.cs.cornell.edu/home/rvr/papers/flowgossip.pdf
+ *
+ *  Gossip timer task runs every second.
+ *  During each of these runs the node initiates gossip exchange according to following rules:
+ *    1. Gossip to random live node (if any)
+ *    2. Gossip to random unreachable node with certain probability depending on number of unreachable and live nodes
+ *    3. If the node gossiped to at (1) was not seed, or the number of live nodes is less than number of seeds,
+ *       gossip to random seed with certain probability depending on number of unreachable, seed and live nodes.
+ *
+ *  This class performs no synchronization of its own.
+ */
+public class Gossiper {
+
+	/** Digest key: start time of the node process; a larger value means the node has restarted. */
+	private static final String GENERATION = "generation";
+	/** Digest key: counter the node increments on every gossip round. */
+	private static final String VERSION = "version";
+
+	private final FailureDetector fd;
+	private final MessagingService ms;
+	private final String seed;		// seed address (e.g. 192.168.0.1:14922)
+	private final String address;	// local address (e.g. 192.168.0.2:14923)
+	private final Map<String, Map<String, Long>> nodeStates = Maps.newHashMap();
+	private final List<String> aliveNodes = Lists.newLinkedList();
+	private final List<String> deadNodes = Lists.newLinkedList();
+	private final List<NodeStateChangeListener> nodeStateChangeListeners = Lists.newLinkedList();
+	private final AtomicLong version = new AtomicLong(0);
+	private final long generation = System.currentTimeMillis();
+	private final Random random = new Random();
+
+	/**
+	 * Creates the gossiper and starts a periodic callback that, every 1000 ms, initiates a gossip
+	 * exchange and then checks the live nodes against the failure detector.
+	 *
+	 * @param fd failure detector that is fed with heartbeats and asked for liveness
+	 * @param ms messaging service used to send gossip
+	 * @param seed seed address, format "ip:port"
+	 * @param address local address, format "ip:port"
+	 */
+	public Gossiper(FailureDetector fd, MessagingService ms, String seed, String address) {
+		this.fd = fd;
+		this.ms = ms;
+		this.seed = seed;
+		this.address = address;
+		new PeriodicCallback(
+				new AsyncCallback() { public void onCallback() { initiateGossipExchange(); scrutinizeCluster(); }},
+				1000
+		).start();
+	}
+
+	/**
+	 * Registers a listener that is told about join/alive/dead transitions of other nodes.
+	 *
+	 * @param listener listener to add, must not be null
+	 * @throws NullPointerException if listener is null
+	 */
+	public void addNodeStateChangeListener(NodeStateChangeListener listener) {
+		if (listener == null) {
+			throw new NullPointerException("listener");
+		}
+		nodeStateChangeListeners.add(listener);
+	}
+
+	/** Runs one gossip round following rules 1-3 from the class description. */
+	private void initiateGossipExchange() {
+		version.incrementAndGet();
+		boolean gossipedToSeed = false;
+		if (!aliveNodes.isEmpty()) {
+			gossipedToSeed = sendGossip(aliveNodes);
+		}
+
+		if (!deadNodes.isEmpty()) {
+			double probability = ((double) deadNodes.size()) / (aliveNodes.size() + 1);
+			if (random.nextDouble() < probability) {
+				sendGossip(deadNodes);
+			}
+		}
+
+		// gossip to a seed for facilitating partition healing; pointless if the seed already got
+		// this round's gossip or if we are the seed ourselves
+		if (gossipedToSeed || seed.equals(address)) {
+			return;
+		}
+		if (aliveNodes.isEmpty()) {
+			sendGossip(Collections.singletonList(seed));
+		} else {
+			double probability = 1.0 / (aliveNodes.size() + deadNodes.size());
+			if (random.nextDouble() <= probability) {
+				sendGossip(Collections.singletonList(seed));
+			}
+		}
+	}
+
+	/**
+	 * Sends the known state of all non-dead nodes, plus the local state, to a randomly chosen node
+	 * from the nodes list.
+	 *
+	 * @param nodes candidate receivers, must not be empty
+	 * @return true if the gossip was sent to the seed
+	 */
+	private boolean sendGossip(List<String> nodes) {
+		String node = nodes.get(random.nextInt(nodes.size()));
+		Map<String, Map<String, Long>> gossip = Maps.newHashMap(nodeStates);
+		// dont gossip about dead nodes
+		for (String host : deadNodes) {
+			gossip.remove(host);
+		}
+		gossip.put(address, localState());
+		ms.sendOneWay(node, gossip);
+		// compare by value: addresses learned from received gossip are distinct String instances
+		return seed.equals(node);
+	}
+
+	/** Builds the digest describing this node: its generation and its current version. */
+	private Map<String, Long> localState() {
+		Map<String, Long> myState = Maps.newHashMap();
+		myState.put(GENERATION, generation);
+		myState.put(VERSION, version.get());
+		return myState;
+	}
+
+	/** Moves every live node that the failure detector reports as dead to the dead list and notifies listeners. */
+	private void scrutinizeCluster() {
+		List<String> dead = Lists.newLinkedList();
+		for (String host : aliveNodes) {
+			if (fd.isDead(host)) {
+				dead.add(host);
+			}
+		}
+
+		if (!dead.isEmpty()) {
+			deadNodes.addAll(dead);
+			aliveNodes.removeAll(dead);
+			for (String host : dead) {
+				notifyOnDead(host);
+			}
+		}
+	}
+
+	/**
+	 * Invoked by the MessagingService when we receive gossip from another node in the cluster.
+	 * Entries about the local node and entries without a numeric generation and version are skipped.
+	 * The passed map is not modified.
+	 *
+	 * @param gossip digests keyed by node address ("ip:port")
+	 * @param sender address of the node the gossip was received from (currently unused)
+	 */
+	public void deliverNewGossip(Map<String, Map<String, Long>> gossip, String sender) {
+		// the map comes straight from the JSON parser, so do not trust its declared value types
+		Map<String, ?> received = gossip;
+		for (Map.Entry<String, ?> entry : received.entrySet()) {
+			String host = entry.getKey();
+			if (address.equals(host)) {
+				// gossip will contain info about me
+				continue;
+			}
+			Map<String, Long> digest = asDigest(entry.getValue());
+			if (digest == null) {
+				System.err.println("Ignoring malformed gossip digest for: " + host);
+				continue;
+			}
+
+			Map<String, Long> known = nodeStates.get(host);
+			if (known == null) {
+				// had no previous info about host
+				nodeStates.put(host, digest);
+				fd.heartbeat(host);
+				aliveNodes.add(host);
+				notifyOnJoin(host);
+				continue;
+			}
+
+			// has digest about host, maybe update
+			long receivedGeneration = digest.get(GENERATION);
+			long knownGeneration = known.get(GENERATION);
+			if (receivedGeneration > knownGeneration) {
+				// node has restarted
+				if (aliveNodes.contains(host)) {
+					// we haven't marked the restarted node as dead yet (fast restart maybe)
+					// mark as dead so maintenance like resetting connection pools can occur
+					aliveNodes.remove(host);
+					deadNodes.add(host);
+					notifyOnDead(host);
+				}
+				updateNodeState(host, digest);
+			} else if (receivedGeneration == knownGeneration && digest.get(VERSION) > known.get(VERSION)) {
+				updateNodeState(host, digest);
+			}
+		}
+	}
+
+	/**
+	 * Returns value as a digest if it is a map holding Long values for both "generation" and
+	 * "version", otherwise null.
+	 */
+	@SuppressWarnings("unchecked")
+	private static Map<String, Long> asDigest(Object value) {
+		if (!(value instanceof Map)) {
+			return null;
+		}
+		Map<String, ?> candidate = (Map<String, ?>) value;
+		if (!(candidate.get(GENERATION) instanceof Long) || !(candidate.get(VERSION) instanceof Long)) {
+			return null;
+		}
+		return (Map<String, Long>) value;
+	}
+
+	/** Records a heartbeat and the new digest for host; a node listed as dead is moved back to alive. */
+	private void updateNodeState(String host, Map<String, Long> digest) {
+		fd.heartbeat(host);
+		nodeStates.put(host, digest);
+		if (deadNodes.contains(host)) {
+			deadNodes.remove(host);
+			aliveNodes.add(host);
+			notifyOnAlive(host);
+		}
+	}
+
+	private void notifyOnJoin(String host) {
+		System.out.println("onJoin(" + host + ")");
+		for (NodeStateChangeListener listener : nodeStateChangeListeners) {
+			listener.onJoin(host);
+		}
+	}
+
+	private void notifyOnAlive(String host) {
+		System.out.println("onAlive(" + host + ")");
+		for (NodeStateChangeListener listener : nodeStateChangeListeners) {
+			listener.onAlive(host);
+		}
+	}
+
+	private void notifyOnDead(String host) {
+		System.out.println("onDead(" + host + ")");
+		for (NodeStateChangeListener listener : nodeStateChangeListeners) {
+			listener.onDead(host);
+		}
+	}
+
+}

Added: incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/MessagingService.java?rev=1243732&view=auto
==============================================================================
--- incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/MessagingService.java (added)
+++ incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/MessagingService.java Mon Feb 13 23:11:18 2012
@@ -0,0 +1,172 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.awf.example.gossip;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+
+import org.apache.awf.io.AsynchronousSocket;
+import org.apache.awf.io.IOLoop;
+import org.apache.awf.util.AcceptUtil;
+import org.apache.awf.web.AsyncCallback;
+import org.apache.awf.web.AsyncResult;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+import com.google.common.collect.Maps;
+
+public class MessagingService {
+	
+	private final String seed = "127.0.0.1:14922";
+	private final String address = "127.0.0.1:14922";
+	private final Map<String, AsynchronousSocket> sockets = Maps.newHashMap();
+	private final Gossiper gossiper;
+	
+	private final AsyncCallback deafAsyncCallback = new AsyncCallback() { public void onCallback() { /*nop*/}}; 
+	
+	public MessagingService() {
+		FailureDetector fd = new FailureDetector();
+		gossiper = new Gossiper(fd, this, seed, address);
+		bind();
+	}
+	
+	public void sendOneWay(String to, Map<String, Map<String, Long>> gossip) {
+		String data = JSONValue.toJSONString(gossip) + "\r\n";
+		//System.out.println("Sending data: " + data.trim() + " to: " + to);
+		if (to.equals(address)) {
+			// handle local delivery
+			System.out.println("Local delivery..");
+		}
+		if (sockets.containsKey(to)) {
+			sockets.get(to).write(data.getBytes() , deafAsyncCallback);
+		} else {
+			connectToNode(to, data);
+		}
+	}
+
+	private void bind() {
+		try {
+			final ServerSocketChannel server = ServerSocketChannel.open();
+			server.configureBlocking(false);
+			int port = Integer.valueOf(address.split(":")[1]);
+			InetSocketAddress endpoint = new InetSocketAddress(port);	// use the "any" (0.0.0.0) address
+			server.socket().bind(endpoint);
+			AcceptUtil.accept(server, new AsyncCallback() { public void onCallback() { onAccept(server);} });
+			System.out.println("Elastica node bootstrapped successfully");
+		} catch (IOException e) {
+			System.err.println("Error during bind");
+		}
+	}
+
+	private void onAccept(ServerSocketChannel server) {
+		try {
+			SocketChannel clientChannel = server.accept();
+			final AsynchronousSocket client = new AsynchronousSocket(clientChannel);
+			final String remoteHost = clientChannel.socket().getRemoteSocketAddress().toString();
+			sockets.put(remoteHost, client);
+			client.setCloseCallback(new AsyncCallback() { public void onCallback() { handleClose(remoteHost); }});
+			client.readUntil(
+					"\r\n".getBytes(),
+					new AsyncResult<byte[]>() {
+						public void onSuccess(byte[] result) { handleRead(result, remoteHost); }
+						public void onFailure(Throwable caught) { /* handleOnFailure(caught); */}
+					}
+			);
+		} catch (IOException ioe) {
+				System.err.println("Exception during client accept");
+		}
+	}
+
+	/**
+	 * (JSON) format for data:
+	 * {
+	 *   "127.0.0.1:14922" : 
+	 *     {
+	 *       "version" : 1,
+	 *       "generation: 1234973945923
+	 *     },
+	 *     // etc 
+	 * } 
+	 *
+	 */
+	private void handleRead(byte[] rawData, final String host) {
+		String data = new String(rawData).trim();	// remove the trailing "\r\n" (protocol specific)
+		//System.out.println("received data: " + data);
+		try {
+			JSONObject gossip = (JSONObject) new JSONParser().parse(data);
+			gossiper.deliverNewGossip(gossip, host);
+		} catch (ParseException e) {
+			System.err.println("ParseException in handleRead(..)");
+		}
+		sockets.get(host).readUntil(
+				"\r\n".getBytes(), 
+				new AsyncResult<byte[]>() {
+					public void onSuccess(byte[] result) { handleRead(result, host); }
+					public void onFailure(Throwable caught) { /* handleOnFailure(caught); */}
+				}
+		);
+	}
+
+	private void handleClose(String host) {
+		System.out.println("handleClose(..), closing session with: " + host);
+		sockets.remove(host);
+	}
+	
+	private void connectToNode(final String host, final String data) {
+		try {
+			SocketChannel sc = SocketChannel.open();
+			final AsynchronousSocket client = new AsynchronousSocket(sc);
+			String[] ipAndPort = host.split(":");
+			int port = Integer.valueOf(ipAndPort[1]);
+			client.connect(ipAndPort[0], port, new AsyncResult<Boolean>() {
+				public void onSuccess(Boolean result) { onConnect(host, data, client); }
+				public void onFailure(Throwable caught) { /* handleOnFailure(caught) */ }
+			});
+		} catch (IOException e) {
+			System.err.println("Could not connect to host: " + host);
+		}
+	}
+	
+	private void onConnect(final String host, String data, AsynchronousSocket client) {
+		sockets.put(host, client);
+		client.setCloseCallback(new AsyncCallback() { public void onCallback() { handleClose(host); }});
+		client.readUntil(
+				"\r\n".getBytes(), 
+				new AsyncResult<byte[]>() {
+					public void onSuccess(byte[] result) { handleRead(result, host); }
+					public void onFailure(Throwable caught) { /* handleOnFailure(caught); */}
+				}
+		);
+		if (data != null) {
+			client.write(data.getBytes(), deafAsyncCallback);
+		}
+	}
+	
+	public static void main(String[] args) {
+		new MessagingService();	// bootstrap;
+		IOLoop.INSTANCE.start();
+	}
+
+}

Added: incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/NodeStateChangeListener.java
URL: http://svn.apache.org/viewvc/incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/NodeStateChangeListener.java?rev=1243732&view=auto
==============================================================================
--- incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/NodeStateChangeListener.java (added)
+++ incubator/deft/trunk/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/NodeStateChangeListener.java Mon Feb 13 23:11:18 2012
@@ -0,0 +1,30 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.awf.example.gossip;
+
+/**
+ * Callback interface through which the Gossiper reports membership changes of other nodes.
+ * Hosts are identified by their address in "ip:port" format.
+ */
+public interface NodeStateChangeListener {
+
+	/**
+	 * Invoked when gossip about a previously unknown host is received for the first time.
+	 *
+	 * @param host address of the node that joined
+	 */
+	void onJoin(String host);
+
+	/**
+	 * Invoked when a live host is moved to the dead list, either because the failure detector
+	 * reports it as dead or because a restart of the host was detected.
+	 *
+	 * @param host address of the node now considered dead
+	 */
+	void onDead(String host);
+
+	/**
+	 * Invoked when newer state is received for a host that was on the dead list.
+	 *
+	 * @param host address of the node considered alive again
+	 */
+	void onAlive(String host);
+
+}