You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/10/04 16:24:24 UTC
svn commit: r1178811 - in
/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp:
BSPApplicationMaster.java BSPJob.java BSPJobImpl.java BSPTaskLauncher.java
sync/SyncServer.java sync/SyncServerImpl.java
Author: tjungblut
Date: Tue Oct 4 14:24:24 2011
New Revision: 1178811
URL: http://svn.apache.org/viewvc?rev=1178811&view=rev
Log:
Container allocation and SyncServer integration.
Added:
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (with props)
Modified:
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJob.java
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1178811&r1=1178810&r2=1178811&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Tue Oct 4 14:24:24 2011
@@ -21,12 +21,16 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
@@ -42,6 +46,8 @@ import org.apache.hadoop.yarn.ipc.YarnRP
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncServer;
+import org.apache.hama.bsp.sync.SyncServerImpl;
import org.apache.mina.util.AvailablePortFinder;
/**
@@ -51,6 +57,8 @@ public class BSPApplicationMaster {
private static final Log LOG = LogFactory
.getLog(BSPApplicationMaster.class);
+ private static final ExecutorService threadPool = Executors
+ .newFixedThreadPool(1);
private Configuration localConf;
private Configuration jobConf;
@@ -66,9 +74,16 @@ public class BSPApplicationMaster {
private String userName;
private long startTime;
+ private BSPJob job;
+
+ private SyncServerImpl syncServer;
+ private Future<Long> syncServerFuture;
+
// RPC info where the AM receive client side requests
private String hostname;
- private int port;
+ private int clientPort;
+
+ private RegisterApplicationMasterResponse applicationMasterResponse;
private BSPApplicationMaster(String[] args) throws IOException {
if (args.length != 1) {
@@ -91,16 +106,52 @@ public class BSPApplicationMaster {
startTime = clock.getTime();
// TODO this is not localhost, is it?
- // TODO this address of the client rpc server
- hostname = InetAddress.getLocalHost().getHostName();
- port = getFreePort();
+ hostname = InetAddress.getLocalHost().getCanonicalHostName();
+ startSyncServer();
+ clientPort = getFreePort();
+
+ amrmRPC = getYarnRPCConnection(localConf);
+ applicationMasterResponse = registerApplicationMaster(amrmRPC,
+ appAttemptId, hostname, clientPort, null);
+ }
- amrmRPC = registerWithResourceManager(localConf, appAttemptId,
- hostname, port, null);
+ /**
+ * This method starts the sync server on a specific port and waits for it to
+ * come up. Be aware that this method adds the "bsp.sync.server.address"
+ * that is needed for a task to connect to the service.
+ *
+ * @throws IOException
+ */
+ private void startSyncServer() throws IOException {
+ int syncPort = getFreePort(15000);
+ syncServer = new SyncServerImpl(jobConf.getInt("bsp.peers.num", 1),
+ hostname, syncPort);
+ syncServerFuture = threadPool.submit(syncServer);
+ // wait for the RPC to come up
+ InetSocketAddress syncAddress = NetUtils.createSocketAddr(hostname
+ + ":" + syncPort);
+ LOG.info("Waiting for the Sync Master at " + syncAddress);
+ RPC.waitForProxy(SyncServer.class, SyncServer.versionID, syncAddress,
+ jobConf);
+ jobConf.set("bsp.sync.server.address", hostname + ":" + syncPort);
}
+ /**
+ * Uses Minas AvailablePortFinder to find a port, starting at 14000.
+ *
+ * @return a free port.
+ */
private int getFreePort() {
int startPort = 14000;
+ return getFreePort(startPort);
+ }
+
+ /**
+ * Uses Minas AvailablePortFinder to find a port, starting at startPort.
+ *
+ * @return a free port.
+ */
+ private int getFreePort(int startPort) {
while (!AvailablePortFinder.available(startPort)) {
startPort++;
LOG.debug("Testing port for availability: " + startPort);
@@ -108,17 +159,32 @@ public class BSPApplicationMaster {
return startPort;
}
- private AMRMProtocol registerWithResourceManager(Configuration yarnConf,
- ApplicationAttemptId appAttemptID, String appMasterHostName,
- int appMasterRpcPort, String appMasterTrackingUrl)
- throws YarnRemoteException {
+ /**
+ * Connects to the Resource Manager.
+ *
+ * @param yarnConf
+ * @return a new RPC connection to the Resource Manager.
+ */
+ private AMRMProtocol getYarnRPCConnection(Configuration yarnConf) {
// Connect to the Scheduler of the ResourceManager.
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
LOG.info("Connecting to ResourceManager at " + rmAddress);
- AMRMProtocol resourceManager = (AMRMProtocol) yarnRPC.getProxy(
- AMRMProtocol.class, rmAddress, yarnConf);
+ return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress,
+ yarnConf);
+ }
+
+ /**
+ * Registers this application master with the Resource Manager and retrieves
+ * a response which is used to launch additional containers.
+ *
+ * @throws YarnRemoteException
+ */
+ private RegisterApplicationMasterResponse registerApplicationMaster(
+ AMRMProtocol resourceManager, ApplicationAttemptId appAttemptID,
+ String appMasterHostName, int appMasterRpcPort,
+ String appMasterTrackingUrl) throws YarnRemoteException {
RegisterApplicationMasterRequest appMasterRequest = Records
.newRecord(RegisterApplicationMasterRequest.class);
@@ -127,12 +193,11 @@ public class BSPApplicationMaster {
appMasterRequest.setRpcPort(appMasterRpcPort);
// TODO tracking URL
// appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
-
RegisterApplicationMasterResponse response = resourceManager
.registerApplicationMaster(appMasterRequest);
LOG.debug("ApplicationMaster has maximum resource capability of: "
+ response.getMaximumResourceCapability().getMemory());
- return resourceManager;
+ return response;
}
/**
@@ -159,8 +224,9 @@ public class BSPApplicationMaster {
return jobConf;
}
- private void start() {
-
+ private void start() throws Exception {
+ job = new BSPJobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC);
+ job.startJob();
}
public static void main(String[] args) {
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1178811&r1=1178810&r2=1178811&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJob.java Tue Oct 4 14:24:24 2011
@@ -18,7 +18,8 @@
package org.apache.hama.bsp;
import java.util.Map;
-import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
/**
* Main interface to interact with the job. Provides only getters.
@@ -32,24 +33,18 @@ public interface BSPJob {
public enum BSPPhase{
COMPUTATION, COMMUNICATION
}
-
- BSPJobID getID();
-
- String getName();
+
+ public JobState startJob() throws Exception;
JobState getState();
BSPPhase getBSPPhase();
+ // TODO are the tasks really needed?
Map<TaskAttemptID, Task> getTasks();
Task getTask(TaskAttemptID taskID);
int getTotalBSPTasks();
- /**
- * @return a path to where the config file for this job is located.
- */
- Path getConfFile();
-
}
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java?rev=1178811&r1=1178810&r2=1178811&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java Tue Oct 4 14:24:24 2011
@@ -17,18 +17,166 @@
*/
package org.apache.hama.bsp;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
-
-import org.apache.hadoop.fs.Path;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;
public class BSPJobImpl implements BSPJob {
- private ResourceRequest requestTasks(int numBSPTasks, int memoryInMb,
- int priority) {
+ private static final Log LOG = LogFactory.getLog(BSPJobImpl.class);
+ private static final ExecutorService threadPool = Executors
+ .newCachedThreadPool();
+
+ private static final int DEFAULT_MEMORY_MB = 256;
+
+ private Configuration conf;
+ private int numBSPTasks;
+ private int priority = 0;
+ private String childOpts;
+ private int taskMemoryInMb;
+
+ private JobState state;
+ private BSPPhase phase;
+
+ private ApplicationAttemptId appAttemptId;
+ private YarnRPC yarnRPC;
+ private AMRMProtocol resourceManager;
+
+ private List<Container> allocatedContainers;
+ private List<ContainerId> releasedContainers = Collections.emptyList();
+
+ private ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+ threadPool);
+
+ public BSPJobImpl(ApplicationAttemptId appAttemptId,
+ Configuration jobConfiguration, YarnRPC yarnRPC,
+ AMRMProtocol amrmRPC) {
+ super();
+ this.numBSPTasks = jobConfiguration.getInt("bsp.peers.num", 1);
+ this.appAttemptId = appAttemptId;
+ this.yarnRPC = yarnRPC;
+ this.resourceManager = amrmRPC;
+ this.state = JobState.NEW;
+ this.conf = jobConfiguration;
+ this.childOpts = conf.get("bsp.child.java.opts");
+
+ this.taskMemoryInMb = getMemoryRequirements();
+ LOG.info("Memory per task: " + taskMemoryInMb + "m!");
+ }
+
+ private int getMemoryRequirements() {
+ String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
+ if (newMemoryProperty == null) {
+ LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
+ return getMemoryFromOptString(childOpts);
+ } else {
+ return Integer.valueOf(newMemoryProperty);
+ }
+ }
+
+ // TODO This really needs a testcase
+ private int getMemoryFromOptString(String opts) {
+ if (!opts.contains("-Xmx")) {
+ LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!");
+ return DEFAULT_MEMORY_MB;
+ } else {
+ // e.G: -Xmx512m
+ int startIndex = opts.indexOf("-Xmx") + 4;
+ int endIndex = opts.indexOf(" ", startIndex);
+ String xmxString = opts.substring(startIndex, endIndex);
+ char qualifier = xmxString.charAt(xmxString.length() - 1);
+ int memory = Integer.valueOf(xmxString.substring(0,
+ xmxString.length() - 2));
+ if (qualifier == 'm') {
+ return memory;
+ } else if (qualifier == 'g') {
+ return memory * 1024;
+ } else {
+ throw new IllegalArgumentException(
+ "Memory Limit in child opts was not set! \"bsp.child.java.opts\" String was: "
+ + opts);
+ }
+ }
+ }
+
+ @Override
+ public JobState startJob() throws YarnRemoteException, InterruptedException {
+
+ ResourceRequest request = createBSPTaskRequest(getTotalBSPTasks(),
+ taskMemoryInMb, priority);
+
+ AllocateRequest req = Records.newRecord(AllocateRequest.class);
+ // response id zero because this is our initial allocation
+ req.setResponseId(0);
+ // set ApplicationAttemptId
+ req.setApplicationAttemptId(appAttemptId);
+ // add our task request
+ req.addAsk(request);
+ // always an empty list
+ req.addAllReleases(releasedContainers);
+ // we don't have a real progress, so it is always zero
+ req.setProgress(0);
+
+ AllocateResponse allocateResponse = resourceManager.allocate(req);
+ AMResponse amResponse = allocateResponse.getAMResponse();
+ if (amResponse.getResponseId() == 0) {
+ this.allocatedContainers = amResponse.getAllocatedContainers();
+ } else {
+ LOG.error("Response IDs somehow did not match. Got: "
+ + amResponse.getResponseId()
+ + " where it should be 0 (zero).");
+ return JobState.FAILED;
+ }
+
+ for (Container allocatedContainer : allocatedContainers) {
+ LOG.info("Launching task on a new container." + ", containerId="
+ + allocatedContainer.getId() + ", containerNode="
+ + allocatedContainer.getNodeId().getHost() + ":"
+ + allocatedContainer.getNodeId().getPort()
+ + ", containerNodeURI="
+ + allocatedContainer.getNodeHttpAddress()
+ + ", containerState" + allocatedContainer.getState()
+ + ", containerResourceMemory"
+ + allocatedContainer.getResource().getMemory());
+
+ BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(
+ allocatedContainer, conf, yarnRPC);
+ completionService.submit(runnableLaunchContainer);
+ }
+
+ // TODO numBSPTasks could be wrong if we have to restart a task, use
+ // another field for that
+ for (int i = 0; i < numBSPTasks; i++) {
+ Future<Integer> returnedTask = completionService.take();
+ // TODO cleanup and check the return value
+ }
+
+ return JobState.SUCCESS;
+ }
+
+ private ResourceRequest createBSPTaskRequest(int numBSPTasks,
+ int memoryInMb, int priority) {
// Resource Request
ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
@@ -56,26 +204,13 @@ public class BSPJobImpl implements BSPJo
}
@Override
- public BSPJobID getID() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public String getName() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public JobState getState() {
- // TODO Auto-generated method stub
- return null;
+ return state;
}
@Override
public Map<TaskAttemptID, Task> getTasks() {
- // TODO Auto-generated method stub
+ // TODO
return null;
}
@@ -87,20 +222,12 @@ public class BSPJobImpl implements BSPJo
@Override
public int getTotalBSPTasks() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public Path getConfFile() {
- // TODO Auto-generated method stub
- return null;
+ return numBSPTasks;
}
@Override
public BSPPhase getBSPPhase() {
- // TODO Auto-generated method stub
- return null;
+ return phase;
}
}
Added: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1178811&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (added)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Tue Oct 4 14:24:24 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+
+public class BSPTaskLauncher implements Callable<Integer> {
+
+ private final Container allocatedContainer;
+
+ public BSPTaskLauncher(Container container, Configuration conf, YarnRPC rpc) {
+ this.allocatedContainer = container;
+ // Connect to ContainerManager on the allocated container
+ String cmIpPortStr = container.getNodeId().getHost() + ":"
+ + container.getNodeId().getPort();
+ InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+ ContainerManager cm = (ContainerManager) rpc.getProxy(
+ ContainerManager.class, cmAddress, conf);
+
+ // Now we setup a ContainerLaunchContext
+ ContainerLaunchContext ctx = Records
+ .newRecord(ContainerLaunchContext.class);
+
+ ctx.setContainerId(container.getId());
+ ctx.setResource(container.getResource());
+ // TODO set the commands and stuff
+ }
+
+ @Override
+ public Integer call() throws Exception {
+ // TODO just start the context and return a status for the task, maybe
+ // we have to refactor this to an enum
+ return 0;
+ }
+
+}
Propchange: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java?rev=1178811&r1=1178810&r2=1178811&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java Tue Oct 4 14:24:24 2011
@@ -41,5 +41,7 @@ public interface SyncServer extends Vers
public String[] getAllPeerNames();
public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress, LongWritable port);
+
+ public void stopServer();
}
\ No newline at end of file
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java?rev=1178811&r1=1178810&r2=1178811&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java Tue Oct 4 14:24:24 2011
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import org.apache.commons.logging.Log;
@@ -37,9 +38,8 @@ import org.apache.hama.bsp.TaskAttemptID
/**
* Synchronization Deamon. <br\>
- * TODO Should be launched on the same host like the application master?
*/
-public class SyncServerImpl implements SyncServer {
+public class SyncServerImpl implements SyncServer, Callable<Long> {
private static final Log LOG = LogFactory.getLog(SyncServerImpl.class);
@@ -65,7 +65,10 @@ public class SyncServerImpl implements S
this.partySet = Collections.synchronizedSet(new HashSet<Integer>(
parties));
this.peerAddresses = Collections.synchronizedSet(new HashSet<String>());
- this.server = RPC.getServer(this, host, port, parties, false, conf);
+ // allocate ten more rpc handler than parties for additional services to
+ // plug in or to deal with failure.
+ this.server = RPC
+ .getServer(this, host, port, parties + 10, false, conf);
LOG.info("Sync Server is now up at: " + host + ":" + port + "!");
}
@@ -73,6 +76,11 @@ public class SyncServerImpl implements S
server.start();
}
+ @Override
+ public void stopServer() {
+ server.stop();
+ }
+
public void join() throws InterruptedException {
server.join();
}
@@ -178,6 +186,13 @@ public class SyncServerImpl implements S
}
@Override
+ public Long call() throws Exception {
+ this.start();
+ this.join();
+ return this.superstep;
+ }
+
+ @Override
public synchronized LongWritable getSuperStep() {
return new LongWritable(superstep);
}