You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/09/29 19:31:42 UTC
svn commit: r1177378 - in
/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync:
./ SyncServer.java SyncServerImpl.java
Author: tjungblut
Date: Thu Sep 29 17:31:42 2011
New Revision: 1177378
URL: http://svn.apache.org/viewvc?rev=1177378&view=rev
Log:
Added synchronization service.
Added:
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java (with props)
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java (with props)
Added: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java?rev=1177378&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java (added)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java Thu Sep 29 17:31:42 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Hadoop RPC based barrier synchronization service.
+ *
+ */
+public interface SyncServer extends VersionedProtocol {
+
+ public static final long versionID = 1L;
+
+ public void enterBarrier(TaskAttemptID id);
+
+ public void leaveBarrier(TaskAttemptID id);
+
+ public void register(TaskAttemptID id, Text hostAddress, LongWritable port);
+
+ public LongWritable getSuperStep();
+
+ public String[] getAllPeerNames();
+
+ public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress, LongWritable port);
+
+}
\ No newline at end of file
Propchange: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java?rev=1177378&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java (added)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java Thu Sep 29 17:31:42 2011
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Synchronization Deamon. <br\>
+ * TODO Should be launched on the same host like the application master?
+ */
+public class SyncServerImpl implements SyncServer {
+
+ private static final Log LOG = LogFactory.getLog(SyncServerImpl.class);
+
+ private Configuration conf = new Configuration();
+ private Server server;
+
+ private int parties;
+
+ private CyclicBarrier barrier;
+ private CyclicBarrier leaveBarrier;
+ private Set<Integer> partySet;
+ private Set<String> peerAddresses;
+
+ private volatile long superstep = 0L;
+
+ public SyncServerImpl(int parties, String host, int port)
+ throws IOException {
+ this.parties = parties;
+ this.barrier = new CyclicBarrier(parties);
+ this.leaveBarrier = new CyclicBarrier(parties,
+ new SuperStepIncrementor(this));
+
+ this.partySet = Collections.synchronizedSet(new HashSet<Integer>(
+ parties));
+ this.peerAddresses = Collections.synchronizedSet(new HashSet<String>());
+ this.server = RPC.getServer(this, host, port, parties, false, conf);
+ LOG.info("Sync Server is now up at: " + host + ":" + port + "!");
+ }
+
+ public void start() throws IOException {
+ server.start();
+ }
+
+ public void join() throws InterruptedException {
+ server.join();
+ }
+
+ public static SyncServer getService(Configuration conf)
+ throws NumberFormatException, IOException {
+ String syncAddress = conf.get("hama.sync.server.address");
+ if (syncAddress == null || syncAddress.isEmpty()
+ || !syncAddress.contains(":")) {
+ throw new IllegalArgumentException(
+ "Server sync address must contain a colon and must be non-empty and not-null! Property \"hama.sync.server.address\" was: "
+ + syncAddress);
+ }
+ String[] hostPort = syncAddress.split(":");
+ return (SyncServer) RPC.waitForProxy(SyncServer.class,
+ SyncServer.versionID, new InetSocketAddress(hostPort[0],
+ Integer.valueOf(hostPort[1])), conf);
+
+ }
+
+ @Override
+ public void enterBarrier(TaskAttemptID id) {
+ LOG.info("Task: " + id.getId() + " entered Barrier!");
+ if (partySet.contains(id.getId())) {
+ try {
+ barrier.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ } else {
+ LOG.warn("TaskID " + id + " is no verified task!");
+ }
+ }
+
+ @Override
+ public void leaveBarrier(TaskAttemptID id) {
+ LOG.info("Task: " + id.getId() + " leaves Barrier!");
+ if (partySet.contains(id.getId())) {
+ try {
+ leaveBarrier.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ } else {
+ LOG.warn("TaskID " + id + " is no verified task!");
+ }
+ }
+
+ @Override
+ public synchronized void register(TaskAttemptID id, Text hostAddress,
+ LongWritable port) {
+ partySet.add(id.getId());
+ String peer = hostAddress.toString() + ":" + port.get();
+ peerAddresses.add(peer);
+ LOG.info("Registered: " + id.getId() + " for peer " + peer);
+ if (partySet.size() > parties) {
+ LOG.warn("Registered more tasks than configured!");
+ }
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return clientVersion;
+ }
+
+ private static class SuperStepIncrementor implements Runnable {
+
+ private final SyncServerImpl instance;
+
+ public SuperStepIncrementor(SyncServerImpl syncServer) {
+ this.instance = syncServer;
+ }
+
+ @Override
+ public void run() {
+ synchronized (instance) {
+ this.instance.superstep += 1L;
+ LOG.info("Entering superstep: " + this.instance.superstep);
+ }
+ }
+
+ }
+
+ public static void main(String[] args) throws IOException,
+ InterruptedException {
+ LOG.info(Arrays.toString(args));
+ if (args.length == 3) {
+ SyncServerImpl syncServer = new SyncServerImpl(
+ Integer.valueOf(args[0]), args[1], Integer.valueOf(args[2]));
+ syncServer.start();
+ syncServer.join();
+ } else {
+ throw new IllegalArgumentException(
+ "Argument count does not match 3! Given size was "
+ + args.length + " and parameters were "
+ + Arrays.toString(args));
+ }
+ }
+
+ @Override
+ public synchronized LongWritable getSuperStep() {
+ return new LongWritable(superstep);
+ }
+
+ @Override
+ public synchronized String[] getAllPeerNames() {
+ return peerAddresses.toArray(new String[peerAddresses.size()]);
+ }
+
+ @Override
+ public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress,
+ LongWritable port) {
+ // TODO Auto-generated method stub
+ // basically has to recreate the barriers and remove from the two basic
+ // sets.
+ }
+
+}
\ No newline at end of file
Propchange: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native