You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/10/14 15:29:12 UTC
svn commit: r1183352 [2/2] - in /incubator/hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/checkpoint/
core/src/test/java/org/apache/hama/bsp/
core/src/test/java/org/apache/hama/checkpoint/ yarn/ yarn/src/ yarn...
Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,488 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.sync.SyncServer;
+import org.apache.hama.bsp.sync.SyncServerImpl;
+import org.apache.hama.checkpoint.CheckpointRunner;
+
+/**
+ * This class represents a BSP peer.
+ */
+public class YARNBSPPeerImpl implements BSPPeer {
+
+ public static final Log LOG = LogFactory.getLog(YARNBSPPeerImpl.class);
+
+ private final Configuration conf;
+
+ private volatile Server server = null;
+
+ private final Map<InetSocketAddress, BSPPeer> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeer>();
+ private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
+ private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
+ private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+ private final Map<String, InetSocketAddress> peerSocketCache = new ConcurrentHashMap<String, InetSocketAddress>();
+
+ private InetSocketAddress peerAddress;
+ private TaskStatus currentTaskStatus;
+
+ private TaskAttemptID taskid;
+ private SyncServer syncService;
+ private final BSPMessageSerializer messageSerializer;
+
+ public static final class BSPSerializableMessage implements Writable {
+ final AtomicReference<String> path = new AtomicReference<String>();
+ final AtomicReference<BSPMessageBundle> bundle = new AtomicReference<BSPMessageBundle>();
+
+ public BSPSerializableMessage() {
+ }
+
+ public BSPSerializableMessage(final String path,
+ final BSPMessageBundle bundle) {
+ if (null == path)
+ throw new NullPointerException("No path provided for checkpointing.");
+ if (null == bundle)
+ throw new NullPointerException("No data provided for checkpointing.");
+ this.path.set(path);
+ this.bundle.set(bundle);
+ }
+
+ public final String checkpointedPath() {
+ return this.path.get();
+ }
+
+ public final BSPMessageBundle messageBundle() {
+ return this.bundle.get();
+ }
+
+ @Override
+ public final void write(DataOutput out) throws IOException {
+ out.writeUTF(this.path.get());
+ this.bundle.get().write(out);
+ }
+
+ @Override
+ public final void readFields(DataInput in) throws IOException {
+ this.path.set(in.readUTF());
+ BSPMessageBundle pack = new BSPMessageBundle();
+ pack.readFields(in);
+ this.bundle.set(pack);
+ }
+
+ }// serializable message
+
+ final class BSPMessageSerializer {
+ final Socket client;
+ final ScheduledExecutorService sched;
+
+ public BSPMessageSerializer(final int port) {
+ Socket tmp = null;
+ int cnt = 0;
+ do {
+ tmp = init(port);
+ cnt++;
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ LOG.warn("Thread is interrupted.", ie);
+ Thread.currentThread().interrupt();
+ }
+ } while (null == tmp && 10 > cnt);
+ this.client = tmp;
+ if (null == this.client)
+ throw new NullPointerException("Client socket is null.");
+ this.sched = Executors.newScheduledThreadPool(conf.getInt(
+ "bsp.checkpoint.serializer_thread", 10));
+ LOG.info(BSPMessageSerializer.class.getName()
+ + " is ready to serialize message.");
+ }
+
+ private Socket init(final int port) {
+ Socket tmp = null;
+ try {
+ tmp = new Socket("localhost", port);
+ } catch (UnknownHostException uhe) {
+ LOG.error("Unable to connect to BSPMessageDeserializer.", uhe);
+ } catch (IOException ioe) {
+ LOG.warn("Fail to create socket.", ioe);
+ }
+ return tmp;
+ }
+
+ void serialize(final BSPSerializableMessage tmp) throws IOException {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Messages are saved to " + tmp.checkpointedPath());
+ final DataOutput out = new DataOutputStream(client.getOutputStream());
+ this.sched.schedule(new Callable<Object>() {
+ public Object call() throws Exception {
+ tmp.write(out);
+ return null;
+ }
+ }, 0, SECONDS);
+ }
+
+ public void close() {
+ try {
+ this.client.close();
+ this.sched.shutdown();
+ } catch (IOException io) {
+ LOG.error("Fail to close client socket.", io);
+ }
+ }
+
+ }// message serializer
+
+ /**
+ * BSPPeer Constructor.
+ *
+ * BSPPeer acts on behalf of clients performing bsp() tasks.
+ *
+ * @param conf is the configuration file containing bsp peer host, port, etc.
+ * @param taskid is the id that current process holds.
+ */
+ public YARNBSPPeerImpl(Configuration conf, TaskAttemptID taskid)
+ throws IOException {
+ this.conf = conf;
+ this.taskid = taskid;
+
+ String bindAddress = conf.get(Constants.PEER_HOST,
+ Constants.DEFAULT_PEER_HOST);
+ int bindPort = conf
+ .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+ peerAddress = new InetSocketAddress(bindAddress, bindPort);
+ BSPMessageSerializer msgSerializer = null;
+ if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
+ msgSerializer = new BSPMessageSerializer(conf.getInt(
+ "bsp.checkpoint.port",
+ Integer.parseInt(CheckpointRunner.DEFAULT_PORT)));
+ }
+ this.messageSerializer = msgSerializer;
+
+ syncService = SyncServerImpl.getService(conf);
+ syncService.register(taskid, new Text(peerAddress.getHostName()),
+ new LongWritable(peerAddress.getPort()));
+ currentTaskStatus = new TaskStatus();
+ }
+
+ public void reinitialize() {
+ try {
+ if (LOG.isDebugEnabled())
+ LOG.debug("reinitialize(): " + getPeerName());
+ this.server = RPC.getServer(this, peerAddress.getHostName(),
+ peerAddress.getPort(), conf);
+ server.start();
+ LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
+ + peerAddress.getPort());
+ syncService = SyncServerImpl.getService(conf);
+ syncService.register(taskid, new Text(peerAddress.getHostName()),
+ new LongWritable(peerAddress.getPort()));
+ } catch (IOException e) {
+ LOG.error("Fail to start RPC server!", e);
+ }
+ }
+
+ @Override
+ public BSPMessage getCurrentMessage() throws IOException {
+ return localQueue.poll();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.BSPPeerInterface#send(java.net.InetSocketAddress,
+ * org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable)
+ */
+ @Override
+ public void send(String peerName, BSPMessage msg) throws IOException {
+ if (peerName.equals(getPeerName())) {
+ LOG.debug("Local send bytes (" + msg.getData().toString() + ")");
+ localQueueForNextIteration.add(msg);
+ } else {
+ LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
+ InetSocketAddress targetPeerAddress = null;
+ // Get socket for target peer.
+ if (peerSocketCache.containsKey(peerName)) {
+ targetPeerAddress = peerSocketCache.get(peerName);
+ } else {
+ targetPeerAddress = getAddress(peerName);
+ peerSocketCache.put(peerName, targetPeerAddress);
+ }
+ ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues
+ .get(targetPeerAddress);
+ if (queue == null) {
+ queue = new ConcurrentLinkedQueue<BSPMessage>();
+ }
+ queue.add(msg);
+ outgoingQueues.put(targetPeerAddress, queue);
+ }
+ }
+
+ // TODO not working properly!
+ private String checkpointedPath() {
+ String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
+ // String ckptPath = backup + jobConf.getJobID().toString() + "/"
+ // + getSuperstepCount() + "/" + this.taskid.toString();
+ // if (LOG.isDebugEnabled())
+ // LOG.debug("Messages are to be saved to " + ckptPath);
+ return backup;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.BSPPeerInterface#sync()
+ */
+ @Override
+ public void sync() throws IOException, InterruptedException {
+ enterBarrier();
+ Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues
+ .entrySet().iterator();
+
+ while (it.hasNext()) {
+ Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it
+ .next();
+
+ BSPPeer peer = peers.get(entry.getKey());
+ if (peer == null) {
+ try {
+ peer = getBSPPeerConnection(entry.getKey());
+ } catch (NullPointerException ne) {
+ LOG.error(taskid + ": " + entry.getKey().getHostName()
+ + " doesn't exists.");
+ }
+ }
+ Iterable<BSPMessage> messages = entry.getValue();
+ BSPMessageBundle bundle = new BSPMessageBundle();
+ for (BSPMessage message : messages) {
+ bundle.addMessage(message);
+ }
+
+ // checkpointing
+ if (null != this.messageSerializer) {
+ this.messageSerializer.serialize(new BSPSerializableMessage(
+ checkpointedPath(), bundle));
+ }
+
+ peer.put(bundle);
+ }
+
+ leaveBarrier();
+ currentTaskStatus.incrementSuperstepCount();
+
+ // Clear outgoing queues.
+ clearOutgoingQueues();
+
+ // Add non-processed messages from this iteration for the next's queue.
+ while (!localQueue.isEmpty()) {
+ BSPMessage message = localQueue.poll();
+ localQueueForNextIteration.add(message);
+ }
+ // Switch local queues.
+ localQueue = localQueueForNextIteration;
+ localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+ }
+
+ protected boolean enterBarrier() throws InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[" + getPeerName() + "] enter the enterbarrier: "
+ + this.getSuperstepCount());
+ }
+
+ syncService.enterBarrier(taskid);
+ return true;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ protected boolean leaveBarrier() throws InterruptedException {
+ syncService.leaveBarrier(taskid);
+ return true;
+ }
+
+ public void clear() {
+ this.localQueue.clear();
+ this.outgoingQueues.clear();
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.clear();
+ syncService.deregisterFromBarrier(taskid,
+ new Text(this.peerAddress.getHostName()), new LongWritable(
+ this.peerAddress.getPort()));
+ if (server != null)
+ server.stop();
+ if (null != messageSerializer)
+ this.messageSerializer.close();
+ }
+
+ @Override
+ public void put(BSPMessage msg) throws IOException {
+ this.localQueueForNextIteration.add(msg);
+ }
+
+ @Override
+ public void put(BSPMessageBundle messages) throws IOException {
+ for (BSPMessage message : messages.getMessages()) {
+ this.localQueueForNextIteration.add(message);
+ }
+ }
+
+ @Override
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+ return BSPPeer.versionID;
+ }
+
+ protected BSPPeer getBSPPeerConnection(InetSocketAddress addr)
+ throws NullPointerException {
+ BSPPeer peer;
+ synchronized (this.peers) {
+ peer = peers.get(addr);
+
+ if (peer == null) {
+ try {
+ peer = (BSPPeer) RPC.getProxy(BSPPeer.class, BSPPeer.versionID, addr,
+ this.conf);
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ this.peers.put(addr, peer);
+ }
+ }
+
+ return peer;
+ }
+
+ /**
+ * @return the string as host:port of this Peer
+ */
+ public String getPeerName() {
+ return peerAddress.getHostName() + ":" + peerAddress.getPort();
+ }
+
+ private InetSocketAddress getAddress(String peerName) {
+ String[] peerAddrParts = peerName.split(":");
+ if (peerAddrParts.length != 2) {
+ throw new ArrayIndexOutOfBoundsException(
+ "Peername must consist of exactly ONE \":\"! Given peername was: "
+ + peerName);
+ }
+ return new InetSocketAddress(peerAddrParts[0],
+ Integer.parseInt(peerAddrParts[1]));
+ }
+
+ @Override
+ public String[] getAllPeerNames() {
+ return syncService.getAllPeerNames().get();
+ }
+
+ /**
+ * @return the number of messages
+ */
+ public int getNumCurrentMessages() {
+ return localQueue.size();
+ }
+
+ /**
+ * Sets the current status
+ *
+ * @param currentTaskStatus
+ */
+ public void setCurrentTaskStatus(TaskStatus currentTaskStatus) {
+ this.currentTaskStatus = currentTaskStatus;
+ }
+
+ /**
+ * @return the count of current super-step
+ */
+ public long getSuperstepCount() {
+ return currentTaskStatus.getSuperstepCount();
+ }
+
+ /**
+ * @return the size of local queue
+ */
+ public int getLocalQueueSize() {
+ return localQueue.size();
+ }
+
+ /**
+ * @return the sync service
+ */
+ public SyncServer getSyncService() {
+ return syncService;
+ }
+
+ /**
+ * @return the size of outgoing queue
+ */
+ public int getOutgoingQueueSize() {
+ return outgoingQueues.size();
+ }
+
+ /**
+ * Clears local queue
+ */
+ public void clearLocalQueue() {
+ this.localQueue.clear();
+ }
+
+ /**
+ * Clears outgoing queues
+ */
+ public void clearOutgoingQueues() {
+ this.outgoingQueues.clear();
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ // TODO Auto-generated method stub
+ return new ProtocolSignature();
+ }
+}
Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaConfiguration;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Example YARN BSP job: over as many supersteps as there are peers, the peer
+ * whose turn it is logs a greeting, so greetings appear one peer at a time.
+ */
+public class YarnSerializePrinting {
+
+  /** BSP program that prints one greeting per superstep. */
+  public static class HelloBSP extends BSP {
+    public static final Log LOG = LogFactory.getLog(HelloBSP.class);
+    private Configuration conf;
+    private final static int PRINT_INTERVAL = 1000;
+    private int num;
+
+    @Override
+    public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+        InterruptedException {
+      num = conf.getInt("bsp.peers.num", 0);
+      // Fetch the peer list once (it may be a remote call, e.g. to the sync
+      // server) and log its contents; a bare String[] would only log its
+      // identity hash.
+      String[] allPeers = bspPeer.getAllPeerNames();
+      LOG.info(java.util.Arrays.toString(allPeers));
+      String peerName = bspPeer.getPeerName();
+      int i = 0;
+      for (String otherPeer : allPeers) {
+        if (peerName.equals(otherPeer)) {
+          LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": " + peerName);
+        }
+
+        Thread.sleep(PRINT_INTERVAL);
+        bspPeer.sync();
+        i++;
+      }
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+  }
+
+  /** Submits HelloBSP with two tasks to a local YARN resource manager. */
+  public static void main(String[] args) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    HamaConfiguration conf = new HamaConfiguration();
+    // TODO some keys that should be within a conf
+    conf.set("yarn.resourcemanager.address", "0.0.0.0:8040");
+    conf.set("bsp.local.dir", "/tmp/bsp-yarn/");
+
+    YARNBSPJob job = new YARNBSPJob(conf);
+    job.setBspClass(HelloBSP.class);
+    job.setJarByClass(HelloBSP.class);
+    job.setJobName("Serialize Printing");
+    job.setMemoryUsedPerTaskInMb(50);
+    job.setNumBspTask(2);
+    // TODO waitForCompletion(true) throws exceptions
+    job.waitForCompletion(false);
+  }
+}
Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Custom writable for string arrays, because ArrayWritable has no default
+ * constructor and is broken.
+ *
+ */
+public class StringArrayWritable implements Writable {
+
+ private String[] array;
+
+ public StringArrayWritable() {
+ super();
+ }
+
+ public StringArrayWritable(String[] array) {
+ super();
+ this.array = array;
+ }
+
+ // no defensive copy needed because this always comes from an rpc call.
+ public String[] get() {
+ return array;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(array.length);
+ for (String s : array) {
+ out.writeUTF(s);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ array = new String[in.readInt()];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = in.readUTF();
+ }
+ }
+
+}
Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Barrier synchronization service for BSP tasks, exposed over Hadoop RPC.
+ *
+ * Peers register their task id and address, then pass an entry and a leave
+ * barrier once per superstep.
+ */
+public interface SyncServer extends VersionedProtocol {
+
+  /** RPC protocol version of this interface. */
+  long versionID = 0L;
+
+  /**
+   * Enters the entry barrier of the current superstep.
+   *
+   * @param id task entering the barrier
+   */
+  void enterBarrier(TaskAttemptID id);
+
+  /**
+   * Enters the leave barrier of the current superstep.
+   *
+   * @param id task leaving the superstep
+   */
+  void leaveBarrier(TaskAttemptID id);
+
+  /**
+   * Registers a task and the address its peer listens on.
+   *
+   * @param id task being registered
+   * @param hostAddress host name of the task's peer
+   * @param port port of the task's peer
+   */
+  void register(TaskAttemptID id, Text hostAddress, LongWritable port);
+
+  /** @return the superstep counter kept by the server */
+  LongWritable getSuperStep();
+
+  /** @return the "host:port" names of all registered peers */
+  StringArrayWritable getAllPeerNames();
+
+  /**
+   * Removes a task from the barriers.
+   *
+   * @param id task to remove
+   * @param hostAddress host name of the task's peer
+   * @param port port of the task's peer
+   */
+  void deregisterFromBarrier(TaskAttemptID id, Text hostAddress,
+      LongWritable port);
+
+  /** Stops the RPC server backing this service. */
+  void stopServer();
+
+}
Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Synchronization Deamon. <br\>
+ */
+public class SyncServerImpl implements SyncServer, Callable<Long> {
+
+ private static final Log LOG = LogFactory.getLog(SyncServerImpl.class);
+
+ private Configuration conf = new Configuration();
+ private Server server;
+
+ private int parties;
+
+ private CyclicBarrier barrier;
+ private CyclicBarrier leaveBarrier;
+ private Set<Integer> partySet;
+ private Set<String> peerAddresses;
+
+ private volatile long superstep = 0L;
+
+ public SyncServerImpl(int parties, String host, int port) throws IOException {
+ this.parties = parties;
+ this.barrier = new CyclicBarrier(parties);
+ this.leaveBarrier = new CyclicBarrier(parties, new SuperStepIncrementor(
+ this));
+
+ this.partySet = Collections.synchronizedSet(new HashSet<Integer>(parties));
+ // tree set so there is ascending order for consistent returns in
+ // getAllPeerNames()
+ this.peerAddresses = Collections.synchronizedSet(new TreeSet<String>());
+ // allocate ten more rpc handler than parties for additional services to
+ // plug in or to deal with failure.
+ this.server = RPC.getServer(this, host, port, parties + 10, false, conf);
+ LOG.info("Sync Server is now up at: " + host + ":" + port + "!");
+ }
+
+ public void start() throws IOException {
+ server.start();
+ }
+
+ @Override
+ public void stopServer() {
+ server.stop();
+ }
+
+ public void join() throws InterruptedException {
+ server.join();
+ }
+
+ public static SyncServer getService(Configuration conf)
+ throws NumberFormatException, IOException {
+ String syncAddress = conf.get("hama.sync.server.address");
+ if (syncAddress == null || syncAddress.isEmpty()
+ || !syncAddress.contains(":")) {
+ throw new IllegalArgumentException(
+ "Server sync address must contain a colon and must be non-empty and not-null! Property \"hama.sync.server.address\" was: "
+ + syncAddress);
+ }
+ String[] hostPort = syncAddress.split(":");
+ return (SyncServer) RPC.waitForProxy(SyncServer.class,
+ SyncServer.versionID,
+ new InetSocketAddress(hostPort[0], Integer.valueOf(hostPort[1])), conf);
+
+ }
+
+ @Override
+ public void enterBarrier(TaskAttemptID id) {
+ LOG.info("Task: " + id.getId() + " entered Barrier!");
+ if (partySet.contains(id.getId())) {
+ try {
+ barrier.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ } else {
+ LOG.warn("TaskID " + id + " is no verified task!");
+ }
+ }
+
+ @Override
+ public void leaveBarrier(TaskAttemptID id) {
+ LOG.info("Task: " + id.getId() + " leaves Barrier!");
+ if (partySet.contains(id.getId())) {
+ try {
+ leaveBarrier.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ } else {
+ LOG.warn("TaskID " + id + " is no verified task!");
+ }
+ }
+
+ @Override
+ public synchronized void register(TaskAttemptID id, Text hostAddress,
+ LongWritable port) {
+ partySet.add(id.getId());
+ String peer = hostAddress.toString() + ":" + port.get();
+ peerAddresses.add(peer);
+ LOG.info("Registered: " + id.getId() + " for peer " + peer);
+ if (partySet.size() > parties) {
+ LOG.warn("Registered more tasks than configured!");
+ }
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return clientVersion;
+ }
+
+ private static class SuperStepIncrementor implements Runnable {
+
+ private final SyncServerImpl instance;
+
+ public SuperStepIncrementor(SyncServerImpl syncServer) {
+ this.instance = syncServer;
+ }
+
+ @Override
+ public void run() {
+ synchronized (instance) {
+ this.instance.superstep += 1L;
+ LOG.info("Entering superstep: " + this.instance.superstep);
+ }
+ }
+
+ }
+
+ public static void main(String[] args) throws IOException,
+ InterruptedException {
+ LOG.info(Arrays.toString(args));
+ if (args.length == 3) {
+ SyncServerImpl syncServer = new SyncServerImpl(Integer.valueOf(args[0]),
+ args[1], Integer.valueOf(args[2]));
+ syncServer.start();
+ syncServer.join();
+ } else {
+ throw new IllegalArgumentException(
+ "Argument count does not match 3! Given size was " + args.length
+ + " and parameters were " + Arrays.toString(args));
+ }
+ }
+
+ @Override
+ public Long call() throws Exception {
+ this.start();
+ this.join();
+ return this.superstep;
+ }
+
+ @Override
+ public synchronized LongWritable getSuperStep() {
+ return new LongWritable(superstep);
+ }
+
+ @Override
+ public synchronized StringArrayWritable getAllPeerNames() {
+ return new StringArrayWritable(
+ peerAddresses.toArray(new String[peerAddresses.size()]));
+ }
+
+ @Override
+ public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress,
+ LongWritable port) {
+ // TODO Auto-generated method stub
+ // basically has to recreate the barriers and remove from the two basic
+ // sets.
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ // TODO Auto-generated method stub
+ return new ProtocolSignature();
+ }
+
+}
Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/trunk/yarn/src/main/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/resources/log4j.properties?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/resources/log4j.properties (added)
+++ incubator/hama/trunk/yarn/src/main/resources/log4j.properties Fri Oct 14 13:29:10 2011
@@ -0,0 +1,243 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Define some default values that can be overridden by system properties
+hama.root.logger=INFO,console
+hama.log.dir=.
+hama.log.file=hama.log
+
+# Define the root logger to the system property "hama.root.logger".
+log4j.rootLogger=${hama.root.logger}
+
+# Logging Threshold
+log4j.threshhold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hama.tasklog.taskid=null
+hama.tasklog.noKeepSplits=4
+hama.tasklog.totalLogFileSize=100
+hama.tasklog.purgeLogSplits=true
+hama.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender
+log4j.appender.TLA.taskId=${hama.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+#log4j.logger.org.apache.hadoop.dfs=DEBUG
+#log4j.logger.org.apache.hama=DEBUG
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Define some default values that can be overridden by system properties
+hama.root.logger=INFO,console
+hama.log.dir=.
+hama.log.file=hama.log
+
+# Define the root logger to the system property "hama.root.logger".
+log4j.rootLogger=${hama.root.logger}
+
+# Logging Threshold
+log4j.threshhold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hama.tasklog.taskid=null
+hama.tasklog.noKeepSplits=4
+hama.tasklog.totalLogFileSize=100
+hama.tasklog.purgeLogSplits=true
+hama.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender
+log4j.appender.TLA.taskId=${hama.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+#log4j.logger.org.apache.hadoop.dfs=DEBUG
+#log4j.logger.org.apache.hama=DEBUG
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Define some default values that can be overridden by system properties
+hama.root.logger=INFO,console
+hama.log.dir=.
+hama.log.file=hama.log
+
+# Define the root logger to the system property "hama.root.logger".
+log4j.rootLogger=${hama.root.logger}
+
+# Logging Threshold
+log4j.threshhold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hama.tasklog.taskid=null
+hama.tasklog.noKeepSplits=4
+hama.tasklog.totalLogFileSize=100
+hama.tasklog.purgeLogSplits=true
+hama.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender
+log4j.appender.TLA.taskId=${hama.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+#log4j.logger.org.apache.hadoop.dfs=DEBUG
+#log4j.logger.org.apache.hama=DEBUG
Propchange: incubator/hama/trunk/yarn/src/main/resources/log4j.properties
------------------------------------------------------------------------------
svn:eol-style = native