You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/10/04 16:24:24 UTC

svn commit: r1178811 - in /incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp: BSPApplicationMaster.java BSPJob.java BSPJobImpl.java BSPTaskLauncher.java sync/SyncServer.java sync/SyncServerImpl.java

Author: tjungblut
Date: Tue Oct  4 14:24:24 2011
New Revision: 1178811

URL: http://svn.apache.org/viewvc?rev=1178811&view=rev
Log:
Container allocation and SyncServer integration.


Added:
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java   (with props)
Modified:
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1178811&r1=1178810&r2=1178811&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Tue Oct  4 14:24:24 2011
@@ -21,12 +21,16 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
@@ -42,6 +46,8 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncServer;
+import org.apache.hama.bsp.sync.SyncServerImpl;
 import org.apache.mina.util.AvailablePortFinder;
 
 /**
@@ -51,6 +57,8 @@ public class BSPApplicationMaster {
 
 	private static final Log LOG = LogFactory
 			.getLog(BSPApplicationMaster.class);
+	private static final ExecutorService threadPool = Executors
+			.newFixedThreadPool(1);
 
 	private Configuration localConf;
 	private Configuration jobConf;
@@ -66,9 +74,16 @@ public class BSPApplicationMaster {
 	private String userName;
 	private long startTime;
 
+	private BSPJob job;
+
+	private SyncServerImpl syncServer;
+	private Future<Long> syncServerFuture;
+
 	// RPC info where the AM receive client side requests
 	private String hostname;
-	private int port;
+	private int clientPort;
+
+	private RegisterApplicationMasterResponse applicationMasterResponse;
 
 	private BSPApplicationMaster(String[] args) throws IOException {
 		if (args.length != 1) {
@@ -91,16 +106,52 @@ public class BSPApplicationMaster {
 		startTime = clock.getTime();
 
 		// TODO this is not localhost, is it?
-		// TODO this address of the client rpc server
-		hostname = InetAddress.getLocalHost().getHostName();
-		port = getFreePort();
+		hostname = InetAddress.getLocalHost().getCanonicalHostName();
+		startSyncServer();
+		clientPort = getFreePort();
+
+		amrmRPC = getYarnRPCConnection(localConf);
+		applicationMasterResponse = registerApplicationMaster(amrmRPC,
+				appAttemptId, hostname, clientPort, null);
+	}
 
-		amrmRPC = registerWithResourceManager(localConf, appAttemptId,
-				hostname, port, null);
+	/**
+	 * This method starts the sync server on a specific port and waits for it to
+	 * come up. Be aware that this method adds the "bsp.sync.server.address"
+	 * that is needed for a task to connect to the service.
+	 * 
+	 * @throws IOException
+	 */
+	private void startSyncServer() throws IOException {
+		int syncPort = getFreePort(15000);
+		syncServer = new SyncServerImpl(jobConf.getInt("bsp.peers.num", 1),
+				hostname, syncPort);
+		syncServerFuture = threadPool.submit(syncServer);
+		// wait for the RPC to come up
+		InetSocketAddress syncAddress = NetUtils.createSocketAddr(hostname
+				+ ":" + syncPort);
+		LOG.info("Waiting for the Sync Master at " + syncAddress);
+		RPC.waitForProxy(SyncServer.class, SyncServer.versionID, syncAddress,
+				jobConf);
+		jobConf.set("bsp.sync.server.address", hostname + ":" + syncPort);
 	}
 
