You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by hy...@apache.org on 2010/04/25 12:03:29 UTC
svn commit: r937774 - in /incubator/hama/trunk: bin/ conf/ src/java/
src/java/org/apache/hama/ src/java/org/apache/hama/bsp/
src/java/org/apache/hama/graph/ src/java/org/apache/hama/ipc/
Author: hyunsik
Date: Sun Apr 25 10:03:28 2010
New Revision: 937774
URL: http://svn.apache.org/viewvc?rev=937774&view=rev
Log:
HAMA-247: Discuss / Refactor HamaMaster and GroomServer
Added:
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (with props)
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (with props)
incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java (with props)
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobClient.java (with props)
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobContext.java (with props)
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobID.java (with props)
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (with props)
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java (with props)
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java (with props)
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java (with props)
Removed:
incubator/hama/trunk/src/java/groomserver-default.xml
incubator/hama/trunk/src/java/org/apache/hama/HamaMaster.java
incubator/hama/trunk/src/java/org/apache/hama/graph/GroomServer.java
incubator/hama/trunk/src/java/org/apache/hama/graph/ID.java
incubator/hama/trunk/src/java/org/apache/hama/graph/JobClient.java
incubator/hama/trunk/src/java/org/apache/hama/graph/JobContext.java
incubator/hama/trunk/src/java/org/apache/hama/graph/JobID.java
incubator/hama/trunk/src/java/org/apache/hama/graph/JobStatus.java
incubator/hama/trunk/src/java/org/apache/hama/graph/TaskAttemptContext.java
incubator/hama/trunk/src/java/org/apache/hama/graph/TaskAttemptID.java
incubator/hama/trunk/src/java/org/apache/hama/graph/TaskID.java
Modified:
incubator/hama/trunk/bin/hama
incubator/hama/trunk/conf/hama-default.xml
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/src/java/org/apache/hama/graph/InputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/graph/OutputCommitter.java
incubator/hama/trunk/src/java/org/apache/hama/graph/OutputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/graph/RecordWriter.java
incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
Modified: incubator/hama/trunk/bin/hama
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/bin/hama?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/bin/hama (original)
+++ incubator/hama/trunk/bin/hama Sun Apr 25 10:03:28 2010
@@ -162,10 +162,10 @@ unset IFS
# figure out which class to run
if [ "$COMMAND" = "bspmaster" ] ; then
- CLASS='org.apache.hama.HamaMaster'
+ CLASS='org.apache.hama.bsp.BSPMaster'
BSP_OPTS="$BSP_OPTS $BSP_BSPMASTER_OPTS"
elif [ "$COMMAND" = "groom" ] ; then
- CLASS='org.apache.hama.graph.GroomServer'
+ CLASS='org.apache.hama.bsp.GroomServer'
BSP_OPTS="$BSP_OPTS $BSP_GROOMSERVER_OPTS"
elif [ "$COMMAND" = "version" ] ; then
CLASS=org.apache.bsp.util.VersionInfo
Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Sun Apr 25 10:03:28 2010
@@ -31,7 +31,19 @@
<property>
<name>hama.groom.port</name>
<value>40020</value>
- <description>The port an groom server binds to.
+ <description>The port a groom server binds to.</description>
+ </property>
+
+ <property>
+ <name>bspd.system.dir</name>
+ <value>${hadoop.tmp.dir}/bsp/system</value>
+ <description>The shared directory where BSP stores control files.
</description>
</property>
+
+ <property>
+ <name>bspd.groom.local.dir</name>
+ <value>${hadoop.tmp.dir}/groomserver/local</value>
+ <description>Local directory for temporary storage.</description>
+ </property>
</configuration>
\ No newline at end of file
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,231 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.ipc.HeartbeatResponse;
+import org.apache.hama.ipc.InterTrackerProtocol;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+
+/**
+ * BSPMaster is responsible for controlling all the BSP peers and for managing BSP jobs.
+ */
+public class BSPMaster implements JobSubmissionProtocol, InterTrackerProtocol {
+ static{
+ Configuration.addDefaultResource("hama-default.xml");
+ }
+
+ public static final Log LOG = LogFactory.getLog(BSPMaster.class);
+
+ private HamaConfiguration conf;
+ public static enum State { INITIALIZING, RUNNING }
+ State state = State.INITIALIZING;
+
+ String masterIdentifier;
+
+ private Server interTrackerServer;
+
+ FileSystem fs = null;
+ Path systemDir = null;
+
+ // system directories: full access for the owner, write and execute only for group and others
+ final static FsPermission SYSTEM_DIR_PERMISSION =
+ FsPermission.createImmutable((short) 0733); // rwx-wx-wx
+
+ // system files should have 700 permission
+ final static FsPermission SYSTEM_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0700); // rwx------
+
+ private static final int FS_ACCESS_RETRY_PERIOD = 10000;
+
+ private int nextJobId = 1;
+
+  /**
+   * Creates the master: sets up the inter-tracker RPC server on the configured
+   * master address and recreates an empty system directory.
+   *
+   * The system directory is deleted and recreated in a loop, sleeping
+   * FS_ACCESS_RETRY_PERIOD milliseconds between attempts, until mkdirs reports
+   * success or the calling thread is interrupted.
+   *
+   * @param conf configuration used for the RPC server and the file system
+   * @param identifier identifier of this master; it is embedded in the job ids
+   *          handed out by getNewJobId()
+   * @throws IOException if the RPC server or file system call fails outside
+   *           the retry loop; an AccessControlException raised while
+   *           preparing the system directory is logged and rethrown
+   * @throws InterruptedException if the thread is interrupted before the
+   *           system directory has been created
+   */
+  public BSPMaster(HamaConfiguration conf, String identifier) throws IOException, InterruptedException {
+    this.conf = conf;
+
+    this.masterIdentifier = identifier;
+
+    InetSocketAddress addr = getAddress(conf);
+    this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), conf);
+
+    // Set only once mkdirs has succeeded, so that leaving the loop because of
+    // an interrupt is not mistaken for a successful set-up.
+    boolean systemDirReady = false;
+
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        if (fs == null) {
+          fs = FileSystem.get(conf);
+        }
+        // clean up the system dir, which will only work if hdfs is out of
+        // safe mode
+        if (systemDir == null) {
+          systemDir = new Path(getSystemDir());
+        }
+
+        LOG.info("Cleaning up the system directory");
+        fs.delete(systemDir, true);
+        if (FileSystem.mkdirs(fs, systemDir,
+            new FsPermission(SYSTEM_DIR_PERMISSION))) {
+          systemDirReady = true;
+          break;
+        }
+        LOG.error("Mkdirs failed to create " + systemDir);
+
+      } catch (AccessControlException ace) {
+        // Permission problems will not fix themselves by retrying: give up.
+        LOG.warn("Failed to operate on bspd.system.dir (" + systemDir
+            + ") because of permissions.");
+        LOG.warn("Manually delete the bspd.system.dir (" + systemDir
+            + ") and then start the BSPMaster.");
+        LOG.warn("Bailing out ... ");
+        throw ace;
+      } catch (IOException ie) {
+        // Any other I/O problem is retried after the sleep below.
+        LOG.info("problem cleaning system directory: " + systemDir, ie);
+      }
+      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+    }
+
+    if (!systemDirReady) {
+      // The loop ended because the thread was interrupted; do not hand out a
+      // master whose system directory was never created.
+      throw new InterruptedException("Interrupted before the system directory "
+          + systemDir + " could be initialized");
+    }
+
+    // deleteLocalFiles(SUBDIR);
+  }
+
+ public static BSPMaster startMaster(HamaConfiguration conf) throws IOException,
+ InterruptedException {
+ return startTracker(conf, generateNewIdentifier());
+ }
+
+  /**
+   * Builds a BSPMaster for the given configuration under the given identifier.
+   *
+   * @param conf configuration handed to the new master
+   * @param identifier identifier handed to the new master
+   * @return the newly constructed master
+   * @throws IOException if the BSPMaster constructor fails with an I/O error
+   * @throws InterruptedException if the BSPMaster constructor is interrupted
+   */
+  public static BSPMaster startTracker(HamaConfiguration conf, String identifier)
+      throws IOException, InterruptedException {
+    return new BSPMaster(conf, identifier);
+  }
+
+ public static InetSocketAddress getAddress(Configuration conf) {
+ String hamaMasterStr = conf.get("hama.master.address", "localhost:40000");
+ return NetUtils.createSocketAddr(hamaMasterStr);
+ }
+
+ public int getPort() {
+ return this.conf.getInt("hama.master.port", 0);
+ }
+
+ public HamaConfiguration getConfiguration() {
+ return this.conf;
+ }
+
+ private static SimpleDateFormat getDateFormat() {
+ return new SimpleDateFormat("yyyyMMddHHmm");
+ }
+
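+ /**
+ * Generates a new master identifier from the current time, formatted as
+ * yyyyMMddHHmm.
+ *
+ * @return the newly generated identifier
+ */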
+ private static String generateNewIdentifier() {
+ return getDateFormat().format(new Date());
+ }
+
+ public void offerService() throws InterruptedException, IOException {
+ this.interTrackerServer.start();
+
+ synchronized (this) {
+ state = State.RUNNING;
+ }
+ LOG.info("Starting RUNNING");
+
+ this.interTrackerServer.join();
+ LOG.info("Stopped interTrackerServer");
+ }
+
+
+  /**
+   * Command-line entry point: starts a BSPMaster with a fresh
+   * HamaConfiguration and serves until offerService() returns.
+   *
+   * No arguments are accepted. The process exits with status -1 when
+   * arguments are given or when start-up or serving throws.
+   *
+   * @param args command-line arguments; must be empty
+   */
+  public static void main(String [] args) {
+    StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG);
+    if (args.length != 0) {
+      // The class was renamed from HamaMaster; report the current name.
+      System.out.println("usage: BSPMaster");
+      System.exit(-1);
+    }
+
+    try {
+      HamaConfiguration conf = new HamaConfiguration();
+      BSPMaster master = startMaster(conf);
+      master.offerService();
+    } catch (Throwable e) {
+      LOG.fatal(StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
+  }
+
+  /**
+   * Reports the version of one of the two RPC protocols this master serves.
+   *
+   * @param protocol fully qualified name of the requested protocol interface
+   * @param clientVersion version announced by the client (not inspected here)
+   * @return the versionID of InterTrackerProtocol or JobSubmissionProtocol
+   * @throws IOException if the protocol name matches neither interface
+   */
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+    final String interTrackerName = InterTrackerProtocol.class.getName();
+    if (protocol.equals(interTrackerName)) {
+      return InterTrackerProtocol.versionID;
+    }
+
+    final String jobSubmissionName = JobSubmissionProtocol.class.getName();
+    if (protocol.equals(jobSubmissionName)) {
+      return JobSubmissionProtocol.versionID;
+    }
+
+    throw new IOException("Unknown protocol to job tracker: " + protocol);
+  }
+
+ /**
+ * An RPC method for transmitting each peer's status from the peer to the master.
+ */
+ @Override
+ public HeartbeatResponse heartbeat(short responseId) {
+ LOG.debug(">>> return the heartbeat message.");
+ return new HeartbeatResponse((short)1);
+ }
+
+ /**
+ * Returns the system directory in which BSP stores control files.
+ */
+ @Override
+ public String getSystemDir() {
+ Path sysDir = new Path(conf.get("bspd.system.dir", "/tmp/hadoop/bsp/system"));
+ return fs.makeQualified(sysDir).toString();
+ }
+
+
+ /**
+ * This method returns new job id. The returned job id increases sequentially.
+ */
+ @Override
+ public JobID getNewJobId() throws IOException {
+ return new JobID(this.masterIdentifier, nextJobId++);
+ }
+
+ @Override
+ public JobStatus submitJob(JobID jobName) throws IOException {
+
+ return null;
+ }
+}
Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Sun Apr 25 10:03:28 2010
@@ -48,7 +48,7 @@ public class BSPPeer implements DefaultB
protected InetSocketAddress masterAddr = null;
protected Server server = null;
protected ZooKeeper zk = null;
- protected Integer mutex = 0;
+ protected volatile Integer mutex = 0;
protected final String serverName;
protected final String bindAddress;
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.ipc.HeartbeatResponse;
+import org.apache.hama.ipc.InterTrackerProtocol;
+
+public class GroomServer implements Runnable {
+ public static final Log LOG = LogFactory.getLog(GroomServer.class);
+
+ static {
+ Configuration.addDefaultResource("hama-default.xml");
+ }
+
+ static enum State {
+ NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
+ };
+
+ HamaConfiguration conf;
+
+ volatile boolean running = true;
+ volatile boolean shuttingDown = false;
+ boolean justInited = true;
+
+ String groomserverName;
+ String localHostname;
+
+ InetSocketAddress masterAddr;
+ InterTrackerProtocol jobClient;
+ //BSPPeer bspPeer;
+
+ short heartbeatResponseId = -1;
+ private volatile int heartbeatInterval = 3 * 1000;
+
+ private LocalDirAllocator localDirAllocator;
+ Path systemDirectory = null;
+ FileSystem systemFS = null;
+
+  /**
+   * Creates a groom server: resolves the master address from the
+   * configuration, sets up the local directory allocator for
+   * "bspd.groom.local.dir" and runs initialize().
+   *
+   * @param conf configuration of this groom server
+   * @throws IOException if initialize() fails
+   */
+  public GroomServer(HamaConfiguration conf) throws IOException {
+    this.conf = conf;
+    this.masterAddr = BSPMaster.getAddress(conf);
+
+    // The local file system handle that used to be fetched here was never
+    // used; the delete helpers obtain it themselves when they need it.
+    this.localDirAllocator = new LocalDirAllocator("bspd.groom.local.dir");
+
+    initialize();
+  }
+
+ synchronized void initialize() throws IOException {
+ if (this.conf.get("slave.host.name") != null) {
+ this.localHostname = conf.get("slave.host.name");
+ }
+
+ if (localHostname == null) {
+ this.localHostname = DNS.getDefaultHost(conf.get(
+ "bspd.groom.dns.interface", "default"), conf.get(
+ "bspd.groom.dns.nameserver", "default"));
+ }
+
+ checkLocalDirs(conf.getStrings("bspd.groom.local.dir"));
+ deleteLocalFiles("groomserver");
+
+ this.groomserverName = "groomd_" + localHostname;
+ LOG.info("Starting tracker " + this.groomserverName);
+
+ DistributedCache.purgeCache(this.conf);
+
+ this.jobClient = (InterTrackerProtocol) RPC.waitForProxy(
+ InterTrackerProtocol.class, InterTrackerProtocol.versionID, masterAddr,
+ conf);
+ this.running = true;
+ // this.bspPeer = new BSPPeer(this.conf);
+ }
+
+  /**
+   * Verifies that at least one of the given local directories passes
+   * DiskChecker.checkDir. Directories that fail the check are logged and
+   * skipped.
+   *
+   * @param localDirs directories to check; may be null
+   * @throws DiskErrorException if localDirs is null or no directory passes
+   */
+  private static void checkLocalDirs(String[] localDirs)
+      throws DiskErrorException {
+    int usableDirs = 0;
+
+    if (localDirs != null) {
+      for (String dir : localDirs) {
+        try {
+          DiskChecker.checkDir(new File(dir));
+          usableDirs++;
+        } catch (DiskErrorException e) {
+          LOG.warn("Graph Processor local " + e.getMessage());
+        }
+      }
+    }
+
+    if (usableDirs == 0) {
+      throw new DiskErrorException("all local directories are not writable");
+    }
+  }
+
+ public String[] getLocalDirs() {
+ return conf.getStrings("bspd.groom.local.dir");
+ }
+
+ public void deleteLocalFiles() throws IOException {
+ String[] localDirs = getLocalDirs();
+ for (int i = 0; i < localDirs.length; i++) {
+ FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]));
+ }
+ }
+
+ public void deleteLocalFiles(String subdir) throws IOException {
+ String[] localDirs = getLocalDirs();
+ for (int i = 0; i < localDirs.length; i++) {
+ FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir));
+ }
+ }
+
+ public void cleanupStorage() throws IOException {
+ deleteLocalFiles();
+ }
+
+ private void startCleanupThreads() throws IOException {
+
+ }
+
+  /**
+   * Heartbeat loop of the groom server.
+   *
+   * While the server is running and not shutting down, this waits out the
+   * remainder of the heartbeat interval and sends a heartbeat to the master.
+   * Until the first heartbeat has succeeded, each pass also fetches the system
+   * directory from the master and opens its file system.
+   *
+   * @return INTERRUPTED if the thread is interrupted, STALE on a disk error,
+   *         DENIED on a RemoteException, NORMAL when the loop ends because the
+   *         server stopped running or is shutting down
+   * @throws Exception declared for callers; exceptions raised inside the loop
+   *           are caught and either mapped to a State or logged
+   */
+  public State offerService() throws Exception {
+    long lastHeartbeat = 0;
+
+    while (running && !shuttingDown) {
+      try {
+        long now = System.currentTimeMillis();
+
+        // Sleep for whatever is left of the current heartbeat interval.
+        long waitTime = heartbeatInterval - (now - lastHeartbeat);
+        if (waitTime > 0) {
+          Thread.sleep(waitTime);
+        }
+
+        if (justInited) {
+          String dir = jobClient.getSystemDir();
+          if (dir == null) {
+            throw new IOException("Failed to get system directory");
+          }
+          systemDirectory = new Path(dir);
+          systemFS = systemDirectory.getFileSystem(conf);
+        }
+
+        // Send the heartbeat; the response is not interpreted yet.
+        transmitHeartBeat(now);
+
+        // Remember when the heartbeat returned; the next heartbeat is
+        // scheduled relative to this time.
+        lastHeartbeat = System.currentTimeMillis();
+
+        justInited = false;
+      } catch (InterruptedException ie) {
+        LOG.info("Interrupted. Closing down.");
+        return State.INTERRUPTED;
+      } catch (DiskErrorException de) {
+        String msg = "Exiting task tracker for disk error:\n"
+            + StringUtils.stringifyException(de);
+        LOG.error(msg);
+
+        return State.STALE;
+      } catch (RemoteException re) {
+        // Leave a trace of why the master turned us away instead of
+        // returning DENIED silently.
+        LOG.warn("Caught remote exception from master [" + masterAddr + "]: "
+            + StringUtils.stringifyException(re));
+        return State.DENIED;
+      } catch (Exception except) {
+        String msg = "Caught exception: "
+            + StringUtils.stringifyException(except);
+        LOG.error(msg);
+      }
+    }
+
+    return State.NORMAL;
+  }
+
+ private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+ HeartbeatResponse heartbeatResponse = jobClient
+ .heartbeat(heartbeatResponseId);
+ return heartbeatResponse;
+ }
+
+  /**
+   * Main loop of the groom server.
+   *
+   * Calls offerService() repeatedly. A STALE or DENIED result leaves the inner
+   * loop; unless the server is shutting down, local state is then
+   * reinitialized via initialize(). An exception from offerService() is
+   * logged and retried after five seconds. An interrupt, whether reported by
+   * offerService() as INTERRUPTED or received during the retry pause, ends
+   * the loop with the thread's interrupt status set.
+   */
+  public void run() {
+    try {
+      startCleanupThreads();
+      boolean denied = false;
+      while (running && !shuttingDown && !denied) {
+        boolean staleState = false;
+        try {
+          while (running && !staleState && !shuttingDown && !denied) {
+            try {
+              State osState = offerService();
+              if (osState == State.STALE) {
+                staleState = true;
+              } else if (osState == State.DENIED) {
+                denied = true;
+              } else if (osState == State.INTERRUPTED) {
+                // offerService() consumed the interrupt; restore the flag for
+                // the caller and stop instead of silently looping on.
+                Thread.currentThread().interrupt();
+                return;
+              }
+            } catch (Exception e) {
+              if (!shuttingDown) {
+                LOG.info("Lost connection to GraphProcessor [" + masterAddr
+                    + "]. Retrying...", e);
+                try {
+                  Thread.sleep(5000);
+                } catch (InterruptedException ie) {
+                  // Do not swallow the interrupt: keep the flag and stop.
+                  Thread.currentThread().interrupt();
+                  return;
+                }
+              }
+            }
+          }
+        } finally {
+          // close();
+        }
+
+        if (shuttingDown) {
+          return;
+        }
+        LOG.warn("Reinitializing local state");
+        initialize();
+      }
+    } catch (IOException ioe) {
+      LOG.error("Got fatal exception while reinitializing TaskTracker: "
+          + StringUtils.stringifyException(ioe));
+      return;
+    }
+  }
+
+ public synchronized void shutdown() throws IOException {
+ shuttingDown = true;
+ close();
+ }
+
+ public synchronized void close() throws IOException {
+ this.running = false;
+
+ cleanupStorage();
+
+ // shutdown RPC connections
+ RPC.stopProxy(jobClient);
+ }
+
+ public static void main(String[] args) {
+ StringUtils.startupShutdownMessage(GroomServer.class, args, LOG);
+ if (args.length != 0) {
+ System.out.println("usage: GroomServer");
+ System.exit(-1);
+ }
+
+ try {
+ HamaConfiguration conf = new HamaConfiguration();
+ new GroomServer(conf).run();
+ } catch (Throwable e) {
+ LOG.fatal(StringUtils.stringifyException(e));
+ System.exit(-1);
+ }
+ }
+}
Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Base class for identifiers that wrap a single integer. Instances are
+ * compared, hashed and serialized by that integer.
+ */
+public abstract class ID implements WritableComparable<ID> {
+  protected static final char SEPARATOR = '_';
+  protected int id;
+
+  /**
+   * @param id the integer value of this identifier
+   */
+  public ID(int id) {
+    this.id = id;
+  }
+
+  /** Creates an identifier whose value is filled in later by readFields. */
+  protected ID() {
+  }
+
+  /**
+   * @return the integer value of this identifier
+   */
+  public int getId() {
+    return id;
+  }
+
+  /**
+   * @return the decimal string form of the integer value
+   */
+  @Override
+  public String toString() {
+    return String.valueOf(id);
+  }
+
+  /**
+   * @return the integer value itself, which is what Integer.hashCode() yields
+   */
+  @Override
+  public int hashCode() {
+    return id;
+  }
+
+  /**
+   * Two identifiers are equal when they have exactly the same class and the
+   * same integer value.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || o.getClass() != this.getClass()) {
+      return false;
+    }
+    ID that = (ID) o;
+    return this.id == that.id;
+  }
+
+  /**
+   * Orders identifiers by their integer value.
+   *
+   * The values are compared explicitly rather than subtracted, because the
+   * difference of two ints can overflow and report the wrong sign.
+   *
+   * @param that the identifier to compare with
+   * @return a negative number, zero or a positive number when this value is
+   *         less than, equal to or greater than the other value
+   */
+  public int compareTo(ID that) {
+    return (this.id < that.id) ? -1 : ((this.id == that.id) ? 0 : 1);
+  }
+
+  /**
+   * Reads the integer value from the given input.
+   */
+  public void readFields(DataInput in) throws IOException {
+    this.id = in.readInt();
+  }
+
+  /**
+   * Writes the integer value to the given output.
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(id);
+  }
+}
Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobClient.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobClient.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobClient.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+
+public class JobClient extends Configured {
+ private static final Log LOG = LogFactory.getLog(JobClient.class);
+
+ public static enum TaskStatusFilter {
+ NONE, KILLED, FAILED, SUCCEEDED, ALL
+ }
+
+ static {
+ Configuration.addDefaultResource("hama-default.xml");
+ }
+
+ @SuppressWarnings("unused")
+ private JobSubmissionProtocol jobSubmitClient = null;
+
+ public JobClient() {
+ }
+
+ public JobClient(HamaConfiguration conf) throws IOException {
+ setConf(conf);
+ init(conf);
+ }
+
+  /**
+   * Connects this client to the master by creating a JobSubmissionProtocol
+   * RPC proxy for the address returned by BSPMaster.getAddress(conf).
+   *
+   * @param conf configuration holding the master address
+   * @throws IOException if the RPC proxy cannot be created
+   */
+  public void init(HamaConfiguration conf) throws IOException {
+    // The "hama.master.address" lookup that used to precede this call stored
+    // its result in a local that was never read, so it has been dropped.
+    this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+  }
+
+ private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
+ Configuration conf) throws IOException {
+ return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory(
+ conf, JobSubmissionProtocol.class));
+ }
+}
Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobClient.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobContext.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobContext.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobContext.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobContext.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.graph.InputFormat;
+import org.apache.hama.graph.OutputFormat;
+
+/**
+ * A read-only view of the job that is provided to the tasks while they are
+ * running.
+ */
+public class JobContext {
+ // Put all of the attribute names in here so that Job and JobContext are
+ // consistent.
+ protected static final String INPUT_FORMAT_CLASS_ATTR = "angrapa.inputformat.class";
+ protected static final String WALKER_CLASS_ATTR = "angrapa.walker.class";
+ protected static final String OUTPUT_FORMAT_CLASS_ATTR = "angrapa.outputformat.class";
+
+ protected final Configuration conf;
+ private final JobID jobId;
+
+ public JobContext(Configuration conf, JobID jobId) {
+ this.conf = conf;
+ this.jobId = jobId;
+ }
+
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ public JobID getJobID() {
+ return jobId;
+ }
+
+  /**
+   * Returns the working directory of the job.
+   *
+   * If "angrapa.working.dir" is set in the configuration, that value is used.
+   * Otherwise the working directory of the configured file system is looked
+   * up, stored under that key and returned.
+   *
+   * @return the working directory
+   * @throws IOException declared in the signature; an IOException from the
+   *           file system lookup is rethrown wrapped in a RuntimeException
+   */
+  public Path getWorkingDirectory() throws IOException {
+    final String configured = conf.get("angrapa.working.dir");
+    if (configured != null) {
+      return new Path(configured);
+    }
+
+    try {
+      final Path current = FileSystem.get(conf).getWorkingDirectory();
+      conf.set("angrapa.working.dir", current.toString());
+      return current;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+ public Class<?> getOutputKeyClass() {
+ return conf.getClass("angrapa.output.key.class", LongWritable.class,
+ Object.class);
+ }
+
+ public Class<?> getOutputValueClass() {
+ return conf
+ .getClass("angrapa.output.value.class", Text.class, Object.class);
+ }
+
+ public String getJobName() {
+ return conf.get("angrapa.job.name", "");
+ }
+
+ @SuppressWarnings("unchecked")
+ public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+ throws ClassNotFoundException {
+ return (Class<? extends InputFormat<?, ?>>) conf.getClass(
+ INPUT_FORMAT_CLASS_ATTR, InputFormat.class); // TODO: To be corrected
+ // to an implemented class
+ }
+
+ @SuppressWarnings("unchecked")
+ public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+ throws ClassNotFoundException {
+ return (Class<? extends OutputFormat<?, ?>>) conf.getClass(
+ OUTPUT_FORMAT_CLASS_ATTR, OutputFormat.class); // TODO: To be corrected
+ // to an implemented
+ // class
+ }
+
+ public RawComparator<?> getSortComparator() {
+ return null;
+ }
+
+ public String getJar() {
+ return conf.get("walker.jar");
+ }
+}
Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobContext.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobID.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobID.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobID.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.io.Text;
+
+ /**
+  * Identifier of a job: a textual identifier plus the numeric id inherited
+  * from ID. Rendered as "job", SEPARATOR, identifier, SEPARATOR, zero-padded
+  * numeric id.
+  */
+ public class JobID extends ID implements Comparable<ID> {
+   /** Prefix that starts the textual form of every job id. */
+   protected static final String JOB = "job";
+   /** Textual identifier; serialized right after the superclass fields. */
+   private final Text jtIdentifier;
+
+   /**
+    * Formats the numeric id without grouping, padded to at least 4 digits.
+    * NOTE(review): java.text.NumberFormat instances are not synchronized;
+    * confirm that ids are never formatted from several threads at once.
+    */
+   protected static final NumberFormat idFormat = NumberFormat.getInstance();
+   static {
+     idFormat.setGroupingUsed(false);
+     idFormat.setMinimumIntegerDigits(4);
+   }
+
+   /**
+    * Creates a job id from its two components.
+    *
+    * @param jtIdentifier textual identifier part
+    * @param id numeric part, handed to the ID superclass
+    */
+   public JobID(String jtIdentifier, int id) {
+     super(id);
+     this.jtIdentifier = new Text(jtIdentifier);
+   }
+
+   /** Creates an empty id, to be filled in by {@link #readFields(DataInput)}. */
+   public JobID() {
+     jtIdentifier = new Text();
+   }
+
+   /** @return the textual identifier part as a String */
+   public String getJtIdentifier() {
+     return jtIdentifier.toString();
+   }
+
+   /**
+    * Two job ids are equal when the superclass considers them equal and their
+    * textual identifiers match.
+    */
+   @Override
+   public boolean equals(Object o) {
+     // The cast below relies on super.equals(o) rejecting null and objects of
+     // another class - TODO confirm against ID.equals.
+     if (!super.equals(o)) {
+       return false;
+     }
+     JobID that = (JobID) o;
+     return this.jtIdentifier.equals(that.jtIdentifier);
+   }
+
+   /**
+    * Orders by textual identifier first, then by numeric id.
+    *
+    * @throws ClassCastException if o is not a JobID
+    */
+   @Override
+   public int compareTo(ID o) {
+     JobID that = (JobID) o;
+     int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier);
+     if (jtComp != 0) {
+       return jtComp;
+     }
+     // Explicit comparison instead of "this.id - that.id": the subtraction
+     // can overflow and report the wrong sign.
+     return (this.id < that.id) ? -1 : ((this.id == that.id) ? 0 : 1);
+   }
+
+   /**
+    * Appends SEPARATOR, the identifier, SEPARATOR and the formatted numeric id
+    * to the given builder.
+    *
+    * @param builder target to append to
+    * @return the same builder, for chaining
+    */
+   public StringBuilder appendTo(StringBuilder builder) {
+     builder.append(SEPARATOR);
+     builder.append(jtIdentifier);
+     builder.append(SEPARATOR);
+     builder.append(idFormat.format(id));
+     return builder;
+   }
+
+   @Override
+   public int hashCode() {
+     return jtIdentifier.hashCode() + id;
+   }
+
+   /** @return the JOB prefix followed by the output of appendTo */
+   @Override
+   public String toString() {
+     return appendTo(new StringBuilder(JOB)).toString();
+   }
+
+   /** Reads the superclass fields first, then the textual identifier. */
+   @Override
+   public void readFields(DataInput in) throws IOException {
+     super.readFields(in);
+     this.jtIdentifier.readFields(in);
+   }
+
+   /** Writes the superclass fields first, then the textual identifier. */
+   @Override
+   public void write(DataOutput out) throws IOException {
+     super.write(out);
+     jtIdentifier.write(out);
+   }
+
+   /**
+    * Parses the textual form produced by {@link #toString()}.
+    *
+    * @param str string to parse; may be null
+    * @return the parsed id, or null when str is null
+    * @throws IllegalArgumentException if str does not consist of exactly
+    *           three SEPARATOR-delimited parts starting with JOB, or if the
+    *           numeric part is not an integer
+    */
+   public static JobID forName(String str) throws IllegalArgumentException {
+     if (str == null) {
+       return null;
+     }
+     final String malformed = "JobId string : " + str
+         + " is not properly formed";
+     // Split on the same separator that appendTo writes.
+     String[] parts = str.split(Character.toString(SEPARATOR));
+     if (parts.length == 3 && parts[0].equals(JOB)) {
+       try {
+         return new JobID(parts[1], Integer.parseInt(parts[2]));
+       } catch (NumberFormatException e) {
+         // Keep the parse failure as the cause instead of discarding it.
+         throw new IllegalArgumentException(malformed, e);
+       }
+     }
+     throw new IllegalArgumentException(malformed);
+   }
+ }
Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobID.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+ /**
+  * Serializable snapshot of a job: its id, three progress values, run state,
+  * start time and a free-form scheduling string.
+  */
+ public class JobStatus implements Writable, Cloneable {
+
+   // Register a factory that hands out empty instances of this class.
+   static {
+     WritableFactories.setFactory(JobStatus.class, new WritableFactory() {
+       public Writable newInstance() {
+         return new JobStatus();
+       }
+     });
+   }
+
+   // Run-state codes stored in runState.
+   public static final int RUNNING = 1;
+   public static final int SUCCEEDED = 2;
+   public static final int FAILED = 3;
+   public static final int PREP = 4;
+   public static final int KILLED = 5;
+
+   private JobID jobid;
+   private float progress;
+   private float cleanupProgress;
+   private float setupProgress;
+   private int runState;
+   private long startTime;
+   private String schedulingInfo = "NA";
+
+   /** Creates an empty status, to be populated by readFields. */
+   public JobStatus() {
+   }
+
+   /** Creates a status with setup and cleanup progress both at 0. */
+   public JobStatus(JobID jobid, float progress, int runState) {
+     this(jobid, 0.0f, progress, 0.0f, runState);
+   }
+
+   /** Creates a status with setup progress at 0. */
+   public JobStatus(JobID jobid, float progress, float cleanupProgress,
+       int runState) {
+     this(jobid, 0.0f, progress, cleanupProgress, runState);
+   }
+
+   /** Creates a status from all progress values; values are stored as given. */
+   public JobStatus(JobID jobid, float setupProgress, float progress,
+       float cleanupProgress, int runState) {
+     this.jobid = jobid;
+     this.setupProgress = setupProgress;
+     this.progress = progress;
+     this.cleanupProgress = cleanupProgress;
+     this.runState = runState;
+   }
+
+   /** Limits a progress value to the range between 0.0 and 1.0. */
+   private static float clamp(float p) {
+     return (float) Math.min(1.0, Math.max(0.0, p));
+   }
+
+   /** @return the id of the job this status describes */
+   public JobID getJobID() {
+     return jobid;
+   }
+
+   /** @return the main progress value */
+   public synchronized float progress() {
+     return progress;
+   }
+
+   /** Stores the main progress value after limiting it to [0, 1]. */
+   synchronized void setprogress(float p) {
+     this.progress = clamp(p);
+   }
+
+   /** @return the cleanup progress value */
+   public synchronized float cleanupProgress() {
+     return cleanupProgress;
+   }
+
+   /** Stores the cleanup progress value after limiting it to [0, 1]. */
+   synchronized void setCleanupProgress(float p) {
+     this.cleanupProgress = clamp(p);
+   }
+
+   /** @return the setup progress value */
+   public synchronized float setupProgress() {
+     return setupProgress;
+   }
+
+   /** Stores the setup progress value after limiting it to [0, 1]. */
+   synchronized void setSetupProgress(float p) {
+     this.setupProgress = clamp(p);
+   }
+
+   /** @return the current run-state code */
+   public synchronized int getRunState() {
+     return runState;
+   }
+
+   /** Replaces the run-state code; the value is not validated. */
+   public synchronized void setRunState(int state) {
+     this.runState = state;
+   }
+
+   synchronized void setStartTime(long startTime) {
+     this.startTime = startTime;
+   }
+
+   synchronized public long getStartTime() {
+     return startTime;
+   }
+
+   /** @return a field-by-field copy obtained from Object.clone() */
+   @Override
+   public Object clone() {
+     try {
+       return super.clone();
+     } catch (CloneNotSupportedException cnse) {
+       throw new InternalError(cnse.toString());
+     }
+   }
+
+   public synchronized String getSchedulingInfo() {
+     return schedulingInfo;
+   }
+
+   public synchronized void setSchedulingInfo(String schedulingInfo) {
+     this.schedulingInfo = schedulingInfo;
+   }
+
+   /** @return true when the run state is SUCCEEDED, FAILED or KILLED */
+   public synchronized boolean isJobComplete() {
+     switch (runState) {
+       case SUCCEEDED:
+       case FAILED:
+       case KILLED:
+         return true;
+       default:
+         return false;
+     }
+   }
+
+   /**
+    * Writes the job id, setup/main/cleanup progress, run state, start time
+    * and scheduling info, in that order.
+    */
+   public synchronized void write(DataOutput out) throws IOException {
+     jobid.write(out);
+     out.writeFloat(setupProgress);
+     out.writeFloat(progress);
+     out.writeFloat(cleanupProgress);
+     out.writeInt(runState);
+     out.writeLong(startTime);
+     Text.writeString(out, schedulingInfo);
+   }
+
+   /** Reads the fields in the same order that write emits them. */
+   public synchronized void readFields(DataInput in) throws IOException {
+     jobid = new JobID();
+     jobid.readFields(in);
+     setupProgress = in.readFloat();
+     progress = in.readFloat();
+     cleanupProgress = in.readFloat();
+     runState = in.readInt();
+     startTime = in.readLong();
+     schedulingInfo = Text.readString(in);
+   }
+ }
Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The context for task attempts.
+ */
+ public class TaskAttemptContext extends JobContext implements Progressable {
+   /** Id of the attempt this context belongs to. */
+   private final TaskAttemptID taskId;
+   /** Most recent status message; starts out empty. */
+   private String status = "";
+
+   /**
+    * Builds the context for one attempt; the job id handed to the superclass
+    * is taken from the attempt id.
+    *
+    * @param conf configuration forwarded to the superclass
+    * @param taskId id of the attempt; must not be null because it is
+    *          dereferenced here
+    */
+   public TaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
+     super(conf, taskId.getJobID());
+     this.taskId = taskId;
+   }
+
+   /**
+    * @return the attempt id given at construction time
+    */
+   public TaskAttemptID getTaskAttemptID() {
+     return this.taskId;
+   }
+
+   /**
+    * Remembers the given message as the current status.
+    *
+    * @param msg new status message
+    * @throws IOException declared in the signature; this implementation does
+    *           not throw it
+    */
+   public void setStatus(String msg) throws IOException {
+     this.status = msg;
+   }
+
+   /**
+    * @return the message most recently passed to setStatus, or the empty
+    *         string if none was set
+    */
+   public String getStatus() {
+     return this.status;
+   }
+
+   /**
+    * Progress hook; intentionally empty here so that subtypes can supply the
+    * actual reporting.
+    */
+   public void progress() {
+   }
+ }
Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+ /**
+  * Identifier of one attempt of a task: a TaskID plus the numeric attempt id
+  * inherited from ID. Rendered as "attempt" followed by the task id
+  * components, SEPARATOR and the attempt number.
+  */
+ public class TaskAttemptID extends ID {
+   /** Prefix that starts the textual form of every attempt id. */
+   protected static final String ATTEMPT = "attempt";
+   /** Task this attempt belongs to; never null after construction. */
+   private TaskID taskId;
+
+   /**
+    * @param taskId task the attempt belongs to
+    * @param id attempt number, handed to the ID superclass
+    * @throws IllegalArgumentException if taskId is null
+    */
+   public TaskAttemptID(TaskID taskId, int id) {
+     super(id);
+     if (taskId == null) {
+       throw new IllegalArgumentException("taskId cannot be null");
+     }
+     this.taskId = taskId;
+   }
+
+   /** Convenience constructor that builds the enclosing TaskID first. */
+   public TaskAttemptID(String jtIdentifier, int jobId, boolean isMatrixTask,
+       int taskId, int id) {
+     this(new TaskID(jtIdentifier, jobId, isMatrixTask, taskId), id);
+   }
+
+   /** Creates an empty id, to be filled in by {@link #readFields(DataInput)}. */
+   public TaskAttemptID() {
+     taskId = new TaskID();
+   }
+
+   /** @return the job id obtained from the enclosing task id */
+   public JobID getJobID() {
+     return taskId.getJobID();
+   }
+
+   /** @return the task this attempt belongs to */
+   public TaskID getTaskID() {
+     return taskId;
+   }
+
+   /**
+    * Two attempt ids are equal when the superclass considers them equal and
+    * their task ids match.
+    */
+   @Override
+   public boolean equals(Object o) {
+     // The cast below relies on super.equals(o) rejecting null and objects of
+     // another class - TODO confirm against ID.equals.
+     if (!super.equals(o)) {
+       return false;
+     }
+     TaskAttemptID that = (TaskAttemptID) o;
+     return this.taskId.equals(that.taskId);
+   }
+
+   /**
+    * Appends the task id components, SEPARATOR and the unpadded attempt
+    * number to the given builder.
+    */
+   protected StringBuilder appendTo(StringBuilder builder) {
+     return taskId.appendTo(builder).append(SEPARATOR).append(id);
+   }
+
+   /** Reads the superclass fields first, then the task id. */
+   @Override
+   public void readFields(DataInput in) throws IOException {
+     super.readFields(in);
+     taskId.readFields(in);
+   }
+
+   /** Writes the superclass fields first, then the task id. */
+   @Override
+   public void write(DataOutput out) throws IOException {
+     super.write(out);
+     taskId.write(out);
+   }
+
+   @Override
+   public int hashCode() {
+     return taskId.hashCode() * 5 + id;
+   }
+
+   /**
+    * Orders by task id first, then by attempt number.
+    *
+    * @throws ClassCastException if o is not a TaskAttemptID
+    */
+   @Override
+   public int compareTo(ID o) {
+     TaskAttemptID that = (TaskAttemptID) o;
+     int tipComp = this.taskId.compareTo(that.taskId);
+     if (tipComp != 0) {
+       return tipComp;
+     }
+     // Explicit comparison instead of "this.id - that.id": the subtraction
+     // can overflow and report the wrong sign.
+     return (this.id < that.id) ? -1 : ((this.id == that.id) ? 0 : 1);
+   }
+
+   /** @return the ATTEMPT prefix followed by the output of appendTo */
+   @Override
+   public String toString() {
+     return appendTo(new StringBuilder(ATTEMPT)).toString();
+   }
+
+   /**
+    * Parses the textual form produced by {@link #toString()}.
+    *
+    * @param str string to parse; may be null
+    * @return the parsed id, or null when str is null
+    * @throws IllegalArgumentException if str does not consist of exactly six
+    *           SEPARATOR-delimited parts starting with ATTEMPT, if the task
+    *           type part is neither "m" nor "g", or if a numeric part is not
+    *           an integer
+    */
+   public static TaskAttemptID forName(String str)
+       throws IllegalArgumentException {
+     if (str == null) {
+       return null;
+     }
+     final String malformed = "TaskAttemptId string : " + str
+         + " is not properly formed";
+     String[] parts = str.split(Character.toString(SEPARATOR));
+     if (parts.length == 6 && parts[0].equals(ATTEMPT)) {
+       // "m" marks a matrix task, "g" the other kind; anything else is
+       // malformed and falls through to the exception below.
+       boolean isMatrixTask = parts[3].equals("m");
+       if (isMatrixTask || parts[3].equals("g")) {
+         try {
+           return new TaskAttemptID(parts[1], Integer.parseInt(parts[2]),
+               isMatrixTask, Integer.parseInt(parts[4]),
+               Integer.parseInt(parts[5]));
+         } catch (NumberFormatException e) {
+           // Keep the parse failure as the cause instead of discarding it.
+           throw new IllegalArgumentException(malformed, e);
+         }
+       }
+     }
+     throw new IllegalArgumentException(malformed);
+   }
+ }
Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+ /**
+  * Identifier of a task: a JobID, a task-type flag and the numeric task id
+  * inherited from ID. Rendered as "task" followed by the job id components,
+  * SEPARATOR, 'm' or 'g', SEPARATOR and the zero-padded task number.
+  */
+ public class TaskID extends ID {
+   /** Prefix that starts the textual form of every task id. */
+   protected static final String TASK = "task";
+   /**
+    * Formats the numeric id without grouping, padded to at least 6 digits.
+    * NOTE(review): java.text.NumberFormat instances are not synchronized;
+    * confirm that ids are never formatted from several threads at once.
+    */
+   protected static final NumberFormat idFormat = NumberFormat.getInstance();
+   static {
+     idFormat.setGroupingUsed(false);
+     idFormat.setMinimumIntegerDigits(6);
+   }
+
+   /** Job this task belongs to; never null after construction. */
+   private JobID jobId;
+   /** Task-type flag; written as 'm' when true and 'g' when false. */
+   private boolean isMatrixTask;
+
+   /**
+    * @param jobId job the task belongs to
+    * @param isMatrixTask task-type flag
+    * @param id task number, handed to the ID superclass
+    * @throws IllegalArgumentException if jobId is null
+    */
+   public TaskID(JobID jobId, boolean isMatrixTask, int id) {
+     super(id);
+     if (jobId == null) {
+       throw new IllegalArgumentException("jobId cannot be null");
+     }
+     this.jobId = jobId;
+     this.isMatrixTask = isMatrixTask;
+   }
+
+   /**
+    * Convenience constructor that builds the enclosing JobID first. The flag
+    * is stored as the matrix-task flag, so the parameter is named to match.
+    */
+   public TaskID(String jtIdentifier, int jobId, boolean isMatrixTask, int id) {
+     this(new JobID(jtIdentifier, jobId), isMatrixTask, id);
+   }
+
+   /** Creates an empty id, to be filled in by {@link #readFields(DataInput)}. */
+   public TaskID() {
+     jobId = new JobID();
+   }
+
+   /** Returns the {@link JobID} object that this tip belongs to */
+   public JobID getJobID() {
+     return jobId;
+   }
+
+   /**
+    * Returns the stored task-type flag.
+    * NOTE(review): this returns the matrix-task flag, i.e. it is true for ids
+    * rendered with 'm' and false for ids rendered with 'g', which contradicts
+    * the method name. Behaviour is left unchanged because callers may depend
+    * on it - confirm the intended meaning.
+    */
+   public boolean isGraphTask() {
+     return isMatrixTask;
+   }
+
+   /**
+    * Two task ids are equal when the superclass considers them equal and both
+    * the task-type flag and the job id match.
+    */
+   @Override
+   public boolean equals(Object o) {
+     // The cast below relies on super.equals(o) rejecting null and objects of
+     // another class - TODO confirm against ID.equals.
+     if (!super.equals(o)) {
+       return false;
+     }
+     TaskID that = (TaskID) o;
+     return this.isMatrixTask == that.isMatrixTask
+         && this.jobId.equals(that.jobId);
+   }
+
+   /**
+    * Orders by job id, then by task type (flag set sorts first), then by
+    * task number.
+    *
+    * @throws ClassCastException if o is not a TaskID
+    */
+   @Override
+   public int compareTo(ID o) {
+     TaskID that = (TaskID) o;
+     int jobComp = this.jobId.compareTo(that.jobId);
+     if (jobComp != 0) {
+       return jobComp;
+     }
+     if (this.isMatrixTask != that.isMatrixTask) {
+       return this.isMatrixTask ? -1 : 1;
+     }
+     // Explicit comparison instead of "this.id - that.id": the subtraction
+     // can overflow and report the wrong sign.
+     return (this.id < that.id) ? -1 : ((this.id == that.id) ? 0 : 1);
+   }
+
+   /** @return the TASK prefix followed by the output of appendTo */
+   @Override
+   public String toString() {
+     return appendTo(new StringBuilder(TASK)).toString();
+   }
+
+   /**
+    * Appends the job id components, SEPARATOR, the task-type letter,
+    * SEPARATOR and the formatted task number to the given builder.
+    */
+   protected StringBuilder appendTo(StringBuilder builder) {
+     return jobId.appendTo(builder).append(SEPARATOR).append(
+         isMatrixTask ? 'm' : 'g').append(SEPARATOR).append(idFormat.format(id));
+   }
+
+   @Override
+   public int hashCode() {
+     return jobId.hashCode() * 524287 + id;
+   }
+
+   /** Reads the superclass fields, then the job id, then the type flag. */
+   @Override
+   public void readFields(DataInput in) throws IOException {
+     super.readFields(in);
+     jobId.readFields(in);
+     isMatrixTask = in.readBoolean();
+   }
+
+   /** Writes the superclass fields, then the job id, then the type flag. */
+   @Override
+   public void write(DataOutput out) throws IOException {
+     super.write(out);
+     jobId.write(out);
+     out.writeBoolean(isMatrixTask);
+   }
+
+   /**
+    * Parses the textual form produced by {@link #toString()}.
+    *
+    * @param str string to parse; may be null
+    * @return the parsed id, or null when str is null
+    * @throws IllegalArgumentException if str does not consist of exactly five
+    *           SEPARATOR-delimited parts starting with TASK, if the task type
+    *           part is neither "m" nor "g", or if a numeric part is not an
+    *           integer
+    */
+   public static TaskID forName(String str) throws IllegalArgumentException {
+     if (str == null) {
+       return null;
+     }
+     final String malformed = "TaskId string : " + str
+         + " is not properly formed";
+     // Split on the same separator that appendTo writes.
+     String[] parts = str.split(Character.toString(SEPARATOR));
+     if (parts.length == 5 && parts[0].equals(TASK)) {
+       // Anything other than "m" or "g" is malformed and falls through to
+       // the exception below.
+       boolean matrixTask = parts[3].equals("m");
+       if (matrixTask || parts[3].equals("g")) {
+         try {
+           return new TaskID(parts[1], Integer.parseInt(parts[2]), matrixTask,
+               Integer.parseInt(parts[4]));
+         } catch (NumberFormatException e) {
+           // Keep the parse failure as the cause instead of discarding it.
+           throw new IllegalArgumentException(malformed, e);
+         }
+       }
+     }
+     throw new IllegalArgumentException(malformed);
+   }
+ }
Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/hama/trunk/src/java/org/apache/hama/graph/InputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/InputFormat.java?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/InputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/InputFormat.java Sun Apr 25 10:03:28 2010
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hama.bsp.JobContext;
public abstract class InputFormat<KEYIN, VALUEIN> {
Modified: incubator/hama/trunk/src/java/org/apache/hama/graph/OutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/OutputCommitter.java?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/OutputCommitter.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/OutputCommitter.java Sun Apr 25 10:03:28 2010
@@ -20,6 +20,9 @@ package org.apache.hama.graph;
import java.io.IOException;
+import org.apache.hama.bsp.JobContext;
+import org.apache.hama.bsp.TaskAttemptContext;
+
/**
*
* @see JobContext
Modified: incubator/hama/trunk/src/java/org/apache/hama/graph/OutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/OutputFormat.java?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/OutputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/OutputFormat.java Sun Apr 25 10:03:28 2010
@@ -21,6 +21,8 @@ package org.apache.hama.graph;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hama.bsp.JobContext;
+import org.apache.hama.bsp.TaskAttemptContext;
/**
* <code>OutputFormat</code> describes the output-specification for a Angrapa
Modified: incubator/hama/trunk/src/java/org/apache/hama/graph/RecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/RecordWriter.java?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/RecordWriter.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/RecordWriter.java Sun Apr 25 10:03:28 2010
@@ -21,6 +21,7 @@ package org.apache.hama.graph;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hama.bsp.TaskAttemptContext;
/**
* <code>RecordWriter</code> writes the output <key, value> pairs to an
Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java Sun Apr 25 10:03:28 2010
@@ -21,8 +21,8 @@ package org.apache.hama.ipc;
import java.io.IOException;
-import org.apache.hama.graph.JobID;
-import org.apache.hama.graph.JobStatus;
+import org.apache.hama.bsp.JobID;
+import org.apache.hama.bsp.JobStatus;
/**
* Protocol that a Walker and the central Master use to communicate. This