You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/09/29 19:31:42 UTC

svn commit: r1177378 - in /incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync: ./ SyncServer.java SyncServerImpl.java

Author: tjungblut
Date: Thu Sep 29 17:31:42 2011
New Revision: 1177378

URL: http://svn.apache.org/viewvc?rev=1177378&view=rev
Log:
Added synchronization service.

Added:
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java   (with props)
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java   (with props)

Added: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java?rev=1177378&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java (added)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java Thu Sep 29 17:31:42 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Hadoop RPC based barrier synchronization service.
+ * 
+ */
+public interface SyncServer extends VersionedProtocol {
+
+	public static final long versionID = 1L;
+
+	public void enterBarrier(TaskAttemptID id);
+
+	public void leaveBarrier(TaskAttemptID id);
+
+	public void register(TaskAttemptID id, Text hostAddress, LongWritable port);
+
+	public LongWritable getSuperStep();
+
+	public String[] getAllPeerNames();
+	
+	public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress, LongWritable port);
+
+}
\ No newline at end of file

Propchange: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java?rev=1177378&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java (added)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java Thu Sep 29 17:31:42 2011
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Synchronization Deamon. <br\>
+ * TODO Should be launched on the same host like the application master?
+ */
+public class SyncServerImpl implements SyncServer {
+
+	private static final Log LOG = LogFactory.getLog(SyncServerImpl.class);
+
+	private Configuration conf = new Configuration();
+	private Server server;
+
+	private int parties;
+
+	private CyclicBarrier barrier;
+	private CyclicBarrier leaveBarrier;
+	private Set<Integer> partySet;
+	private Set<String> peerAddresses;
+
+	private volatile long superstep = 0L;
+
+	public SyncServerImpl(int parties, String host, int port)
+			throws IOException {
+		this.parties = parties;
+		this.barrier = new CyclicBarrier(parties);
+		this.leaveBarrier = new CyclicBarrier(parties,
+				new SuperStepIncrementor(this));
+
+		this.partySet = Collections.synchronizedSet(new HashSet<Integer>(
+				parties));
+		this.peerAddresses = Collections.synchronizedSet(new HashSet<String>());
+		this.server = RPC.getServer(this, host, port, parties, false, conf);
+		LOG.info("Sync Server is now up at: " + host + ":" + port + "!");
+	}
+
+	public void start() throws IOException {
+		server.start();
+	}
+
+	public void join() throws InterruptedException {
+		server.join();
+	}
+
+	public static SyncServer getService(Configuration conf)
+			throws NumberFormatException, IOException {
+		String syncAddress = conf.get("hama.sync.server.address");
+		if (syncAddress == null || syncAddress.isEmpty()
+				|| !syncAddress.contains(":")) {
+			throw new IllegalArgumentException(
+					"Server sync address must contain a colon and must be non-empty and not-null! Property \"hama.sync.server.address\" was: "
+							+ syncAddress);
+		}
+		String[] hostPort = syncAddress.split(":");
+		return (SyncServer) RPC.waitForProxy(SyncServer.class,
+				SyncServer.versionID, new InetSocketAddress(hostPort[0],
+						Integer.valueOf(hostPort[1])), conf);
+
+	}
+
+	@Override
+	public void enterBarrier(TaskAttemptID id) {
+		LOG.info("Task: " + id.getId() + " entered Barrier!");
+		if (partySet.contains(id.getId())) {
+			try {
+				barrier.await();
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+			} catch (BrokenBarrierException e) {
+				e.printStackTrace();
+			}
+		} else {
+			LOG.warn("TaskID " + id + " is no verified task!");
+		}
+	}
+
+	@Override
+	public void leaveBarrier(TaskAttemptID id) {
+		LOG.info("Task: " + id.getId() + " leaves Barrier!");
+		if (partySet.contains(id.getId())) {
+			try {
+				leaveBarrier.await();
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+			} catch (BrokenBarrierException e) {
+				e.printStackTrace();
+			}
+		} else {
+			LOG.warn("TaskID " + id + " is no verified task!");
+		}
+	}
+
+	@Override
+	public synchronized void register(TaskAttemptID id, Text hostAddress,
+			LongWritable port) {
+		partySet.add(id.getId());
+		String peer = hostAddress.toString() + ":" + port.get();
+		peerAddresses.add(peer);
+		LOG.info("Registered: " + id.getId() + " for peer " + peer);
+		if (partySet.size() > parties) {
+			LOG.warn("Registered more tasks than configured!");
+		}
+	}
+
+	@Override
+	public long getProtocolVersion(String protocol, long clientVersion)
+			throws IOException {
+		return clientVersion;
+	}
+
+	private static class SuperStepIncrementor implements Runnable {
+
+		private final SyncServerImpl instance;
+
+		public SuperStepIncrementor(SyncServerImpl syncServer) {
+			this.instance = syncServer;
+		}
+
+		@Override
+		public void run() {
+			synchronized (instance) {
+				this.instance.superstep += 1L;
+				LOG.info("Entering superstep: " + this.instance.superstep);
+			}
+		}
+
+	}
+
+	public static void main(String[] args) throws IOException,
+			InterruptedException {
+		LOG.info(Arrays.toString(args));
+		if (args.length == 3) {
+			SyncServerImpl syncServer = new SyncServerImpl(
+					Integer.valueOf(args[0]), args[1], Integer.valueOf(args[2]));
+			syncServer.start();
+			syncServer.join();
+		} else {
+			throw new IllegalArgumentException(
+					"Argument count does not match 3! Given size was "
+							+ args.length + " and parameters were "
+							+ Arrays.toString(args));
+		}
+	}
+
+	@Override
+	public synchronized LongWritable getSuperStep() {
+		return new LongWritable(superstep);
+	}
+
+	@Override
+	public synchronized String[] getAllPeerNames() {
+		return peerAddresses.toArray(new String[peerAddresses.size()]);
+	}
+
+	@Override
+	public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress,
+			LongWritable port) {
+		// TODO Auto-generated method stub
+		// basically has to recreate the barriers and remove from the two basic
+		// sets.
+	}
+
+}
\ No newline at end of file

Propchange: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native