You are viewing a plain text version of this content. The canonical link for it is here.
Posted to awf-commits@incubator.apache.org by jm...@apache.org on 2012/02/12 20:24:36 UTC
svn commit: r1243316 [8/8] - in /incubator/deft/sandbox/jmeehan: ./
awf-core/ awf-core/.settings/ awf-core/src/ awf-core/src/main/
awf-core/src/main/assembly/ awf-core/src/main/java/
awf-core/src/main/java/org/ awf-core/src/main/java/org/apache/ awf-co...
Added: incubator/deft/sandbox/jmeehan/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/deft/sandbox/jmeehan/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/Gossiper.java?rev=1243316&view=auto
==============================================================================
--- incubator/deft/sandbox/jmeehan/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/Gossiper.java (added)
+++ incubator/deft/sandbox/jmeehan/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/Gossiper.java Sun Feb 12 20:24:30 2012
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.awf.example.gossip;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.awf.io.callback.PeriodicCallback;
+import org.apache.awf.web.AsyncCallback;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * This module is responsible for gossiping cluster information. It maintains the lists of
+ * live and dead nodes. Periodically (every second) it chooses a random node and initiates a
+ * round of gossip with it. Whenever it hears gossip, it updates the failure detector with
+ * the liveness information.
+ *
+ * Amazon paper on Gossip at http://www.cs.cornell.edu/home/rvr/papers/flowgossip.pdf
+ *
+ * The gossip timer task runs every second.
+ * During each of these runs the node initiates a gossip exchange according to the following rules:
+ * 1, Gossip to a random live node (if any)
+ * 2, Gossip to a random unreachable node with a certain probability depending on the number of
+ *    unreachable and live nodes
+ * 3, If the node gossiped to at (1) was not the seed, or there is no live node at all,
+ *    gossip to the seed with a certain probability depending on the number of unreachable and
+ *    live nodes.
+ *
+ * This class does no locking of its own; all state is plain (unsynchronized) collections.
+ */
+public class Gossiper {
+
+    /** Digest key: start-up timestamp of a node; a larger value means the node restarted. */
+    private static final String GENERATION = "generation";
+    /** Digest key: counter that the owning node increments on every gossip round. */
+    private static final String VERSION = "version";
+    /** Period of the gossip timer, in milliseconds. */
+    private static final int GOSSIP_INTERVAL_MS = 1000;
+
+    private final FailureDetector fd;
+    private final MessagingService ms;
+    private final String seed; // seed address (e.g. 192.168.0.1:14922)
+    private final String address; // local address (e.g. 192.168.0.2:14923)
+    private final Map<String, Map<String, Long>> nodeStates = Maps.newHashMap();
+    private final List<String> aliveNodes = Lists.newLinkedList();
+    private final List<String> deadNodes = Lists.newLinkedList();
+    private final List<NodeStateChangeListener> nodeStateChangeListeners = Lists.newLinkedList();
+    private final AtomicLong version = new AtomicLong(0);
+    private final long generation = System.currentTimeMillis();
+    /** Single generator shared by all probability rolls and random node picks. */
+    private final Random random = new Random();
+
+    /**
+     * Creates the gossiper and immediately starts the periodic gossip task.
+     *
+     * @param fd failure detector that is fed heartbeats and asked which live nodes have died
+     * @param ms messaging service used to send gossip to other nodes
+     * @param seed address of the seed node (e.g. 192.168.0.1:14922)
+     * @param address address of the local node (e.g. 192.168.0.2:14923)
+     */
+    public Gossiper(FailureDetector fd, MessagingService ms, String seed, String address) {
+        this.fd = fd;
+        this.ms = ms;
+        this.seed = seed;
+        this.address = address;
+        new PeriodicCallback(new AsyncCallback() {
+            public void onCallback() {
+                initiateGossipExchange();
+                scrutinizeCluster();
+            }
+        }, GOSSIP_INTERVAL_MS).start();
+    }
+
+    /**
+     * Registers a listener that is notified when a node joins, comes back alive or dies.
+     *
+     * @param listener the listener to add, must not be null
+     * @throws NullPointerException if listener is null
+     */
+    public void addNodeStateChangeListener(NodeStateChangeListener listener) {
+        if (listener == null) {
+            throw new NullPointerException("listener");
+        }
+        nodeStateChangeListeners.add(listener);
+    }
+
+    /** Runs one gossip round following the three rules described in the class comment. */
+    private void initiateGossipExchange() {
+        version.incrementAndGet();
+
+        // rule 1: a random live node
+        boolean gossipedToSeed = false;
+        if (!aliveNodes.isEmpty()) {
+            gossipedToSeed = sendGossip(aliveNodes);
+        }
+
+        // rule 2: maybe a random dead node
+        if (!deadNodes.isEmpty()) {
+            double probability = ((double) deadNodes.size()) / (aliveNodes.size() + 1);
+            if (random.nextDouble() < probability) {
+                sendGossip(deadNodes);
+            }
+        }
+
+        // rule 3: gossip to the seed for facilitating partition healing
+        // (pointless if rule 1 already reached it, or if this node is the seed itself)
+        if (gossipedToSeed || seed.equals(address)) {
+            return;
+        }
+        if (aliveNodes.isEmpty()) {
+            sendGossip(Collections.singletonList(seed));
+            return;
+        }
+        double probability = 1.0 / (aliveNodes.size() + deadNodes.size());
+        if (random.nextDouble() <= probability) {
+            sendGossip(Collections.singletonList(seed));
+        }
+    }
+
+    /**
+     * Sends the known node states (dead nodes excluded, local state included) to a randomly
+     * chosen node from the given list.
+     *
+     * @param nodes candidate receivers, must not be empty
+     * @return true if the gossip was sent to the seed
+     */
+    private boolean sendGossip(List<String> nodes) {
+        String node = nodes.get(random.nextInt(nodes.size()));
+        // shallow copy: the per-node digests are shared with nodeStates but not modified here
+        Map<String, Map<String, Long>> gossip = Maps.newHashMap(nodeStates);
+        // dont gossip about dead nodes
+        for (String host : deadNodes) {
+            gossip.remove(host);
+        }
+        gossip.put(address, localState());
+        ms.sendOneWay(node, gossip);
+        // compare by value: addresses learned through gossip are distinct String instances
+        return seed.equals(node);
+    }
+
+    /** Builds the digest (generation and current version) describing the local node. */
+    private Map<String, Long> localState() {
+        Map<String, Long> myState = Maps.newHashMap();
+        myState.put(GENERATION, generation);
+        myState.put(VERSION, version.get());
+        return myState;
+    }
+
+    /** Moves every live node that the failure detector reports as dead to the dead list. */
+    private void scrutinizeCluster() {
+        List<String> dead = Lists.newLinkedList();
+        for (String host : aliveNodes) {
+            if (fd.isDead(host)) {
+                dead.add(host);
+            }
+        }
+        if (dead.isEmpty()) {
+            return;
+        }
+        deadNodes.addAll(dead);
+        aliveNodes.removeAll(dead);
+        for (String host : dead) {
+            notifyOnDead(host);
+        }
+    }
+
+    /**
+     * Invoked by the MessagingService when we receive gossip from another node in the cluster.
+     * The entry about the local node and entries without a complete digest are ignored; the
+     * given map is not modified.
+     *
+     * @param gossip digests keyed by node address; each digest holds "generation" and "version"
+     * @param sender address of the node that sent the gossip (currently unused)
+     */
+    public void deliverNewGossip(Map<String, Map<String, Long>> gossip, String sender) {
+        for (Map.Entry<String, Map<String, Long>> entry : gossip.entrySet()) {
+            String host = entry.getKey();
+            Map<String, Long> digest = entry.getValue();
+            // gossip will contain info about me
+            if (address.equals(host) || !isComplete(digest)) {
+                continue;
+            }
+
+            Map<String, Long> known = nodeStates.get(host);
+            if (known == null) {
+                // had no previous info about host
+                nodeStates.put(host, digest);
+                fd.heartbeat(host);
+                aliveNodes.add(host);
+                notifyOnJoin(host);
+                continue;
+            }
+
+            // has digest about host, maybe update
+            long knownGeneration = known.get(GENERATION);
+            long newGeneration = digest.get(GENERATION);
+            if (newGeneration > knownGeneration) {
+                // node has restarted
+                if (aliveNodes.remove(host)) {
+                    // we haven't marked the restarted node as dead yet (fast restart maybe)
+                    // mark as dead so maintenance like resetting connection pools can occur
+                    deadNodes.add(host);
+                    notifyOnDead(host);
+                }
+                updateNodeState(host, digest);
+            } else if (newGeneration == knownGeneration && digest.get(VERSION) > known.get(VERSION)) {
+                updateNodeState(host, digest);
+            }
+        }
+    }
+
+    /** Returns true if the digest carries both a generation and a version. */
+    private static boolean isComplete(Map<String, Long> digest) {
+        return digest != null && digest.get(GENERATION) != null && digest.get(VERSION) != null;
+    }
+
+    /** Records a newer digest for host, heartbeats it and revives it if it was marked dead. */
+    private void updateNodeState(String host, Map<String, Long> digest) {
+        fd.heartbeat(host);
+        nodeStates.put(host, digest);
+        if (deadNodes.remove(host)) {
+            aliveNodes.add(host);
+            notifyOnAlive(host);
+        }
+    }
+
+    private void notifyOnJoin(String host) {
+        System.out.println("onJoin(" + host + ")");
+        for (NodeStateChangeListener listener : nodeStateChangeListeners) {
+            listener.onJoin(host);
+        }
+    }
+
+    private void notifyOnAlive(String host) {
+        System.out.println("onAlive(" + host + ")");
+        for (NodeStateChangeListener listener : nodeStateChangeListeners) {
+            listener.onAlive(host);
+        }
+    }
+
+    private void notifyOnDead(String host) {
+        System.out.println("onDead(" + host + ")");
+        for (NodeStateChangeListener listener : nodeStateChangeListeners) {
+            listener.onDead(host);
+        }
+    }
+
+}
Added: incubator/deft/sandbox/jmeehan/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/deft/sandbox/jmeehan/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/MessagingService.java?rev=1243316&view=auto
==============================================================================
--- incubator/deft/sandbox/jmeehan/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/MessagingService.java (added)
+++ incubator/deft/sandbox/jmeehan/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/MessagingService.java Sun Feb 12 20:24:30 2012
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.awf.example.gossip;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+
+import org.apache.awf.io.AsynchronousSocket;
+import org.apache.awf.io.IOLoop;
+import org.apache.awf.util.AcceptUtil;
+import org.apache.awf.web.AsyncCallback;
+import org.apache.awf.web.AsyncResult;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+import com.google.common.collect.Maps;
+
+public class MessagingService {
+
+ private final String seed = "127.0.0.1:14922";
+ private final String address = "127.0.0.1:14922";
+ private final Map<String, AsynchronousSocket> sockets = Maps.newHashMap();
+ private final Gossiper gossiper;
+
+ private final AsyncCallback deafAsyncCallback = new AsyncCallback() { public void onCallback() { /*nop*/}};
+
+ public MessagingService() {
+ FailureDetector fd = new FailureDetector();
+ gossiper = new Gossiper(fd, this, seed, address);
+ bind();
+ }
+
+ public void sendOneWay(String to, Map<String, Map<String, Long>> gossip) {
+ String data = JSONValue.toJSONString(gossip) + "\r\n";
+ //System.out.println("Sending data: " + data.trim() + " to: " + to);
+ if (to.equals(address)) {
+ // handle local delivery
+ System.out.println("Local delivery..");
+ }
+ if (sockets.containsKey(to)) {
+ sockets.get(to).write(data.getBytes() , deafAsyncCallback);
+ } else {
+ connectToNode(to, data);
+ }
+ }
+
+ private void bind() {
+ try {
+ final ServerSocketChannel server = ServerSocketChannel.open();
+ server.configureBlocking(false);
+ int port = Integer.valueOf(address.split(":")[1]);
+ InetSocketAddress endpoint = new InetSocketAddress(port); // use the "any" (0.0.0.0) address
+ server.socket().bind(endpoint);
+ AcceptUtil.accept(server, new AsyncCallback() { public void onCallback() { onAccept(server);} });
+ System.out.println("Elastica node bootstrapped successfully");
+ } catch (IOException e) {
+ System.err.println("Error during bind");
+ }
+ }
+
+ private void onAccept(ServerSocketChannel server) {
+ try {
+ SocketChannel clientChannel = server.accept();
+ final AsynchronousSocket client = new AsynchronousSocket(clientChannel);
+ final String remoteHost = clientChannel.socket().getRemoteSocketAddress().toString();
+ sockets.put(remoteHost, client);
+ client.setCloseCallback(new AsyncCallback() { public void onCallback() { handleClose(remoteHost); }});
+ client.readUntil(
+ "\r\n".getBytes(),
+ new AsyncResult<byte[]>() {
+ public void onSuccess(byte[] result) { handleRead(result, remoteHost); }
+ public void onFailure(Throwable caught) { /* handleOnFailure(caught); */}
+ }
+ );
+ } catch (IOException ioe) {
+ System.err.println("Exception during client accept");
+ }
+ }
+
+ /**
+ * (JSON) format for data:
+ * {
+ * "127.0.0.1:14922" :
+ * {
+ * "version" : 1,
+ * "generation: 1234973945923
+ * },
+ * // etc
+ * }
+ *
+ */
+ private void handleRead(byte[] rawData, final String host) {
+ String data = new String(rawData).trim(); // remove the trailing "\r\n" (protocol specific)
+ //System.out.println("received data: " + data);
+ try {
+ JSONObject gossip = (JSONObject) new JSONParser().parse(data);
+ gossiper.deliverNewGossip(gossip, host);
+ } catch (ParseException e) {
+ System.err.println("ParseException in handleRead(..)");
+ }
+ sockets.get(host).readUntil(
+ "\r\n".getBytes(),
+ new AsyncResult<byte[]>() {
+ public void onSuccess(byte[] result) { handleRead(result, host); }
+ public void onFailure(Throwable caught) { /* handleOnFailure(caught); */}
+ }
+ );
+ }
+
+ private void handleClose(String host) {
+ System.out.println("handleClose(..), closing session with: " + host);
+ sockets.remove(host);
+ }
+
+ private void connectToNode(final String host, final String data) {
+ try {
+ SocketChannel sc = SocketChannel.open();
+ final AsynchronousSocket client = new AsynchronousSocket(sc);
+ String[] ipAndPort = host.split(":");
+ int port = Integer.valueOf(ipAndPort[1]);
+ client.connect(ipAndPort[0], port, new AsyncResult<Boolean>() {
+ public void onSuccess(Boolean result) { onConnect(host, data, client); }
+ public void onFailure(Throwable caught) { /* handleOnFailure(caught) */ }
+ });
+ } catch (IOException e) {
+ System.err.println("Could not connect to host: " + host);
+ }
+ }
+
+ private void onConnect(final String host, String data, AsynchronousSocket client) {
+ sockets.put(host, client);
+ client.setCloseCallback(new AsyncCallback() { public void onCallback() { handleClose(host); }});
+ client.readUntil(
+ "\r\n".getBytes(),
+ new AsyncResult<byte[]>() {
+ public void onSuccess(byte[] result) { handleRead(result, host); }
+ public void onFailure(Throwable caught) { /* handleOnFailure(caught); */}
+ }
+ );
+ if (data != null) {
+ client.write(data.getBytes(), deafAsyncCallback);
+ }
+ }
+
+ public static void main(String[] args) {
+ new MessagingService(); // bootstrap;
+ IOLoop.INSTANCE.start();
+ }
+
+}
Added: incubator/deft/sandbox/jmeehan/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/NodeStateChangeListener.java
URL: http://svn.apache.org/viewvc/incubator/deft/sandbox/jmeehan/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/NodeStateChangeListener.java?rev=1243316&view=auto
==============================================================================
--- incubator/deft/sandbox/jmeehan/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/NodeStateChangeListener.java (added)
+++ incubator/deft/sandbox/jmeehan/awf-example-gossip/src/main/java/org/apache/awf/example/gossip/NodeStateChangeListener.java Sun Feb 12 20:24:30 2012
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.awf.example.gossip;
+
+/**
+ * Callback interface for changes in the liveness of cluster nodes, as tracked by the
+ * Gossiper. The host argument is the address string of the affected node.
+ */
+public interface NodeStateChangeListener {
+
+ /** Called when gossip about a previously unknown host is received for the first time. */
+ void onJoin(String host);
+
+ /** Called when newer state arrives for a host that was marked dead. */
+ void onAlive(String host);
+
+ /**
+  * Called when a live host is reported dead by the failure detector, or when a live host
+  * is found to have restarted (it is marked dead before being marked alive again).
+  */
+ void onDead(String host);
+
+}
Added: incubator/deft/sandbox/jmeehan/awf-parent/.project
URL: http://svn.apache.org/viewvc/incubator/deft/sandbox/jmeehan/awf-parent/.project?rev=1243316&view=auto
==============================================================================
--- incubator/deft/sandbox/jmeehan/awf-parent/.project (added)
+++ incubator/deft/sandbox/jmeehan/awf-parent/.project Sun Feb 12 20:24:30 2012
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>awf-parent</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.m2e.core.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.m2e.core.maven2Nature</nature>
+ </natures>
+</projectDescription>
Added: incubator/deft/sandbox/jmeehan/awf-parent/.settings/org.eclipse.m2e.core.prefs
URL: http://svn.apache.org/viewvc/incubator/deft/sandbox/jmeehan/awf-parent/.settings/org.eclipse.m2e.core.prefs?rev=1243316&view=auto
==============================================================================
--- incubator/deft/sandbox/jmeehan/awf-parent/.settings/org.eclipse.m2e.core.prefs (added)
+++ incubator/deft/sandbox/jmeehan/awf-parent/.settings/org.eclipse.m2e.core.prefs Sun Feb 12 20:24:30 2012
@@ -0,0 +1,5 @@
+#Sun Feb 12 15:26:17 GMT 2012
+activeProfiles=
+eclipse.preferences.version=1
+resolveWorkspaceProjects=true
+version=1
Added: incubator/deft/sandbox/jmeehan/awf-parent/pom.xml
URL: http://svn.apache.org/viewvc/incubator/deft/sandbox/jmeehan/awf-parent/pom.xml?rev=1243316&view=auto
==============================================================================
--- incubator/deft/sandbox/jmeehan/awf-parent/pom.xml (added)
+++ incubator/deft/sandbox/jmeehan/awf-parent/pom.xml Sun Feb 12 20:24:30 2012
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version
+ 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the
+ License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.awf</groupId>
+ <artifactId>awf-parent</artifactId>
+ <version>0.4.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+
+ <name>Apache AWF Parent</name>
+
+ <!-- Please keep AWF references at the top, and
+ all others in alphabetical order. -->
+ <properties>
+ <awf.core.version>0.4.0-SNAPSHOT</awf.core.version>
+
+ <activation.version>1.1.1</activation.version>
+ <guava.version>r08</guava.version>
+ <json-simple.version>1.1</json-simple.version>
+ <httpclient.version>4.1</httpclient.version>
+ <junit.version>4.8.2</junit.version>
+ <logback.version>0.9.28</logback.version>
+ <mockito.version>1.8.5</mockito.version>
+ <ning.version>1.6.1</ning.version>
+ <powermock.version>1.4.10</powermock.version>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.awf</groupId>
+ <artifactId>awf-core</artifactId>
+ <version>${awf.core.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>javax.activation</groupId>
+ <artifactId>activation</artifactId>
+ <version>${activation.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>${json-simple.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback.version}</version>
+ <type>jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.ning</groupId>
+ <artifactId>async-http-client</artifactId>
+ <version>${ning.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <modules>
+ <module>../awf-core</module>
+ <module>../awf-example-gossip</module>
+ </modules>
+</project>
\ No newline at end of file