+	/**
+	 * Uses Minas AvailablePortFinder to find a port, starting at 14000.
+	 * 
+	 * @return a free port.
+	 */
 	private int getFreePort() {
 		int startPort = 14000;
+		return getFreePort(startPort);
+	}
+
+	/**
+	 * Uses Minas AvailablePortFinder to find a port, starting at startPort.
+	 * 
+	 * @return a free port.
+	 */
+	private int getFreePort(int startPort) {
 		while (!AvailablePortFinder.available(startPort)) {
 			startPort++;
 			LOG.debug("Testing port for availability: " + startPort);
@@ -108,17 +159,32 @@ public class BSPApplicationMaster {
 		return startPort;
 	}
 
-	private AMRMProtocol registerWithResourceManager(Configuration yarnConf,
-			ApplicationAttemptId appAttemptID, String appMasterHostName,
-			int appMasterRpcPort, String appMasterTrackingUrl)
-			throws YarnRemoteException {
+	/**
+	 * Connects to the Resource Manager.
+	 * 
+	 * @param yarnConf
+	 * @return a new RPC connection to the Resource Manager.
+	 */
+	private AMRMProtocol getYarnRPCConnection(Configuration yarnConf) {
 		// Connect to the Scheduler of the ResourceManager.
 		InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
 				YarnConfiguration.RM_SCHEDULER_ADDRESS,
 				YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
 		LOG.info("Connecting to ResourceManager at " + rmAddress);
-		AMRMProtocol resourceManager = (AMRMProtocol) yarnRPC.getProxy(
-				AMRMProtocol.class, rmAddress, yarnConf);
+		return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress,
+				yarnConf);
+	}
+
+	/**
+	 * Registers this application master with the Resource Manager and retrieves
+	 * a response which is used to launch additional containers.
+	 * 
+	 * @throws YarnRemoteException
+	 */
+	private RegisterApplicationMasterResponse registerApplicationMaster(
+			AMRMProtocol resourceManager, ApplicationAttemptId appAttemptID,
+			String appMasterHostName, int appMasterRpcPort,
+			String appMasterTrackingUrl) throws YarnRemoteException {
 
 		RegisterApplicationMasterRequest appMasterRequest = Records
 				.newRecord(RegisterApplicationMasterRequest.class);
@@ -127,12 +193,11 @@ public class BSPApplicationMaster {
 		appMasterRequest.setRpcPort(appMasterRpcPort);
 		// TODO tracking URL
 		// appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
-
 		RegisterApplicationMasterResponse response = resourceManager
 				.registerApplicationMaster(appMasterRequest);
 		LOG.debug("ApplicationMaster has maximum resource capability of: "
 				+ response.getMaximumResourceCapability().getMemory());
-		return resourceManager;
+		return response;
 	}
 
 	/**
@@ -159,8 +224,9 @@ public class BSPApplicationMaster {
 		return jobConf;
 	}
 
-	private void start() {
-
+	private void start() throws Exception {
+		job = new BSPJobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC);
+		job.startJob();
 	}
 
 	public static void main(String[] args) {

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1178811&r1=1178810&r2=1178811&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJob.java Tue Oct  4 14:24:24 2011
@@ -18,7 +18,8 @@
 package org.apache.hama.bsp;
 
 import java.util.Map;
-import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 
 /**
  * Main interface to interact with the job. Provides only getters.
@@ -32,24 +33,18 @@ public interface BSPJob {
 	public enum BSPPhase{
 		COMPUTATION, COMMUNICATION 
 	}
-
-	BSPJobID getID();
-
-	String getName();
+	
+	public JobState startJob() throws Exception;
 
 	JobState getState();
 	
 	BSPPhase getBSPPhase();
 
+	// TODO are the tasks really needed?
 	Map<TaskAttemptID, Task> getTasks();
 
 	Task getTask(TaskAttemptID taskID);
 
 	int getTotalBSPTasks();
 
-	/**
-	 * @return a path to where the config file for this job is located.
-	 */
-	Path getConfFile();
-
 }

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java?rev=1178811&r1=1178810&r2=1178811&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java Tue Oct  4 14:24:24 2011
@@ -17,18 +17,166 @@
  */
 package org.apache.hama.bsp;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
-
-import org.apache.hadoop.fs.Path;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.Records;
 
 public class BSPJobImpl implements BSPJob {
 
-	private ResourceRequest requestTasks(int numBSPTasks, int memoryInMb,
-			int priority) {
+	private static final Log LOG = LogFactory.getLog(BSPJobImpl.class);
+	private static final ExecutorService threadPool = Executors
+			.newCachedThreadPool();
+
+	private static final int DEFAULT_MEMORY_MB = 256;
+
+	private Configuration conf;
+	private int numBSPTasks;
+	private int priority = 0;
+	private String childOpts;
+	private int taskMemoryInMb;
+
+	private JobState state;
+	private BSPPhase phase;
+
+	private ApplicationAttemptId appAttemptId;
+	private YarnRPC yarnRPC;
+	private AMRMProtocol resourceManager;
+
+	private List<Container> allocatedContainers;
+	private List<ContainerId> releasedContainers = Collections.emptyList();
+
+	private ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+			threadPool);
+
+	public BSPJobImpl(ApplicationAttemptId appAttemptId,
+			Configuration jobConfiguration, YarnRPC yarnRPC,
+			AMRMProtocol amrmRPC) {
+		super();
+		this.numBSPTasks = jobConfiguration.getInt("bsp.peers.num", 1);
+		this.appAttemptId = appAttemptId;
+		this.yarnRPC = yarnRPC;
+		this.resourceManager = amrmRPC;
+		this.state = JobState.NEW;
+		this.conf = jobConfiguration;
+		this.childOpts = conf.get("bsp.child.java.opts");
+
+		this.taskMemoryInMb = getMemoryRequirements();
+		LOG.info("Memory per task: " + taskMemoryInMb + "m!");
+	}
+
+	private int getMemoryRequirements() {
+		String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
+		if (newMemoryProperty == null) {
+			LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
+			return getMemoryFromOptString(childOpts);
+		} else {
+			return Integer.valueOf(newMemoryProperty);
+		}
+	}
+
+	// TODO This really needs a testcase
+	private int getMemoryFromOptString(String opts) {
+		if (!opts.contains("-Xmx")) {
+			LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!");
+			return DEFAULT_MEMORY_MB;
+		} else {
+			// e.G: -Xmx512m
+			int startIndex = opts.indexOf("-Xmx") + 4;
+			int endIndex = opts.indexOf(" ", startIndex);
+			String xmxString = opts.substring(startIndex, endIndex);
+			char qualifier = xmxString.charAt(xmxString.length() - 1);
+			int memory = Integer.valueOf(xmxString.substring(0,
+					xmxString.length() - 2));
+			if (qualifier == 'm') {
+				return memory;
+			} else if (qualifier == 'g') {
+				return memory * 1024;
+			} else {
+				throw new IllegalArgumentException(
+						"Memory Limit in child opts was not set! \"bsp.child.java.opts\" String was: "
+								+ opts);
+			}
+		}
+	}
+
+	@Override
+	public JobState startJob() throws YarnRemoteException, InterruptedException {
+
+		ResourceRequest request = createBSPTaskRequest(getTotalBSPTasks(),
+				taskMemoryInMb, priority);
+
+		AllocateRequest req = Records.newRecord(AllocateRequest.class);
+		// response id zero because this is our initial allocation
+		req.setResponseId(0);
+		// set ApplicationAttemptId
+		req.setApplicationAttemptId(appAttemptId);
+		// add our task request
+		req.addAsk(request);
+		// always an empty list
+		req.addAllReleases(releasedContainers);
+		// we don't have a real progress, so it is always zero
+		req.setProgress(0);
+
+		AllocateResponse allocateResponse = resourceManager.allocate(req);
+		AMResponse amResponse = allocateResponse.getAMResponse();
+		if (amResponse.getResponseId() == 0) {
+			this.allocatedContainers = amResponse.getAllocatedContainers();
+		} else {
+			LOG.error("Response IDs somehow did not match. Got: "
+					+ amResponse.getResponseId()
+					+ " where it should be 0 (zero).");
+			return JobState.FAILED;
+		}
+
+		for (Container allocatedContainer : allocatedContainers) {
+			LOG.info("Launching task on a new container." + ", containerId="
+					+ allocatedContainer.getId() + ", containerNode="
+					+ allocatedContainer.getNodeId().getHost() + ":"
+					+ allocatedContainer.getNodeId().getPort()
+					+ ", containerNodeURI="
+					+ allocatedContainer.getNodeHttpAddress()
+					+ ", containerState" + allocatedContainer.getState()
+					+ ", containerResourceMemory"
+					+ allocatedContainer.getResource().getMemory());
+
+			BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(
+					allocatedContainer, conf, yarnRPC);
+			completionService.submit(runnableLaunchContainer);
+		}
+
+		// TODO numBSPTasks could be wrong if we have to restart a task, use
+		// another field for that
+		for (int i = 0; i < numBSPTasks; i++) {
+			Future<Integer> returnedTask = completionService.take();
+			// TODO cleanup and check the return value
+		}
+
+		return JobState.SUCCESS;
+	}
+
+	private ResourceRequest createBSPTaskRequest(int numBSPTasks,
+			int memoryInMb, int priority) {
 		// Resource Request
 		ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
 
@@ -56,26 +204,13 @@ public class BSPJobImpl implements BSPJo
 	}
 
 	@Override
-	public BSPJobID getID() {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public String getName() {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
 	public JobState getState() {
-		// TODO Auto-generated method stub
-		return null;
+		return state;
 	}
 
 	@Override
 	public Map<TaskAttemptID, Task> getTasks() {
-		// TODO Auto-generated method stub
+		// TODO
 		return null;
 	}
 
@@ -87,20 +222,12 @@ public class BSPJobImpl implements BSPJo
 
 	@Override
 	public int getTotalBSPTasks() {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public Path getConfFile() {
-		// TODO Auto-generated method stub
-		return null;
+		return numBSPTasks;
 	}
 
 	@Override
 	public BSPPhase getBSPPhase() {
-		// TODO Auto-generated method stub
-		return null;
+		return phase;
 	}
 
 }

Added: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1178811&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (added)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Tue Oct  4 14:24:24 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+
+public class BSPTaskLauncher implements Callable<Integer> {
+
+	private final Container allocatedContainer;
+
+	public BSPTaskLauncher(Container container, Configuration conf, YarnRPC rpc) {
+		this.allocatedContainer = container;
+		// Connect to ContainerManager on the allocated container
+		String cmIpPortStr = container.getNodeId().getHost() + ":"
+				+ container.getNodeId().getPort();
+		InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+		ContainerManager cm = (ContainerManager) rpc.getProxy(
+				ContainerManager.class, cmAddress, conf);
+
+		// Now we setup a ContainerLaunchContext
+		ContainerLaunchContext ctx = Records
+				.newRecord(ContainerLaunchContext.class);
+
+		ctx.setContainerId(container.getId());
+		ctx.setResource(container.getResource());
+		// TODO set the commands and stuff
+	}
+
+	@Override
+	public Integer call() throws Exception {
+		// TODO just start the context and return a status for the task, maybe
+		// we have to refactor this to an enum
+		return 0;
+	}
+
+}

Propchange: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java?rev=1178811&r1=1178810&r2=1178811&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java Tue Oct  4 14:24:24 2011
@@ -41,5 +41,7 @@ public interface SyncServer extends Vers
 	public String[] getAllPeerNames();
 	
 	public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress, LongWritable port);
+	
+	public void stopServer();
 
 }
\ No newline at end of file

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java?rev=1178811&r1=1178810&r2=1178811&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java Tue Oct  4 14:24:24 2011
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
@@ -37,9 +38,8 @@ import org.apache.hama.bsp.TaskAttemptID
 
 /**
  * Synchronization Deamon. <br\>
- * TODO Should be launched on the same host like the application master?
  */
-public class SyncServerImpl implements SyncServer {
+public class SyncServerImpl implements SyncServer, Callable<Long> {
 
 	private static final Log LOG = LogFactory.getLog(SyncServerImpl.class);
 
@@ -65,7 +65,10 @@ public class SyncServerImpl implements S
 		this.partySet = Collections.synchronizedSet(new HashSet<Integer>(
 				parties));
 		this.peerAddresses = Collections.synchronizedSet(new HashSet<String>());
-		this.server = RPC.getServer(this, host, port, parties, false, conf);
+		// allocate ten more rpc handler than parties for additional services to
+		// plug in or to deal with failure.
+		this.server = RPC
+				.getServer(this, host, port, parties + 10, false, conf);
 		LOG.info("Sync Server is now up at: " + host + ":" + port + "!");
 	}
 
@@ -73,6 +76,11 @@ public class SyncServerImpl implements S
 		server.start();
 	}
 
+	@Override
+	public void stopServer() {
+		server.stop();
+	}
+
 	public void join() throws InterruptedException {
 		server.join();
 	}
@@ -178,6 +186,13 @@ public class SyncServerImpl implements S
 	}
 
 	@Override
+	public Long call() throws Exception {
+		this.start();
+		this.join();
+		return this.superstep;
+	}
+
+	@Override
 	public synchronized LongWritable getSuperStep() {
 		return new LongWritable(superstep);
 	}