You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/10/14 15:29:12 UTC

svn commit: r1183352 [2/2] - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/checkpoint/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/checkpoint/ yarn/ yarn/src/ yarn...

Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,488 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.sync.SyncServer;
+import org.apache.hama.bsp.sync.SyncServerImpl;
+import org.apache.hama.checkpoint.CheckpointRunner;
+
+/**
+ * BSP peer implementation used when running on YARN.
+ * <p>
+ * Messages addressed to this peer are queued locally; messages for other peers
+ * are buffered per destination address and delivered over Hadoop RPC during
+ * {@link #sync()}. Barrier synchronization is delegated to the
+ * {@link SyncServer} obtained from {@link SyncServerImpl#getService}.
+ */
+public class YARNBSPPeerImpl implements BSPPeer {
+
+  public static final Log LOG = LogFactory.getLog(YARNBSPPeerImpl.class);
+
+  /** Maximum number of attempts to connect to the checkpoint receiver. */
+  private static final int CHECKPOINT_CONNECT_ATTEMPTS = 10;
+  /** Pause between two failed connection attempts, in milliseconds. */
+  private static final long CHECKPOINT_CONNECT_RETRY_MS = 1000L;
+
+  private final Configuration conf;
+
+  private volatile Server server = null;
+
+  // RPC proxies to remote peers, keyed by peer address.
+  private final Map<InetSocketAddress, BSPPeer> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeer>();
+  // Outgoing messages buffered per destination until the next sync().
+  private final ConcurrentMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
+  // Messages readable during the current superstep.
+  private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
+  // Messages received during this superstep; swapped in by sync().
+  private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+  // Cache of parsed "host:port" peer names.
+  private final Map<String, InetSocketAddress> peerSocketCache = new ConcurrentHashMap<String, InetSocketAddress>();
+
+  private InetSocketAddress peerAddress;
+  private TaskStatus currentTaskStatus;
+
+  private TaskAttemptID taskid;
+  private SyncServer syncService;
+  // Non-null only when "bsp.checkpoint.enabled" is true.
+  private final BSPMessageSerializer messageSerializer;
+
+  /**
+   * A message bundle paired with the path it is checkpointed under. Wire
+   * format: the path as modified UTF-8, followed by the serialized bundle.
+   */
+  public static final class BSPSerializableMessage implements Writable {
+    final AtomicReference<String> path = new AtomicReference<String>();
+    final AtomicReference<BSPMessageBundle> bundle = new AtomicReference<BSPMessageBundle>();
+
+    /** Creates an empty instance to be filled by {@link #readFields}. */
+    public BSPSerializableMessage() {
+    }
+
+    /**
+     * @param path checkpoint path; must not be null.
+     * @param bundle messages to checkpoint; must not be null.
+     * @throws NullPointerException if either argument is null.
+     */
+    public BSPSerializableMessage(final String path,
+        final BSPMessageBundle bundle) {
+      if (null == path)
+        throw new NullPointerException("No path provided for checkpointing.");
+      if (null == bundle)
+        throw new NullPointerException("No data provided for checkpointing.");
+      this.path.set(path);
+      this.bundle.set(bundle);
+    }
+
+    public final String checkpointedPath() {
+      return this.path.get();
+    }
+
+    public final BSPMessageBundle messageBundle() {
+      return this.bundle.get();
+    }
+
+    @Override
+    public final void write(DataOutput out) throws IOException {
+      out.writeUTF(this.path.get());
+      this.bundle.get().write(out);
+    }
+
+    @Override
+    public final void readFields(DataInput in) throws IOException {
+      this.path.set(in.readUTF());
+      BSPMessageBundle pack = new BSPMessageBundle();
+      pack.readFields(in);
+      this.bundle.set(pack);
+    }
+
+  }// serializable message
+
+  /**
+   * Streams checkpointed message bundles over a TCP socket to a receiver on
+   * localhost. Writes run asynchronously on a scheduled thread pool.
+   */
+  final class BSPMessageSerializer {
+    final Socket client;
+    final ScheduledExecutorService sched;
+    // Single stream shared by all write tasks; created on first use.
+    private DataOutputStream out;
+
+    /**
+     * Connects to the checkpoint receiver on localhost, retrying up to
+     * {@value #CHECKPOINT_CONNECT_ATTEMPTS} times.
+     *
+     * @param port the local port of the checkpoint receiver.
+     * @throws NullPointerException if no connection could be established.
+     */
+    public BSPMessageSerializer(final int port) {
+      Socket tmp = null;
+      for (int attempt = 1; null == tmp
+          && attempt <= CHECKPOINT_CONNECT_ATTEMPTS; attempt++) {
+        tmp = init(port);
+        // Only wait between failed attempts, never after success.
+        if (null == tmp && attempt < CHECKPOINT_CONNECT_ATTEMPTS) {
+          try {
+            Thread.sleep(CHECKPOINT_CONNECT_RETRY_MS);
+          } catch (InterruptedException ie) {
+            LOG.warn("Thread is interrupted.", ie);
+            Thread.currentThread().interrupt();
+            break;
+          }
+        }
+      }
+      this.client = tmp;
+      if (null == this.client)
+        throw new NullPointerException("Client socket is null.");
+      this.sched = Executors.newScheduledThreadPool(conf.getInt(
+          "bsp.checkpoint.serializer_thread", 10));
+      LOG.info(BSPMessageSerializer.class.getName()
+          + " is ready to serialize message.");
+    }
+
+    /** Attempts one connection; returns null (after logging) on failure. */
+    private Socket init(final int port) {
+      Socket tmp = null;
+      try {
+        tmp = new Socket("localhost", port);
+      } catch (UnknownHostException uhe) {
+        LOG.error("Unable to connect to BSPMessageDeserializer.", uhe);
+      } catch (IOException ioe) {
+        LOG.warn("Fail to create socket.", ioe);
+      }
+      return tmp;
+    }
+
+    /** Lazily wraps the socket's output stream exactly once. */
+    private synchronized DataOutputStream outputStream() throws IOException {
+      if (null == this.out)
+        this.out = new DataOutputStream(client.getOutputStream());
+      return this.out;
+    }
+
+    /**
+     * Schedules an asynchronous write of the given message to the socket.
+     *
+     * @throws IOException if the socket's output stream cannot be obtained.
+     */
+    void serialize(final BSPSerializableMessage tmp) throws IOException {
+      if (LOG.isDebugEnabled())
+        LOG.debug("Messages are saved to " + tmp.checkpointedPath());
+      final DataOutputStream stream = outputStream();
+      this.sched.schedule(new Callable<Object>() {
+        public Object call() throws Exception {
+          // Several pool threads share one socket: write each message as a
+          // unit so concurrent tasks cannot interleave their bytes.
+          synchronized (stream) {
+            try {
+              tmp.write(stream);
+              stream.flush();
+            } catch (IOException ioe) {
+              // The returned future is discarded, so log here or lose it.
+              LOG.error("Fail to serialize messages to "
+                  + tmp.checkpointedPath(), ioe);
+              throw ioe;
+            }
+          }
+          return null;
+        }
+      }, 0, SECONDS);
+    }
+
+    /** Stops the write pool and closes the socket; close errors are logged. */
+    public void close() {
+      // Shut the pool down even if closing the socket fails below.
+      this.sched.shutdown();
+      try {
+        this.client.close();
+      } catch (IOException io) {
+        LOG.error("Fail to close client socket.", io);
+      }
+    }
+
+  }// message serializer
+
+  /**
+   * BSPPeer Constructor.
+   * 
+   * BSPPeer acts on behalf of clients performing bsp() tasks.
+   * 
+   * @param conf is the configuration file containing bsp peer host, port, etc.
+   * @param taskid is the id that current process holds.
+   * @throws IOException if the sync service cannot be reached.
+   */
+  public YARNBSPPeerImpl(Configuration conf, TaskAttemptID taskid)
+      throws IOException {
+    this.conf = conf;
+    this.taskid = taskid;
+
+    String bindAddress = conf.get(Constants.PEER_HOST,
+        Constants.DEFAULT_PEER_HOST);
+    int bindPort = conf
+        .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+    peerAddress = new InetSocketAddress(bindAddress, bindPort);
+    BSPMessageSerializer msgSerializer = null;
+    if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
+      msgSerializer = new BSPMessageSerializer(conf.getInt(
+          "bsp.checkpoint.port",
+          Integer.parseInt(CheckpointRunner.DEFAULT_PORT)));
+    }
+    this.messageSerializer = msgSerializer;
+
+    syncService = SyncServerImpl.getService(conf);
+    syncService.register(taskid, new Text(peerAddress.getHostName()),
+        new LongWritable(peerAddress.getPort()));
+    currentTaskStatus = new TaskStatus();
+  }
+
+  /**
+   * Starts this peer's RPC server and re-registers with the sync service.
+   * Failures are logged rather than propagated.
+   */
+  public void reinitialize() {
+    try {
+      if (LOG.isDebugEnabled())
+        LOG.debug("reinitialize(): " + getPeerName());
+      this.server = RPC.getServer(this, peerAddress.getHostName(),
+          peerAddress.getPort(), conf);
+      server.start();
+      LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
+          + peerAddress.getPort());
+      syncService = SyncServerImpl.getService(conf);
+      syncService.register(taskid, new Text(peerAddress.getHostName()),
+          new LongWritable(peerAddress.getPort()));
+    } catch (IOException e) {
+      LOG.error("Fail to start RPC server!", e);
+    }
+  }
+
+  /**
+   * @return the next message of the current superstep, or null if none remain.
+   */
+  @Override
+  public BSPMessage getCurrentMessage() throws IOException {
+    return localQueue.poll();
+  }
+
+  /**
+   * Queues a message for the given peer ("host:port"). Messages to this peer
+   * itself become readable after the next sync(); others are delivered during
+   * the next sync().
+   *
+   * @throws ArrayIndexOutOfBoundsException if peerName is not "host:port".
+   */
+  @Override
+  public void send(String peerName, BSPMessage msg) throws IOException {
+    if (peerName.equals(getPeerName())) {
+      if (LOG.isDebugEnabled())
+        LOG.debug("Local send bytes (" + msg.getData().toString() + ")");
+      localQueueForNextIteration.add(msg);
+      return;
+    }
+
+    if (LOG.isDebugEnabled())
+      LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
+    // Resolve (and cache) the target peer's socket address.
+    InetSocketAddress targetPeerAddress = peerSocketCache.get(peerName);
+    if (targetPeerAddress == null) {
+      targetPeerAddress = getAddress(peerName);
+      peerSocketCache.put(peerName, targetPeerAddress);
+    }
+    // Atomically create the per-destination queue so that concurrent senders
+    // never replace each other's queue and drop messages.
+    ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues
+        .get(targetPeerAddress);
+    if (queue == null) {
+      ConcurrentLinkedQueue<BSPMessage> fresh = new ConcurrentLinkedQueue<BSPMessage>();
+      queue = outgoingQueues.putIfAbsent(targetPeerAddress, fresh);
+      if (queue == null) {
+        queue = fresh;
+      }
+    }
+    queue.add(msg);
+  }
+
+  // TODO not working properly! Currently returns only the configured prefix,
+  // without job id, superstep or task id.
+  private String checkpointedPath() {
+    String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
+    // String ckptPath = backup + jobConf.getJobID().toString() + "/"
+    // + getSuperstepCount() + "/" + this.taskid.toString();
+    // if (LOG.isDebugEnabled())
+    // LOG.debug("Messages are to be saved to " + ckptPath);
+    return backup;
+  }
+
+  /**
+   * Enters the barrier, delivers all buffered outgoing messages (checkpointing
+   * them first when enabled), leaves the barrier, increments the superstep
+   * count and makes the messages received during this superstep readable.
+   *
+   * @throws IOException if a destination peer cannot be reached.
+   */
+  @Override
+  public void sync() throws IOException, InterruptedException {
+    enterBarrier();
+    for (Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry : this.outgoingQueues
+        .entrySet()) {
+      InetSocketAddress target = entry.getKey();
+      BSPPeer peer = peers.get(target);
+      if (peer == null) {
+        peer = getBSPPeerConnection(target);
+      }
+      if (peer == null) {
+        LOG.error(taskid + ": " + target.getHostName() + " doesn't exists.");
+        throw new IOException(taskid + ": unable to connect to peer "
+            + target.getHostName() + ":" + target.getPort());
+      }
+
+      BSPMessageBundle bundle = new BSPMessageBundle();
+      for (BSPMessage message : entry.getValue()) {
+        bundle.addMessage(message);
+      }
+
+      // checkpointing
+      if (null != this.messageSerializer) {
+        this.messageSerializer.serialize(new BSPSerializableMessage(
+            checkpointedPath(), bundle));
+      }
+
+      peer.put(bundle);
+    }
+
+    leaveBarrier();
+    currentTaskStatus.incrementSuperstepCount();
+
+    // Clear outgoing queues.
+    clearOutgoingQueues();
+
+    // Carry unread messages of this superstep over to the next one.
+    BSPMessage message;
+    while ((message = localQueue.poll()) != null) {
+      localQueueForNextIteration.add(message);
+    }
+    // Switch local queues.
+    localQueue = localQueueForNextIteration;
+    localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+  }
+
+  /** Blocks on the sync service's enter barrier for this task. */
+  protected boolean enterBarrier() throws InterruptedException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("[" + getPeerName() + "] enter the enterbarrier: "
+          + this.getSuperstepCount());
+    }
+
+    syncService.enterBarrier(taskid);
+    return true;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /** Blocks on the sync service's leave barrier for this task. */
+  protected boolean leaveBarrier() throws InterruptedException {
+    syncService.leaveBarrier(taskid);
+    return true;
+  }
+
+  /**
+   * Discards unread messages of the current superstep and all buffered
+   * outgoing messages. Messages already queued for the next superstep are
+   * kept.
+   */
+  public void clear() {
+    this.localQueue.clear();
+    this.outgoingQueues.clear();
+  }
+
+  /**
+   * Clears queues, deregisters from the sync service, stops the RPC server and
+   * closes the checkpoint serializer if present.
+   */
+  @Override
+  public void close() throws IOException {
+    this.clear();
+    syncService.deregisterFromBarrier(taskid,
+        new Text(this.peerAddress.getHostName()), new LongWritable(
+            this.peerAddress.getPort()));
+    if (server != null)
+      server.stop();
+    if (null != messageSerializer)
+      this.messageSerializer.close();
+  }
+
+  /** RPC entry point: queues a single message for the next superstep. */
+  @Override
+  public void put(BSPMessage msg) throws IOException {
+    this.localQueueForNextIteration.add(msg);
+  }
+
+  /** RPC entry point: queues all messages of a bundle for the next superstep. */
+  @Override
+  public void put(BSPMessageBundle messages) throws IOException {
+    for (BSPMessage message : messages.getMessages()) {
+      this.localQueueForNextIteration.add(message);
+    }
+  }
+
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return BSPPeer.versionID;
+  }
+
+  /**
+   * Returns a cached RPC proxy for the given peer, creating one if needed.
+   * Failed proxy creation is logged and not cached, so a later call retries.
+   *
+   * @return the proxy, or null if it could not be created.
+   */
+  protected BSPPeer getBSPPeerConnection(InetSocketAddress addr)
+      throws NullPointerException {
+    synchronized (this.peers) {
+      BSPPeer peer = peers.get(addr);
+      if (peer == null) {
+        try {
+          peer = (BSPPeer) RPC.getProxy(BSPPeer.class, BSPPeer.versionID, addr,
+              this.conf);
+        } catch (IOException e) {
+          LOG.error("Unable to create RPC proxy for peer " + addr, e);
+        }
+        // ConcurrentHashMap rejects null values; only cache real proxies.
+        if (peer != null) {
+          this.peers.put(addr, peer);
+        }
+      }
+      return peer;
+    }
+  }
+
+  /**
+   * @return the string as host:port of this Peer
+   */
+  public String getPeerName() {
+    return peerAddress.getHostName() + ":" + peerAddress.getPort();
+  }
+
+  /**
+   * Parses a "host:port" peer name.
+   *
+   * @throws ArrayIndexOutOfBoundsException if the name has not exactly one ':'.
+   * @throws NumberFormatException if the port is not an integer.
+   */
+  private InetSocketAddress getAddress(String peerName) {
+    String[] peerAddrParts = peerName.split(":");
+    if (peerAddrParts.length != 2) {
+      throw new ArrayIndexOutOfBoundsException(
+          "Peername must consist of exactly ONE \":\"! Given peername was: "
+              + peerName);
+    }
+    return new InetSocketAddress(peerAddrParts[0],
+        Integer.parseInt(peerAddrParts[1]));
+  }
+
+  /** @return all peer names as reported by the sync service. */
+  @Override
+  public String[] getAllPeerNames() {
+    return syncService.getAllPeerNames().get();
+  }
+
+  /**
+   * @return the number of unread messages in the current superstep
+   */
+  public int getNumCurrentMessages() {
+    return localQueue.size();
+  }
+
+  /**
+   * Sets the current status
+   * 
+   * @param currentTaskStatus
+   */
+  public void setCurrentTaskStatus(TaskStatus currentTaskStatus) {
+    this.currentTaskStatus = currentTaskStatus;
+  }
+
+  /**
+   * @return the count of current super-step
+   */
+  public long getSuperstepCount() {
+    return currentTaskStatus.getSuperstepCount();
+  }
+
+  /**
+   * @return the size of local queue
+   */
+  public int getLocalQueueSize() {
+    return localQueue.size();
+  }
+
+  /**
+   * @return the sync service
+   */
+  public SyncServer getSyncService() {
+    return syncService;
+  }
+
+  /**
+   * @return the number of destination peers with buffered outgoing messages
+   *         (not the total number of messages)
+   */
+  public int getOutgoingQueueSize() {
+    return outgoingQueues.size();
+  }
+
+  /**
+   * Clears local queue
+   */
+  public void clearLocalQueue() {
+    this.localQueue.clear();
+  }
+
+  /**
+   * Clears outgoing queues
+   */
+  public void clearOutgoingQueues() {
+    this.outgoingQueues.clear();
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    // TODO Auto-generated method stub
+    return new ProtocolSignature();
+  }
+}

Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaConfiguration;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Example YARN job: in each superstep exactly one peer (in the order returned
+ * by {@code getAllPeerNames()}) logs a greeting, followed by a barrier sync.
+ */
+public class YarnSerializePrinting {
+
+  /** BSP program that logs one greeting per superstep. */
+  public static class HelloBSP extends BSP {
+    public static final Log LOG = LogFactory.getLog(HelloBSP.class);
+    private Configuration conf;
+    private final static int PRINT_INTERVAL = 1000;
+    private int num;
+
+    @Override
+    public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+        InterruptedException {
+      num = conf.getInt("bsp.peers.num", 0);
+      // Fetch once: the peer list and our own name do not change per loop.
+      String[] allPeers = bspPeer.getAllPeerNames();
+      String peerName = bspPeer.getPeerName();
+      // Logging the array itself would only print its identity hash.
+      LOG.info(java.util.Arrays.toString(allPeers));
+      int i = 0;
+      for (String otherPeer : allPeers) {
+        if (peerName.equals(otherPeer)) {
+          LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": " + peerName);
+        }
+
+        Thread.sleep(PRINT_INTERVAL);
+        bspPeer.sync();
+        i++;
+      }
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+  }
+
+  public static void main(String[] args) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    HamaConfiguration conf = new HamaConfiguration();
+    // TODO some keys that should be within a conf
+    conf.set("yarn.resourcemanager.address", "0.0.0.0:8040");
+    conf.set("bsp.local.dir", "/tmp/bsp-yarn/");
+    
+    YARNBSPJob job = new YARNBSPJob(conf);
+    job.setBspClass(HelloBSP.class);
+    job.setJarByClass(HelloBSP.class);
+    job.setJobName("Serialize Printing");
+    job.setMemoryUsedPerTaskInMb(50);
+    job.setNumBspTask(2);
+    // TODO waitForCompletion(true) throws exceptions
+    job.waitForCompletion(false);
+  }
+}

Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Custom writable for string arrays, because ArrayWritable has no default
+ * constructor and is broken.
+ * 
+ */
+public class StringArrayWritable implements Writable {
+
+  private String[] array;
+
+  public StringArrayWritable() {
+    super();
+  }
+
+  public StringArrayWritable(String[] array) {
+    super();
+    this.array = array;
+  }
+
+  // no defensive copy needed because this always comes from an rpc call.
+  public String[] get() {
+    return array;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(array.length);
+    for (String s : array) {
+      out.writeUTF(s);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    array = new String[in.readInt()];
+    for (int i = 0; i < array.length; i++) {
+      array[i] = in.readUTF();
+    }
+  }
+
+}

Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * RPC protocol of the barrier synchronization service used by BSP peers.
+ * All members are implicitly public; {@link #versionID} is implicitly static
+ * and final.
+ */
+public interface SyncServer extends VersionedProtocol {
+
+  /** Version of this RPC protocol. */
+  long versionID = 0L;
+
+  /**
+   * Enters the superstep barrier on behalf of a task.
+   *
+   * @param id attempt id of the calling task.
+   */
+  void enterBarrier(TaskAttemptID id);
+
+  /**
+   * Leaves the superstep barrier on behalf of a task.
+   *
+   * @param id attempt id of the calling task.
+   */
+  void leaveBarrier(TaskAttemptID id);
+
+  /**
+   * Registers a task together with the host and port of its peer.
+   *
+   * @param id attempt id of the task.
+   * @param hostAddress host name of the task's peer.
+   * @param port port of the task's peer.
+   */
+  void register(TaskAttemptID id, Text hostAddress, LongWritable port);
+
+  /** @return the current superstep number. */
+  LongWritable getSuperStep();
+
+  /** @return the names of all registered peers, as "host:port" strings. */
+  StringArrayWritable getAllPeerNames();
+
+  /**
+   * Removes a previously registered task from the barrier.
+   *
+   * @param id attempt id of the task.
+   * @param hostAddress host name of the task's peer.
+   * @param port port of the task's peer.
+   */
+  void deregisterFromBarrier(TaskAttemptID id, Text hostAddress,
+      LongWritable port);
+
+  /** Stops the synchronization server. */
+  void stopServer();
+
+}

Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Synchronization Deamon. <br\>
+ */
+public class SyncServerImpl implements SyncServer, Callable<Long> {
+
+  private static final Log LOG = LogFactory.getLog(SyncServerImpl.class);
+
+  private Configuration conf = new Configuration();
+  private Server server;
+
+  private int parties;
+
+  private CyclicBarrier barrier;
+  private CyclicBarrier leaveBarrier;
+  private Set<Integer> partySet;
+  private Set<String> peerAddresses;
+
+  private volatile long superstep = 0L;
+
+  public SyncServerImpl(int parties, String host, int port) throws IOException {
+    this.parties = parties;
+    this.barrier = new CyclicBarrier(parties);
+    this.leaveBarrier = new CyclicBarrier(parties, new SuperStepIncrementor(
+        this));
+
+    this.partySet = Collections.synchronizedSet(new HashSet<Integer>(parties));
+    // tree set so there is ascending order for consistent returns in
+    // getAllPeerNames()
+    this.peerAddresses = Collections.synchronizedSet(new TreeSet<String>());
+    // allocate ten more rpc handler than parties for additional services to
+    // plug in or to deal with failure.
+    this.server = RPC.getServer(this, host, port, parties + 10, false, conf);
+    LOG.info("Sync Server is now up at: " + host + ":" + port + "!");
+  }
+
+  public void start() throws IOException {
+    server.start();
+  }
+
+  @Override
+  public void stopServer() {
+    server.stop();
+  }
+
+  public void join() throws InterruptedException {
+    server.join();
+  }
+
+  public static SyncServer getService(Configuration conf)
+      throws NumberFormatException, IOException {
+    String syncAddress = conf.get("hama.sync.server.address");
+    if (syncAddress == null || syncAddress.isEmpty()
+        || !syncAddress.contains(":")) {
+      throw new IllegalArgumentException(
+          "Server sync address must contain a colon and must be non-empty and not-null! Property \"hama.sync.server.address\" was: "
+              + syncAddress);
+    }
+    String[] hostPort = syncAddress.split(":");
+    return (SyncServer) RPC.waitForProxy(SyncServer.class,
+        SyncServer.versionID,
+        new InetSocketAddress(hostPort[0], Integer.valueOf(hostPort[1])), conf);
+
+  }
+
+  @Override
+  public void enterBarrier(TaskAttemptID id) {
+    LOG.info("Task: " + id.getId() + " entered Barrier!");
+    if (partySet.contains(id.getId())) {
+      try {
+        barrier.await();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (BrokenBarrierException e) {
+        e.printStackTrace();
+      }
+    } else {
+      LOG.warn("TaskID " + id + " is no verified task!");
+    }
+  }
+
+  @Override
+  public void leaveBarrier(TaskAttemptID id) {
+    LOG.info("Task: " + id.getId() + " leaves Barrier!");
+    if (partySet.contains(id.getId())) {
+      try {
+        leaveBarrier.await();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (BrokenBarrierException e) {
+        e.printStackTrace();
+      }
+    } else {
+      LOG.warn("TaskID " + id + " is no verified task!");
+    }
+  }
+
+  @Override
+  public synchronized void register(TaskAttemptID id, Text hostAddress,
+      LongWritable port) {
+    partySet.add(id.getId());
+    String peer = hostAddress.toString() + ":" + port.get();
+    peerAddresses.add(peer);
+    LOG.info("Registered: " + id.getId() + " for peer " + peer);
+    if (partySet.size() > parties) {
+      LOG.warn("Registered more tasks than configured!");
+    }
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return clientVersion;
+  }
+
+  private static class SuperStepIncrementor implements Runnable {
+
+    private final SyncServerImpl instance;
+
+    public SuperStepIncrementor(SyncServerImpl syncServer) {
+      this.instance = syncServer;
+    }
+
+    @Override
+    public void run() {
+      synchronized (instance) {
+        this.instance.superstep += 1L;
+        LOG.info("Entering superstep: " + this.instance.superstep);
+      }
+    }
+
+  }
+
+  public static void main(String[] args) throws IOException,
+      InterruptedException {
+    LOG.info(Arrays.toString(args));
+    if (args.length == 3) {
+      SyncServerImpl syncServer = new SyncServerImpl(Integer.valueOf(args[0]),
+          args[1], Integer.valueOf(args[2]));
+      syncServer.start();
+      syncServer.join();
+    } else {
+      throw new IllegalArgumentException(
+          "Argument count does not match 3! Given size was " + args.length
+              + " and parameters were " + Arrays.toString(args));
+    }
+  }
+
+  @Override
+  public Long call() throws Exception {
+    this.start();
+    this.join();
+    return this.superstep;
+  }
+
+  @Override
+  public synchronized LongWritable getSuperStep() {
+    return new LongWritable(superstep);
+  }
+
+  @Override
+  public synchronized StringArrayWritable getAllPeerNames() {
+    return new StringArrayWritable(
+        peerAddresses.toArray(new String[peerAddresses.size()]));
+  }
+
+  @Override
+  public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress,
+      LongWritable port) {
+    // TODO Auto-generated method stub
+    // basically has to recreate the barriers and remove from the two basic
+    // sets.
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    // TODO Auto-generated method stub
+    return new ProtocolSignature();
+  }
+
+}

Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/yarn/src/main/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/resources/log4j.properties?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/resources/log4j.properties (added)
+++ incubator/hama/trunk/yarn/src/main/resources/log4j.properties Fri Oct 14 13:29:10 2011
@@ -0,0 +1,243 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Define some default values that can be overridden by system properties
+hama.root.logger=INFO,console
+hama.log.dir=.
+hama.log.file=hama.log
+
+# Define the root logger to the system property "hama.root.logger".
+log4j.rootLogger=${hama.root.logger}
+
+# Logging Threshold
+log4j.threshhold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file}
+
+# Roll over at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hama.tasklog.taskid=null
+hama.tasklog.noKeepSplits=4
+hama.tasklog.totalLogFileSize=100
+hama.tasklog.purgeLogSplits=true
+hama.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender
+log4j.appender.TLA.taskId=${hama.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# console
+# Add "console" to the root logger (hama.root.logger above) to use this appender.
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+#log4j.logger.org.apache.hadoop.dfs=DEBUG
+#log4j.logger.org.apache.hama=DEBUG
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Define some default values that can be overridden by system properties
+hama.root.logger=INFO,console
+hama.log.dir=.
+hama.log.file=hama.log
+
+# Define the root logger to the system property "hama.root.logger".
+log4j.rootLogger=${hama.root.logger}
+
+# Logging Threshold
+log4j.threshhold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file}
+
+# Roll over at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hama.tasklog.taskid=null
+hama.tasklog.noKeepSplits=4
+hama.tasklog.totalLogFileSize=100
+hama.tasklog.purgeLogSplits=true
+hama.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender
+log4j.appender.TLA.taskId=${hama.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# console
+# Add "console" to the root logger (hama.root.logger above) to use this appender.
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+#log4j.logger.org.apache.hadoop.dfs=DEBUG
+#log4j.logger.org.apache.hama=DEBUG
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Define some default values that can be overridden by system properties
+hama.root.logger=INFO,console
+hama.log.dir=.
+hama.log.file=hama.log
+
+# Define the root logger to the system property "hama.root.logger".
+log4j.rootLogger=${hama.root.logger}
+
+# Logging Threshold
+log4j.threshhold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file}
+
+# Roll over at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hama.tasklog.taskid=null
+hama.tasklog.noKeepSplits=4
+hama.tasklog.totalLogFileSize=100
+hama.tasklog.purgeLogSplits=true
+hama.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender
+log4j.appender.TLA.taskId=${hama.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# console
+# Add "console" to the root logger (hama.root.logger above) to use this appender.
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+#log4j.logger.org.apache.hadoop.dfs=DEBUG
+#log4j.logger.org.apache.hama=DEBUG

Propchange: incubator/hama/trunk/yarn/src/main/resources/log4j.properties
------------------------------------------------------------------------------
    svn:eol-style = native