You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by to...@apache.org on 2011/08/01 16:12:56 UTC
svn commit: r1152788 [3/9] - in /incubator/hama/trunk: ./ bin/ conf/ core/
core/bin/ core/conf/ core/src/ core/src/main/ core/src/main/java/
core/src/main/java/org/ core/src/main/java/org/apache/
core/src/main/java/org/apache/hama/ core/src/main/java/o...
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A {@link BSPMessage} whose payload is a single boolean, labelled with a
+ * string tag.
+ */
+public class BooleanMessage extends BSPMessage {
+
+  String tag;
+  boolean data;
+
+  /**
+   * Creates a message whose tag and value are left at their defaults until
+   * {@link #readFields(DataInput)} assigns them.
+   */
+  public BooleanMessage() {
+    super();
+  }
+
+  /**
+   * Creates a message with the given tag and value.
+   *
+   * @param tag label of the message
+   * @param data boolean payload
+   */
+  public BooleanMessage(String tag, boolean data) {
+    super();
+    this.data = data;
+    this.tag = tag;
+  }
+
+  /** @return the tag of this message */
+  @Override
+  public String getTag() {
+    return this.tag;
+  }
+
+  /** @return the boolean payload, boxed */
+  @Override
+  public Boolean getData() {
+    return Boolean.valueOf(this.data);
+  }
+
+  /** Reads the tag (via {@code readUTF}) followed by the boolean value. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.tag = in.readUTF();
+    this.data = in.readBoolean();
+  }
+
+  /** Writes the tag (via {@code writeUTF}) followed by the boolean value. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(this.tag);
+    out.writeBoolean(this.data);
+  }
+}
\ No newline at end of file
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ByteMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ByteMessage.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ByteMessage.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ByteMessage.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A {@link BSPMessage} whose tag and payload are both raw byte arrays.
+ */
+public class ByteMessage extends BSPMessage {
+
+  private byte[] tag;
+  private byte[] data;
+
+  /**
+   * Creates a message whose tag and data stay unset until
+   * {@link #readFields(DataInput)} assigns them.
+   */
+  public ByteMessage() {
+    super();
+  }
+
+  /**
+   * Creates a message that holds the given arrays (no copies are made).
+   *
+   * @param tag bytes labelling the message
+   * @param data payload bytes
+   */
+  public ByteMessage(byte[] tag, byte[] data) {
+    super();
+    this.data = data;
+    this.tag = tag;
+  }
+
+  /** @return the tag array held by this message */
+  @Override
+  public byte[] getTag() {
+    return tag;
+  }
+
+  /** @return the payload array held by this message */
+  @Override
+  public byte[] getData() {
+    return data;
+  }
+
+  /**
+   * Writes each array as an int length followed by its bytes, tag first and
+   * then data.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(this.tag.length);
+    out.write(this.tag);
+    out.writeInt(this.data.length);
+    out.write(this.data);
+  }
+
+  /**
+   * Reads the two length-prefixed arrays in the order used by
+   * {@link #write(DataOutput)}.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    final int tagLength = in.readInt();
+    tag = new byte[tagLength];
+    in.readFully(tag);
+
+    final int dataLength = in.readInt();
+    data = new byte[dataLength];
+    in.readFully(data);
+  }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Status information on the current state of the BSP cluster.
+ *
+ * <p><code>ClusterStatus</code> provides clients with information such as:
+ * <ol>
+ * <li>
+ * Size of the cluster.
+ * </li>
+ * <li>
+ * Name of the grooms.
+ * </li>
+ * <li>
+ * Task capacity of the cluster.
+ * </li>
+ * <li>
+ * The number of currently running bsp tasks.
+ * </li>
+ * <li>
+ * State of the <code>BSPMaster</code>.
+ * </li>
+ * </ol></p>
+ *
+ * <p>Clients can query for the latest <code>ClusterStatus</code>, via
+ * {@link BSPJobClient#getClusterStatus(boolean)}.</p>
+ *
+ * @see BSPMaster
+ */
+public class ClusterStatus implements Writable {
+
+ private int numActiveGrooms;
+ private Map<String, String> activeGrooms = new HashMap<String, String>();
+ private int tasks;
+ private int maxTasks;
+ private BSPMaster.State state;
+
+ /**
+ *
+ */
+ public ClusterStatus() {}
+
+ public ClusterStatus(int grooms, int tasks, int maxTasks, BSPMaster.State state) {
+ this.numActiveGrooms = grooms;
+ this.tasks = tasks;
+ this.maxTasks = maxTasks;
+ this.state = state;
+ }
+
+ public ClusterStatus(Map<String, String> activeGrooms, int tasks, int maxTasks,
+ BSPMaster.State state) {
+ this(activeGrooms.size(), tasks, maxTasks, state);
+ this.activeGrooms = activeGrooms;
+ }
+
+ /**
+ * Get the number of groom servers in the cluster.
+ *
+ * @return the number of groom servers in the cluster.
+ */
+ public int getGroomServers() {
+ return numActiveGrooms;
+ }
+
+ /**
+ * Get the names of groom servers, and their peers, in the cluster.
+ *
+ * @return the active groom servers in the cluster.
+ */
+ public Map<String, String> getActiveGroomNames() {
+ return activeGrooms;
+ }
+
+ /**
+ * Get the number of currently running tasks in the cluster.
+ *
+ * @return the number of currently running tasks in the cluster.
+ */
+ public int getTasks() {
+ return tasks;
+ }
+
+ /**
+ * Get the maximum capacity for running tasks in the cluster.
+ *
+ * @return the maximum capacity for running tasks in the cluster.
+ */
+ public int getMaxTasks() {
+ return maxTasks;
+ }
+
+ /**
+ * Get the current state of the <code>BSPMaster</code>,
+ * as {@link BSPMaster.State}
+ *
+ * @return the current state of the <code>BSPMaster</code>.
+ */
+ public BSPMaster.State getBSPMasterState() {
+ return state;
+ }
+
+ //////////////////////////////////////////////
+ // Writable
+ //////////////////////////////////////////////
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if (activeGrooms.isEmpty()) {
+ out.writeInt(numActiveGrooms);
+ out.writeBoolean(false);
+ } else {
+ out.writeInt(activeGrooms.size());
+ out.writeBoolean(true);
+
+ String[] groomNames = activeGrooms.keySet().toArray(new String[0]);
+ List<String> peerNames = new ArrayList<String>();
+
+ for (String groomName : groomNames) {
+ peerNames.add(activeGrooms.get(groomName));
+ }
+
+ WritableUtils.writeCompressedStringArray(out, groomNames);
+ WritableUtils.writeCompressedStringArray(out, peerNames.toArray(new String[0]));
+ }
+ out.writeInt(tasks);
+ out.writeInt(maxTasks);
+ WritableUtils.writeEnum(out, state);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ numActiveGrooms = in.readInt();
+ boolean groomListFollows = in.readBoolean();
+
+ if (groomListFollows) {
+ String[] groomNames = WritableUtils.readCompressedStringArray(in);
+ String[] peerNames = WritableUtils.readCompressedStringArray(in);
+ activeGrooms = new HashMap<String, String>(groomNames.length);
+
+ for (int i = 0; i < groomNames.length; i++) {
+ activeGrooms.put(groomNames[i], peerNames[i]);
+ }
+ }
+
+ tasks = in.readInt();
+ maxTasks = in.readInt();
+ state = WritableUtils.readEnum(in, BSPMaster.State.class);
+ }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A directive sent by the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} asking it to commit the output of
+ * a task.
+ */
+class CommitTaskAction extends GroomServerAction {
+
+  private TaskAttemptID taskId;
+
+  /**
+   * Creates an action holding a default-constructed {@link TaskAttemptID},
+   * which {@link #readFields(DataInput)} can then fill in.
+   */
+  public CommitTaskAction() {
+    super(ActionType.COMMIT_TASK);
+    this.taskId = new TaskAttemptID();
+  }
+
+  /**
+   * Creates an action for the given task attempt.
+   *
+   * @param taskId the task attempt whose output is to be committed
+   */
+  public CommitTaskAction(TaskAttemptID taskId) {
+    super(ActionType.COMMIT_TASK);
+    this.taskId = taskId;
+  }
+
+  /** @return the task attempt this action refers to */
+  public TaskAttemptID getTaskID() {
+    return this.taskId;
+  }
+
+  /** Serializes the task attempt id; nothing else is written. */
+  public void write(DataOutput out) throws IOException {
+    this.taskId.write(out);
+  }
+
+  /** Reads the task attempt id into the id object already held. */
+  public void readFields(DataInput in) throws IOException {
+    this.taskId.readFields(in);
+  }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A generic directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
+ *
+ * <p>A directive is serialized as its creation timestamp followed by the
+ * integer code of its {@link Type}.</p>
+ */
+public class Directive implements Writable {
+
+  protected long timestamp;
+  protected Directive.Type type;
+
+  /** Kind of directive, each with the integer code used on the wire. */
+  public static enum Type {
+    Request(1), Response(2);
+
+    final int t;
+
+    Type(int t) {
+      this.t = t;
+    }
+
+    /** @return the integer code written for this type */
+    public int value() {
+      return this.t;
+    }
+  }
+
+  /**
+   * Creates a directive with no type and a zero timestamp; both are assigned
+   * by {@link #readFields(DataInput)}.
+   */
+  public Directive() {
+  }
+
+  /**
+   * Creates a directive of the given type, stamped with the current time.
+   *
+   * @param type whether this is a request or a response
+   */
+  public Directive(Directive.Type type) {
+    this.timestamp = System.currentTimeMillis();
+    this.type = type;
+  }
+
+  /** @return the timestamp (milliseconds) recorded for this directive */
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  /** @return the type of this directive, or null if none has been set */
+  public Directive.Type getType() {
+    return this.type;
+  }
+
+  /*
+   * Disabled idea, kept for reference:
+   * Command for BSPMaster or GroomServer to execute.
+   * public abstract void execute() throws Exception;
+   */
+
+  /**
+   * Writes the timestamp followed by the type code.
+   *
+   * @throws IOException if the output fails, or if this directive has no
+   *           type (checked before anything is written, so the stream is
+   *           not left half-written)
+   */
+  public void write(DataOutput out) throws IOException {
+    if (this.type == null) {
+      throw new IOException("Cannot serialize a Directive without a type.");
+    }
+    out.writeLong(this.timestamp);
+    out.writeInt(this.type.value());
+  }
+
+  /**
+   * Reads the timestamp and the type code.
+   *
+   * @throws IOException if the input fails or the type code is not one of
+   *           the codes defined by {@link Type}
+   */
+  public void readFields(DataInput in) throws IOException {
+    this.timestamp = in.readLong();
+    this.type = typeOf(in.readInt());
+  }
+
+  /** Maps a wire code back to its {@link Type}; unknown codes are rejected. */
+  private static Directive.Type typeOf(int code) throws IOException {
+    for (Directive.Type candidate : Directive.Type.values()) {
+      if (candidate.value() == code) {
+        return candidate;
+      }
+    }
+    throw new IOException("Unknown directive type code: " + code);
+  }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveException.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveException.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveException.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveException.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * Unchecked exception raised when a {@link Directive} cannot be handled.
+ */
+public class DirectiveException extends RuntimeException {
+
+  private static final long serialVersionUID = -8052582046894492822L;
+
+  /** Creates an exception with neither a message nor a cause. */
+  public DirectiveException() {
+    super();
+  }
+
+  /**
+   * Creates an exception with a message.
+   *
+   * @param message description of the failure
+   */
+  public DirectiveException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates an exception with a message and the underlying cause.
+   *
+   * @param message description of the failure
+   * @param t the cause of the failure
+   */
+  public DirectiveException(String message, Throwable t) {
+    super(message, t);
+  }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * Contract for components that process a {@link Directive}.
+ */
+public interface DirectiveHandler {
+
+  /**
+   * Handle directives on demand.
+   *
+   * @param directive the directive to be handled.
+   * @throws DirectiveException if the directive cannot be handled.
+   */
+  void handle(Directive directive) throws DirectiveException;
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Handles the tasks dispatching between the BSPMaster and the GroomServers.
+ */
+public final class DispatchTasksDirective extends Directive implements Writable {
+
+ public static final Log LOG = LogFactory.getLog(DispatchTasksDirective.class);
+
+ private Map<String, String> groomServerPeers;
+ private GroomServerAction[] actions;
+
+ public DispatchTasksDirective() {
+ super();
+ }
+
+ public DispatchTasksDirective(Map<String, String> groomServerPeers,
+ GroomServerAction[] actions) {
+ super(Directive.Type.Request);
+ this.groomServerPeers = groomServerPeers;
+ this.actions = actions;
+ }
+
+ public Map<String, String> getGroomServerPeers() {
+ return this.groomServerPeers;
+ }
+
+ public GroomServerAction[] getActions() {
+ return this.actions;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ if (this.actions == null) {
+ WritableUtils.writeVInt(out, 0);
+ } else {
+ WritableUtils.writeVInt(out, actions.length);
+ for (GroomServerAction action : this.actions) {
+ WritableUtils.writeEnum(out, action.getActionType());
+ action.write(out);
+ }
+ }
+ String[] groomServerNames = groomServerPeers.keySet()
+ .toArray(new String[0]);
+ WritableUtils.writeCompressedStringArray(out, groomServerNames);
+
+ List<String> groomServerAddresses = new ArrayList<String>(
+ groomServerNames.length);
+ for (String groomName : groomServerNames) {
+ groomServerAddresses.add(groomServerPeers.get(groomName));
+ }
+ WritableUtils.writeCompressedStringArray(out, groomServerAddresses
+ .toArray(new String[0]));
+
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ int length = WritableUtils.readVInt(in);
+ if (length > 0) {
+ this.actions = new GroomServerAction[length];
+ for (int i = 0; i < length; ++i) {
+ GroomServerAction.ActionType actionType = WritableUtils.readEnum(in,
+ GroomServerAction.ActionType.class);
+ actions[i] = GroomServerAction.createAction(actionType);
+ actions[i].readFields(in);
+ }
+ } else {
+ this.actions = null;
+ }
+ String[] groomServerNames = WritableUtils.readCompressedStringArray(in);
+ String[] groomServerAddresses = WritableUtils.readCompressedStringArray(in);
+ groomServerPeers = new HashMap<String, String>(groomServerNames.length);
+
+ for (int i = 0; i < groomServerNames.length; i++) {
+ groomServerPeers.put(groomServerNames[i], groomServerAddresses[i]);
+ }
+ }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A {@link BSPMessage} whose payload is a single double, labelled with a
+ * string tag.
+ */
+public class DoubleMessage extends BSPMessage {
+
+  private String tag;
+  private double data;
+
+  /**
+   * Creates a message whose tag and value are left at their defaults until
+   * {@link #readFields(DataInput)} assigns them.
+   */
+  public DoubleMessage() {
+    super();
+  }
+
+  /**
+   * Creates a message with the given tag and value.
+   *
+   * @param tag label of the message
+   * @param data double payload
+   */
+  public DoubleMessage(String tag, double data) {
+    super();
+    this.tag = tag;
+    this.data = data;
+  }
+
+  /** @return the tag of this message */
+  @Override
+  public String getTag() {
+    return this.tag;
+  }
+
+  /** @return the double payload, boxed */
+  @Override
+  public Double getData() {
+    return Double.valueOf(this.data);
+  }
+
+  /** Reads the tag (via {@code readUTF}) followed by the double value. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.tag = in.readUTF();
+    this.data = in.readDouble();
+  }
+
+  /** Writes the tag (via {@code writeUTF}) followed by the double value. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(this.tag);
+    out.writeDouble(this.data);
+  }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A named first-come-first-served job queue backed by a
+ * {@link LinkedBlockingQueue}.
+ */
+class FCFSQueue implements Queue<JobInProgress> {
+
+  public static final Log LOG = LogFactory.getLog(FCFSQueue.class);
+  private final String name;
+  private final BlockingQueue<JobInProgress> queue = new LinkedBlockingQueue<JobInProgress>();
+
+  /**
+   * @param name the name of this queue, used in log messages
+   */
+  public FCFSQueue(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public String getName() {
+    return this.name;
+  }
+
+  /**
+   * Appends the job to the queue. If the calling thread is interrupted the
+   * failure is logged, the job is not added, and the thread's interrupt
+   * status is restored.
+   */
+  @Override
+  public void addJob(JobInProgress job) {
+    try {
+      queue.put(job);
+    } catch (InterruptedException ie) {
+      LOG.error("Fail to add a job to the " + this.name + " queue.", ie);
+      // Keep the interruption visible to the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** Removes the given job from the queue if it is present. */
+  @Override
+  public void removeJob(JobInProgress job) {
+    queue.remove(job);
+  }
+
+  /**
+   * Takes the job at the head of the queue, blocking until one is available.
+   *
+   * @return the head job, or null if the calling thread was interrupted
+   *         while waiting (the interrupt status is restored in that case)
+   */
+  @Override
+  public JobInProgress removeJob() {
+    try {
+      return queue.take();
+    } catch (InterruptedException ie) {
+      LOG.error("Fail to remove a job from the " + this.name + " queue.", ie);
+      // Keep the interruption visible to the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+    }
+    return null;
+  }
+
+  /** @return the backing queue itself, not a copy */
+  @Override
+  public Collection<JobInProgress> jobs() {
+    return queue;
+  }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,920 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.ipc.MasterProtocol;
+import org.apache.hama.ipc.GroomProtocol;
+import org.apache.log4j.LogManager;
+
+/**
+ * A Groom Server (shortly referred to as groom) is a process that performs bsp
+ * tasks assigned by BSPMaster. Each groom contacts the BSPMaster, and it takes
+ * assigned tasks and reports its status by means of periodical piggybacks with
+ * BSPMaster. Each groom is designed to run with HDFS or other distributed
+ * storages. Basically, a groom server and a data node should be run on one
+ * physical node.
+ */
+public class GroomServer implements Runnable, GroomProtocol, BSPPeerProtocol {
+ public static final Log LOG = LogFactory.getLog(GroomServer.class);
+ // Sub-directory (below each local dir) under which per-task job.xml and
+ // job.jar copies are placed by localizeJob() and localizeTask().
+ static final String SUBDIR = "groomServer";
+
+ // Milliseconds slept between status reports in offerService().
+ private volatile static int REPORT_INTERVAL = 1 * 1000;
+
+ Configuration conf;
+
+ // Constants
+ // offerService() returns NORMAL, STALE or DENIED; run() reacts to STALE and
+ // DENIED. The remaining values are not used in this class.
+ static enum State {
+ NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
+ };
+
+ // Running States and its related things
+ // Set at the end of initialize(), cleared by close(); guards the start of
+ // the worker RPC server in initialize().
+ volatile boolean initialized = false;
+ volatile boolean running = true;
+ volatile boolean shuttingDown = false;
+ // True until offerService() has completed its system directory lookup.
+ boolean justInited = true;
+ // Not read or written anywhere else in this class.
+ GroomServerStatus status = null;
+
+ // Attributes
+ String groomServerName;
+ String localHostname;
+ String groomHostName;
+ InetSocketAddress bspMasterAddr;
+ // Thread that executes directives queued by dispatch().
+ private Instructor instructor;
+
+ // Filesystem
+ // private LocalDirAllocator localDirAllocator;
+ Path systemDirectory = null;
+ FileSystem systemFS = null;
+
+ // Job
+ // Passed to the BSPMaster in every GroomServerStatus; never modified in this
+ // class.
+ private int failures;
+ private int maxCurrentTasks = 1;
+ // Every task handed to startNewTask(); only emptied by initialize().
+ Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
+ /** Map from taskId -> TaskInProgress. */
+ Map<TaskAttemptID, TaskInProgress> runningTasks = null;
+ // Tasks moved out of runningTasks by updateTaskStatus() once they are
+ // SUCCEEDED or FAILED.
+ Map<TaskAttemptID, TaskInProgress> finishedTasks = null;
+ Map<BSPJobID, RunningJob> runningJobs = null;
+
+ // new nexus between GroomServer and BSPMaster
+ // holds/ manage all tasks
+ // List<TaskInProgress> tasksList = new
+ // CopyOnWriteArrayList<TaskInProgress>();
+
+ // "host:port" of the worker RPC server started in initialize().
+ private String rpcServer;
+ private Server workerServer;
+ MasterProtocol masterClient;
+
+ InetSocketAddress taskReportAddress;
+ Server taskReportServer = null;
+
+ // Replaced on every DispatchTasksDirective; returned by getAllPeerNames().
+ private PeerNames allPeerNames = null;
+
+ // private BlockingQueue<GroomServerAction> tasksToCleanup = new
+ // LinkedBlockingQueue<GroomServerAction>();
+
+ /**
+  * Handles a {@link DispatchTasksDirective}: records the peer names carried
+  * by the directive and then launches or kills tasks according to the
+  * directive's actions.
+  */
+ private class DispatchTasksHandler implements DirectiveHandler {
+
+   /**
+    * @param directive must be a {@link DispatchTasksDirective}.
+    * @throws DirectiveException if killing a task fails.
+    */
+   public void handle(Directive directive) throws DirectiveException {
+     DispatchTasksDirective dispatch = (DispatchTasksDirective) directive;
+     GroomServerAction[] actions = dispatch.getActions();
+
+     allPeerNames = new PeerNames(dispatch.getGroomServerPeers().values());
+
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Got Response from BSPMaster with "
+           + ((actions != null) ? actions.length : 0) + " actions");
+     }
+
+     if (actions == null) {
+       return;
+     }
+
+     for (GroomServerAction action : actions) {
+       if (action instanceof LaunchTaskAction) {
+         startNewTask((LaunchTaskAction) action);
+       } else if (action instanceof KillTaskAction) {
+         // TODO Use the cleanup thread
+         // tasksToCleanup.put(action);
+         killTask((KillTaskAction) action);
+       } else {
+         // Other action types are not handled here; skip them instead of
+         // failing on a blind cast and dropping the remaining actions.
+         LOG.warn("Ignoring unsupported action: " + action);
+       }
+     }
+   }
+
+   /** Marks the referenced task as FAILED and kills it, if it is known. */
+   private void killTask(KillTaskAction killAction) throws DirectiveException {
+     TaskInProgress tip = tasks.get(killAction.getTaskID());
+     if (tip == null) {
+       return;
+     }
+     tip.taskStatus.setRunState(TaskStatus.State.FAILED);
+     try {
+       tip.killAndCleanup(true);
+     } catch (IOException ioe) {
+       throw new DirectiveException("Error when killing a "
+           + "TaskInProgress.", ioe);
+     }
+   }
+ }
+
+ /**
+  * Worker thread that takes queued {@link Directive}s one at a time and
+  * passes them to the handler bound for their type. The thread ends when it
+  * is interrupted.
+  */
+ private class Instructor extends Thread {
+   final BlockingQueue<Directive> buffer = new LinkedBlockingQueue<Directive>();
+   final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
+
+   /** Registers a handler for a directive type; the first binding wins. */
+   public void bind(Class<? extends Directive> instruction,
+       DirectiveHandler handler) {
+     handlers.putIfAbsent(instruction, handler);
+   }
+
+   /**
+    * Queues a directive for execution. If the caller is interrupted the
+    * directive is dropped, the failure is logged and the interrupt status is
+    * restored.
+    */
+   public void put(Directive directive) {
+     try {
+       buffer.put(directive);
+     } catch (InterruptedException ie) {
+       LOG.error("Unable to put directive into queue.", ie);
+       Thread.currentThread().interrupt();
+     }
+   }
+
+   public void run() {
+     // Stop once interrupted: re-entering take() with the interrupt flag set
+     // would throw again immediately and spin forever.
+     while (!Thread.currentThread().isInterrupted()) {
+       try {
+         Directive directive = buffer.take();
+         if (directive instanceof DispatchTasksDirective) {
+           DirectiveHandler handler = handlers
+               .get(DispatchTasksDirective.class);
+           if (handler == null) {
+             throw new IllegalStateException("No handler bound for "
+                 + DispatchTasksDirective.class.getName());
+           }
+           handler.handle(directive);
+         } else {
+           throw new RuntimeException("Directive is not supported."
+               + directive);
+         }
+       } catch (InterruptedException ie) {
+         LOG.error("Unable to retrieve directive from the queue.", ie);
+         Thread.currentThread().interrupt();
+       } catch (Exception e) {
+         // A failing directive must not take the worker thread down.
+         LOG.error("Fail to execute directive.", e);
+       }
+     }
+   }
+ }
+
+ /**
+  * Creates a groom server for the given configuration and resolves the
+  * BSPMaster address from it. No server is started here; that happens in
+  * {@link #initialize()}.
+  */
+ public GroomServer(Configuration conf) throws IOException {
+   LOG.info("groom start");
+   this.conf = conf;
+   this.bspMasterAddr = BSPMaster.getAddress(conf);
+ }
+
+ /**
+  * Prepares this groom for service: resolves the local host name, checks and
+  * cleans the local directories, resets the task tables, starts the RPC
+  * servers, registers with the BSPMaster and starts the directive worker.
+  * <p>
+  * The method is also called by {@link #run()} to re-initialize a groom that
+  * is already initialized. In that case the worker RPC server and a live
+  * directive worker are kept, and the previous task report server is stopped
+  * before a new one is bound.
+  *
+  * @throws IOException if a server cannot be started or the BSPMaster
+  *           refuses the registration.
+  * @throws IllegalArgumentException if no usable RPC address is configured.
+  */
+ public synchronized void initialize() throws IOException {
+   String configuredHost = this.conf.get(Constants.PEER_HOST);
+   if (configuredHost != null) {
+     this.localHostname = configuredHost;
+   }
+
+   if (localHostname == null) {
+     this.localHostname = DNS.getDefaultHost(
+         conf.get("bsp.dns.interface", "default"),
+         conf.get("bsp.dns.nameserver", "default"));
+   }
+   // check local disk
+   checkLocalDirs(conf.getStrings("bsp.local.dir"));
+   // Task files are written below SUBDIR, so that is the directory to clean.
+   deleteLocalFiles(SUBDIR);
+
+   // Clear out state tables
+   this.tasks.clear();
+   this.runningJobs = new TreeMap<BSPJobID, RunningJob>();
+   this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+   this.finishedTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+   this.conf.set(Constants.PEER_HOST, localHostname);
+   this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
+
+   // Resolve the RPC endpoint on every call: it is needed below for the groom
+   // name and the registration even when the server is already running.
+   String rpcAddr = conf.get(Constants.GROOM_RPC_HOST,
+       Constants.DEFAULT_GROOM_RPC_HOST);
+   int rpcPort = conf.getInt(Constants.GROOM_RPC_PORT,
+       Constants.DEFAULT_GROOM_RPC_PORT);
+   if (-1 == rpcPort || null == rpcAddr) {
+     throw new IllegalArgumentException("Error rpc address " + rpcAddr
+         + " port" + rpcPort);
+   }
+   if (!this.initialized) {
+     this.workerServer = RPC.getServer(this, rpcAddr, rpcPort, conf);
+     this.workerServer.start();
+     this.rpcServer = rpcAddr + ":" + rpcPort;
+
+     LOG.info("Worker rpc server --> " + rpcServer);
+   }
+
+   // A report server left over from an earlier initialization would keep
+   // its port bound; stop it before binding again.
+   if (this.taskReportServer != null) {
+     this.taskReportServer.stop();
+     this.taskReportServer = null;
+   }
+
+   @SuppressWarnings("deprecation")
+   String address = NetUtils.getServerAddress(conf,
+       "bsp.groom.report.bindAddress", "bsp.groom.report.port",
+       "bsp.groom.report.address");
+   InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
+   String bindAddress = socAddr.getHostName();
+   int tmpPort = socAddr.getPort();
+
+   // RPC initialization
+   // TODO numHandlers should be a ..
+   this.taskReportServer = RPC.getServer(this, bindAddress, tmpPort, 10,
+       false, this.conf);
+
+   this.taskReportServer.start();
+
+   // get the assigned address
+   this.taskReportAddress = taskReportServer.getListenerAddress();
+   this.conf.set("bsp.groom.report.address", taskReportAddress.getHostName()
+       + ":" + taskReportAddress.getPort());
+   LOG.info("GroomServer up at: " + this.taskReportAddress);
+
+   this.groomHostName = rpcAddr;
+   this.groomServerName = "groomd_" + this.rpcServer.replace(':', '_');
+   LOG.info("Starting groom: " + this.rpcServer);
+
+   DistributedCache.purgeCache(this.conf);
+
+   // establish the communication link to bsp master
+   this.masterClient = (MasterProtocol) RPC.waitForProxy(MasterProtocol.class,
+       MasterProtocol.versionID, bspMasterAddr, conf);
+
+   // enroll in bsp master
+   if (!this.masterClient.register(new GroomServerStatus(groomServerName,
+       getBspPeerName(), cloneAndResetRunningTaskStatuses(), failures,
+       maxCurrentTasks, this.rpcServer))) {
+     LOG.error("There is a problem in establishing communication"
+         + " link with BSPMaster");
+     throw new IOException("There is a problem in establishing"
+         + " communication link with BSPMaster.");
+   }
+
+   // Keep a directive worker that is still alive instead of leaking a second
+   // thread on re-initialization.
+   if (this.instructor == null || !this.instructor.isAlive()) {
+     this.instructor = new Instructor();
+     this.instructor.bind(DispatchTasksDirective.class,
+         new DispatchTasksHandler());
+     this.instructor.start();
+   }
+   this.running = true;
+   this.initialized = true;
+ }
+
+ /**
+  * @return the socket address of the task report server as recorded by
+  *         {@link #initialize()}.
+  */
+ public synchronized InetSocketAddress getTaskTrackerReportAddress() {
+   return this.taskReportAddress;
+ }
+
+ /**
+  * Queues a directive for asynchronous execution by the directive worker.
+  *
+  * @throws IOException if the worker has not been started yet or is no
+  *           longer alive.
+  */
+ @Override
+ public void dispatch(Directive directive) throws IOException {
+   // Read the field once; it is null until initialize() has run.
+   final Instructor worker = this.instructor;
+   if (worker == null || !worker.isAlive()) {
+     throw new IOException("GroomServer " + groomServerName
+         + " cannot accept directive " + directive
+         + ": the instructor thread is not running.");
+   }
+
+   worker.put(directive);
+ }
+
+ /**
+  * Verifies that at least one of the given local directories passes
+  * {@link DiskChecker#checkDir(File)}.
+  *
+  * @param localDirs the configured local directories, may be
+  *          <code>null</code>.
+  * @throws DiskErrorException if no directory passes the check.
+  */
+ private static void checkLocalDirs(String[] localDirs)
+     throws DiskErrorException {
+   boolean writable = false;
+
+   // Log the contents; logging the array itself only prints its identity.
+   LOG.info(java.util.Arrays.toString(localDirs));
+
+   if (localDirs != null) {
+     for (String localDir : localDirs) {
+       try {
+         LOG.info(localDir);
+         DiskChecker.checkDir(new File(localDir));
+         writable = true;
+       } catch (DiskErrorException e) {
+         LOG.warn("BSP Processor local " + e.getMessage());
+       }
+     }
+   }
+
+   if (!writable) {
+     throw new DiskErrorException("all local directories are not writable");
+   }
+ }
+
+ /**
+  * @return the values configured under <code>bsp.local.dir</code>, as
+  *         returned by the configuration.
+  */
+ public String[] getLocalDirs() {
+   final String[] localDirs = this.conf.getStrings("bsp.local.dir");
+   return localDirs;
+ }
+
+ public void deleteLocalFiles() throws IOException {
+ String[] localDirs = getLocalDirs();
+ for (int i = 0; i < localDirs.length; i++) {
+ FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]), true);
+ }
+ }
+
+ public void deleteLocalFiles(String subdir) throws IOException {
+ try {
+ String[] localDirs = getLocalDirs();
+ for (int i = 0; i < localDirs.length; i++) {
+ FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir),
+ true);
+ }
+ } catch (NullPointerException e) {
+ LOG.info(e);
+ }
+ }
+
+ /**
+  * Removes this groom's local storage by delegating to
+  * {@link #deleteLocalFiles()}.
+  */
+ public void cleanupStorage() throws IOException {
+   this.deleteLocalFiles();
+ }
+
+ // Placeholder called by run() right after initialize(); it currently does
+ // nothing.
+ private void startCleanupThreads() throws IOException {
+
+ }
+
+ /**
+  * Main service loop. While the groom is running it periodically reports the
+  * status of every running task to the BSPMaster and, on the first pass,
+  * looks up the system directory.
+  *
+  * @return {@link State#STALE} after a disk error, {@link State#DENIED}
+  *         after a {@link RemoteException}, otherwise {@link State#NORMAL}
+  *         once the groom stops running or starts shutting down.
+  */
+ public State offerService() throws Exception {
+   while (running && !shuttingDown) {
+     try {
+
+       // Work on a snapshot: doReport() removes finished tasks from
+       // runningTasks, which would invalidate a live iterator over the map.
+       // startNewTask() adds entries while holding this object's lock.
+       List<TaskInProgress> snapshot;
+       synchronized (this) {
+         snapshot = new ArrayList<TaskInProgress>(runningTasks.values());
+       }
+
+       // Reports to a BSPMaster
+       for (TaskInProgress tip : snapshot) {
+         Thread.sleep(REPORT_INTERVAL);
+         TaskStatus taskStatus = tip.getStatus();
+
+         if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
+           taskStatus.setProgress(taskStatus.getSuperstepCount());
+
+           // The state becomes RUNNING just before the runner is created,
+           // so the runner can still be missing here.
+           if (tip.runner != null && !tip.runner.isAlive()) {
+             if (taskStatus.getRunState() != TaskStatus.State.FAILED) {
+               taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+             }
+             taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
+           }
+         }
+
+         doReport(taskStatus);
+       }
+
+       Thread.sleep(REPORT_INTERVAL);
+     } catch (InterruptedException ignored) {
+       // Deliberately ignored: the loop condition decides whether to go on.
+     }
+
+     try {
+       if (justInited) {
+         String dir = masterClient.getSystemDir();
+         if (dir == null) {
+           LOG.error("Fail to get system directory.");
+           throw new IOException("Fail to get system directory.");
+         }
+         systemDirectory = new Path(dir);
+         systemFS = systemDirectory.getFileSystem(conf);
+       }
+       justInited = false;
+     } catch (DiskErrorException de) {
+       String msg = "Exiting groom server for disk error:\n"
+           + StringUtils.stringifyException(de);
+       LOG.error(msg);
+
+       return State.STALE;
+     } catch (RemoteException re) {
+       return State.DENIED;
+     } catch (Exception except) {
+       String msg = "Caught exception: "
+           + StringUtils.stringifyException(except);
+       LOG.error(msg);
+     }
+   }
+   return State.NORMAL;
+ }
+
+ private void startNewTask(LaunchTaskAction action) {
+ Task t = action.getTask();
+ BSPJob jobConf = null;
+ try {
+ jobConf = new BSPJob(t.getJobID(), t.getJobFile());
+ } catch (IOException e1) {
+ LOG.error(e1);
+ }
+
+ TaskInProgress tip = new TaskInProgress(t, jobConf, this.groomServerName);
+
+ synchronized (this) {
+ tasks.put(t.getTaskID(), tip);
+ runningTasks.put(t.getTaskID(), tip);
+ }
+
+ try {
+ localizeJob(tip);
+ } catch (Throwable e) {
+ String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
+ .stringifyException(e));
+ LOG.warn(msg);
+ }
+ }
+
+ /**
+  * Sends the current groom status, including the given task status, to the
+  * BSPMaster. Failures are logged and not propagated.
+  */
+ public void doReport(TaskStatus taskStatus) {
+   List<TaskStatus> reported = updateTaskStatus(taskStatus);
+   GroomServerStatus groomStatus = new GroomServerStatus(groomServerName,
+       getBspPeerName(), reported, failures, maxCurrentTasks, rpcServer);
+   try {
+     ReportGroomStatusDirective report = new ReportGroomStatusDirective(
+         groomStatus);
+     if (masterClient.report(report)) {
+       return;
+     }
+     LOG.warn("Fail to renew BSPMaster's GroomServerStatus. "
+         + " groom name: " + groomStatus.getGroomName() + " peer name:"
+         + groomStatus.getPeerName() + " rpc server:" + rpcServer);
+   } catch (IOException ioe) {
+     LOG.error("Fail to communicate with BSPMaster for reporting.", ioe);
+   }
+ }
+
+ public List<TaskStatus> updateTaskStatus(TaskStatus taskStatus) {
+ List<TaskStatus> tlist = new ArrayList<TaskStatus>();
+ synchronized (runningTasks) {
+
+ if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED
+ || taskStatus.getRunState() == TaskStatus.State.FAILED) {
+ synchronized (finishedTasks) {
+ TaskInProgress tip = runningTasks.remove(taskStatus.getTaskId());
+ tlist.add((TaskStatus) taskStatus.clone());
+ finishedTasks.put(taskStatus.getTaskId(), tip);
+ }
+ } else if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
+ tlist.add((TaskStatus) taskStatus.clone());
+ }
+
+ }
+ return tlist;
+ }
+
+ /**
+  * Copies the job file and, when the job has one, the job jar to local
+  * storage the first time a task of the job arrives, then launches the task.
+  * For a job that is already localized the task keeps the job configuration
+  * it was created with.
+  *
+  * @throws IOException if copying or unpacking the job files fails.
+  */
+ private void localizeJob(TaskInProgress tip) throws IOException {
+   Task task = tip.getTask();
+   conf.addResource(task.getJobFile());
+   BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
+   String taskDir = SUBDIR + "/" + task.getTaskID() + "/";
+   Path localJobFile = defaultJobConf.getLocalPath(taskDir + "job.xml");
+
+   RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
+   BSPJob jobConf = null;
+
+   synchronized (rjob) {
+     if (!rjob.localized) {
+       Path localJarFile = defaultJobConf.getLocalPath(taskDir + "job.jar");
+       systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
+
+       HamaConfiguration localConf = new HamaConfiguration();
+       localConf.addResource(localJobFile);
+       jobConf = new BSPJob(localConf, task.getJobID().toString());
+
+       // Test the jar name itself: a Path built from it is never null, and
+       // building one from a null name fails.
+       String jar = jobConf.getJar();
+       if (jar != null) {
+         systemFS.copyToLocalFile(new Path(jar), localJarFile);
+         jobConf.setJar(localJarFile.toString());
+
+         // also unjar the job.jar files in workdir
+         File workDir = new File(
+             new File(localJobFile.toString()).getParent(), "work");
+         if (!workDir.mkdirs() && !workDir.isDirectory()) {
+           throw new IOException("Mkdirs failed to create "
+               + workDir.toString());
+         }
+         RunJar.unJar(new File(localJarFile.toString()), workDir);
+       }
+       rjob.localized = true;
+     }
+   }
+
+   // jobConf is only built on first localization; do not replace the task's
+   // own configuration with null for later tasks of the same job.
+   launchTaskForJob(tip, jobConf != null ? jobConf : tip.jobConf);
+ }
+
+ /**
+  * Assigns the job configuration to the task and launches it. Any failure
+  * marks the task FAILED and is logged; nothing is thrown.
+  */
+ private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
+   try {
+     tip.setJobConf(jobConf);
+     tip.launchTask();
+   } catch (Throwable failure) {
+     tip.taskStatus.setRunState(TaskStatus.State.FAILED);
+     LOG.info(StringUtils.stringifyException(failure));
+   }
+ }
+
+ /**
+  * Adds the task to the {@link RunningJob} entry of its job, creating the
+  * entry on first use.
+  *
+  * @return the entry the task was added to.
+  */
+ private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
+     TaskInProgress tip) {
+   synchronized (runningJobs) {
+     RunningJob rJob = runningJobs.get(jobId);
+     if (rJob == null) {
+       // The constructor starts the entry as not localized, with an empty
+       // task set and the given job file.
+       rJob = new RunningJob(jobId, localJobFile);
+       runningJobs.put(jobId, rJob);
+     }
+     rJob.tasks.add(tip);
+     return rJob;
+   }
+ }
+
+ /**
+  * Per-job bookkeeping on this groom: the job id, its local job file, the
+  * tasks of the job and whether the job files have been localized.
+  */
+ static class RunningJob {
+   private BSPJobID jobid;
+   private Path jobFile;
+   // keep this for later use
+   Set<TaskInProgress> tasks;
+   boolean localized;
+   boolean keepJobFiles;
+
+   RunningJob(BSPJobID jobid, Path jobFile) {
+     this.jobid = jobid;
+     this.jobFile = jobFile;
+     this.tasks = new HashSet<TaskInProgress>();
+     this.localized = false;
+     this.keepJobFiles = false;
+   }
+
+   /** @return the local job file of this job. */
+   Path getJobFile() {
+     return this.jobFile;
+   }
+
+   /** @return the id of this job. */
+   BSPJobID getJobId() {
+     return this.jobid;
+   }
+ }
+
+ private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses() {
+ List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
+ for (TaskInProgress tip : runningTasks.values()) {
+ TaskStatus status = tip.getStatus();
+ result.add((TaskStatus) status.clone());
+ }
+ return result;
+ }
+
+ /**
+  * Thread body of the groom: initializes it, then keeps offering service.
+  * When the service loop ends without a shutdown the local state is
+  * initialized again. An {@link IOException} from initialization ends the
+  * thread.
+  */
+ public void run() {
+   try {
+     initialize();
+     startCleanupThreads();
+     boolean denied = false;
+     while (running && !shuttingDown && !denied) {
+
+       boolean staleState = false;
+       while (running && !staleState && !shuttingDown && !denied) {
+         try {
+           State osState = offerService();
+           if (State.STALE == osState) {
+             staleState = true;
+           } else if (State.DENIED == osState) {
+             denied = true;
+           }
+         } catch (Exception e) {
+           if (shuttingDown) {
+             continue;
+           }
+           LOG.info("Lost connection to BSP Master [" + bspMasterAddr
+               + "]. Retrying...", e);
+           try {
+             Thread.sleep(5000);
+           } catch (InterruptedException ie) {
+           }
+         }
+       }
+
+       if (shuttingDown) {
+         return;
+       }
+       LOG.warn("Reinitializing local state");
+       initialize();
+     }
+   } catch (IOException ioe) {
+     LOG.error("Got fatal exception while reinitializing GroomServer: "
+         + StringUtils.stringifyException(ioe));
+   }
+ }
+
+ /**
+  * Flags this groom as shutting down and then releases its resources through
+  * {@link #close()}.
+  */
+ public synchronized void shutdown() throws IOException {
+   this.shuttingDown = true;
+   this.close();
+ }
+
+ /**
+  * Stops this groom: clears the running and initialized flags, deletes the
+  * local storage and stops the RPC servers and the BSPMaster proxy. The
+  * servers are stopped even when the storage cleanup fails, and resources
+  * that were never created are skipped.
+  *
+  * @throws IOException if the storage cleanup fails.
+  */
+ public synchronized void close() throws IOException {
+   this.running = false;
+   this.initialized = false;
+   try {
+     cleanupStorage();
+   } finally {
+     if (workerServer != null) {
+       workerServer.stop();
+     }
+     if (masterClient != null) {
+       RPC.stopProxy(masterClient);
+     }
+
+     if (taskReportServer != null) {
+       taskReportServer.stop();
+       taskReportServer = null;
+     }
+   }
+ }
+
+ /**
+  * Starts the groom in a new thread whose name is derived from the groom
+  * server name.
+  *
+  * @return the started thread.
+  */
+ public static Thread startGroomServer(final GroomServer hrs) {
+   final String threadName = "regionserver" + hrs.groomServerName;
+   return startGroomServer(hrs, threadName);
+ }
+
+ /**
+  * Starts the groom in a new thread with the given name.
+  *
+  * @return the started thread.
+  */
+ public static Thread startGroomServer(final GroomServer hrs, final String name) {
+   final Thread groomThread = new Thread(hrs);
+   groomThread.setName(name);
+   groomThread.start();
+   return groomThread;
+ }
+
+ // /////////////////////////////////////////////////////
+ // TaskInProgress maintains all the info for a Task that
+ // lives at this GroomServer. It maintains the Task object,
+ // its TaskStatus, and the BSPTaskRunner.
+ // /////////////////////////////////////////////////////
+ /**
+  * State kept by this groom for one task: the task itself, its job
+  * configurations, its status and the runner that executes it.
+  */
+ class TaskInProgress {
+   Task task;
+   BSPJob jobConf;
+   BSPJob localJobConf;
+   // Null until launchTask() has created the runner.
+   BSPTaskRunner runner;
+   volatile boolean done = false;
+   volatile boolean wasKilled = false;
+   private TaskStatus taskStatus;
+
+   public TaskInProgress(Task task, BSPJob jobConf, String groomServer) {
+     this.task = task;
+     this.jobConf = jobConf;
+     this.localJobConf = null;
+     this.taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(), 0,
+         TaskStatus.State.UNASSIGNED, "running", groomServer,
+         TaskStatus.Phase.STARTING);
+   }
+
+   /**
+    * Copies the job file and, if present, the job jar of the task to local
+    * paths and points the task at the local copies.
+    */
+   private void localizeTask(Task task) throws IOException {
+     Path localJobFile = this.jobConf.getLocalPath(SUBDIR + "/"
+         + task.getTaskID() + "/job.xml");
+     Path localJarFile = this.jobConf.getLocalPath(SUBDIR + "/"
+         + task.getTaskID() + "/job.jar");
+
+     String jobFile = task.getJobFile();
+     systemFS.copyToLocalFile(new Path(jobFile), localJobFile);
+     task.setJobFile(localJobFile.toString());
+
+     localJobConf = new BSPJob(task.getJobID(), localJobFile.toString());
+     localJobConf.set("bsp.task.id", task.getTaskID().toString());
+     String jarFile = localJobConf.getJar();
+
+     if (jarFile != null) {
+       systemFS.copyToLocalFile(new Path(jarFile), localJarFile);
+       localJobConf.setJar(localJarFile.toString());
+     }
+
+     LOG.debug("localizeTask : " + localJobConf.getJar());
+     LOG.debug("localizeTask : " + localJobFile.toString());
+
+     task.setConf(localJobConf);
+   }
+
+   public synchronized void setJobConf(BSPJob jobConf) {
+     this.jobConf = jobConf;
+   }
+
+   /**
+    * @return the localized job configuration; <code>null</code> until the
+    *         task has been localized.
+    */
+   public synchronized BSPJob getJobConf() {
+     return localJobConf;
+   }
+
+   /** Localizes the task, marks it RUNNING and starts its runner. */
+   public void launchTask() throws IOException {
+     localizeTask(task);
+     taskStatus.setRunState(TaskStatus.State.RUNNING);
+     this.runner = task.createRunner(GroomServer.this);
+     this.runner.start();
+   }
+
+   /**
+    * Kills the runner of this task, if one was started, and records that the
+    * task was killed.
+    *
+    * @param wasFailure currently not used.
+    */
+   public synchronized void killAndCleanup(boolean wasFailure)
+       throws IOException {
+     wasKilled = true;
+     // A task that failed before launch has no runner to kill.
+     if (runner != null) {
+       runner.kill();
+     }
+   }
+
+   /** @return the task tracked by this object. */
+   public Task getTask() {
+     return task;
+   }
+
+   /** @return the live status object of the task (not a copy). */
+   public synchronized TaskStatus getStatus() {
+     return taskStatus;
+   }
+
+   /** @return the current run state of the task. */
+   public TaskStatus.State getRunState() {
+     return taskStatus.getRunState();
+   }
+
+   /** @return true once {@link #killAndCleanup(boolean)} has been called. */
+   public boolean wasKilled() {
+     return wasKilled;
+   }
+
+   /** Two instances are equal when their tasks have equal task ids. */
+   @Override
+   public boolean equals(Object obj) {
+     return (obj instanceof TaskInProgress)
+         && task.getTaskID().equals(
+             ((TaskInProgress) obj).getTask().getTaskID());
+   }
+
+   @Override
+   public int hashCode() {
+     return task.getTaskID().hashCode();
+   }
+ }
+
+ /** @return the current value of the running flag. */
+ public boolean isRunning() {
+   return this.running;
+ }
+
+ /**
+  * Instantiates the given groom server class through its public
+  * <code>(Configuration)</code> constructor.
+  *
+  * @throws RuntimeException wrapping any failure to find or invoke the
+  *           constructor.
+  */
+ public static GroomServer constructGroomServer(
+     Class<? extends GroomServer> groomServerClass, final Configuration conf2) {
+   try {
+     Constructor<? extends GroomServer> c = groomServerClass
+         .getConstructor(Configuration.class);
+     return c.newInstance(conf2);
+   } catch (Exception e) {
+     // The message used to name the "Master" although a groom is built here.
+     throw new RuntimeException("Failed construction of GroomServer: "
+         + groomServerClass.toString(), e);
+   }
+ }
+
+ /**
+  * @return the version id of the named protocol.
+  * @throws IOException if the protocol is neither {@link GroomProtocol} nor
+  *           {@link BSPPeerProtocol}.
+  */
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+     throws IOException {
+   if (protocol.equals(GroomProtocol.class.getName())) {
+     return GroomProtocol.versionID;
+   }
+   if (protocol.equals(BSPPeerProtocol.class.getName())) {
+     return BSPPeerProtocol.versionID;
+   }
+   throw new IOException("Unknown protocol to GroomServer: " + protocol);
+ }
+
+ /**
+  * @return the peer name of this groom: its host name and the default peer
+  *         port, separated by a colon.
+  */
+ public String getBspPeerName() {
+   // TODO Later, peers list should be returned.
+   final String host = this.groomHostName;
+   return host + ":" + Constants.DEFAULT_PEER_PORT;
+ }
+
+ /**
+  * The main() for child processes.
+  */
+ public static class Child {
+
+   /**
+    * Runs one task in this JVM.
+    *
+    * @param args <code>[0]</code> host and <code>[1]</code> port of the RPC
+    *          endpoint the task is fetched from, <code>[2]</code> the task
+    *          attempt id, <code>[3]</code> the peer host name.
+    */
+   public static void main(String[] args) throws Throwable {
+     LOG.debug("Child starting");
+     if (args.length < 4) {
+       throw new IllegalArgumentException(
+           "Expected arguments: <host> <port> <taskAttemptId> <peerHost>");
+     }
+
+     HamaConfiguration defaultConf = new HamaConfiguration();
+     // report address
+     String host = args[0];
+     int port = Integer.parseInt(args[1]);
+     InetSocketAddress address = new InetSocketAddress(host, port);
+     TaskAttemptID taskid = TaskAttemptID.forName(args[2]);
+
+     // //////////////////
+     BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
+         BSPPeerProtocol.class, BSPPeerProtocol.versionID, address,
+         defaultConf);
+
+     Task task = umbilical.getTask(taskid);
+     if (task == null) {
+       // The remote side returns null for a task id it does not know.
+       RPC.stopProxy(umbilical);
+       throw new IOException("No task " + taskid + " is known at " + address);
+     }
+
+     defaultConf.addResource(new Path(task.getJobFile()));
+     BSPJob job = new BSPJob(task.getJobID(), task.getJobFile());
+
+     defaultConf.set(Constants.PEER_HOST, args[3]);
+     defaultConf.setInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+
+     BSPPeer bspPeer = new BSPPeer(defaultConf, taskid, umbilical);
+     bspPeer.setJobConf(job);
+     bspPeer.setAllPeerNames(umbilical.getAllPeerNames().getAllPeerNames());
+
+     TaskStatus taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(),
+         0, TaskStatus.State.RUNNING, "running", host,
+         TaskStatus.Phase.STARTING);
+
+     bspPeer.setCurrentTaskStatus(taskStatus);
+
+     try {
+       // use job-specified working directory
+       FileSystem.get(job.getConf()).setWorkingDirectory(
+           job.getWorkingDirectory());
+
+       task.run(job, bspPeer, umbilical); // run the task
+
+     } catch (FSError e) {
+       LOG.fatal("FSError from child", e);
+       umbilical.fsError(taskid, e.getMessage());
+     } catch (Throwable throwable) {
+       LOG.warn("Error running child", throwable);
+     } finally {
+       try {
+         bspPeer.close(); // close peer.
+       } finally {
+         RPC.stopProxy(umbilical);
+         // Shutting down log4j of the child-vm...
+         // This assumes that on return from Task.run()
+         // there is no more logging done.
+         LogManager.shutdown();
+       }
+     }
+   }
+ }
+
+ /**
+  * @return the task registered under the given id, or <code>null</code> if
+  *         this groom has no such task.
+  */
+ @Override
+ public Task getTask(TaskAttemptID taskid) throws IOException {
+   TaskInProgress known = tasks.get(taskid);
+   return (known == null) ? null : known.getTask();
+ }
+
+ /**
+  * Increments the superstep counter in the status of the given task.
+  *
+  * @throws IOException if this groom has no task with the given id.
+  */
+ public void incrementSuperstepCount(TaskAttemptID taskid) throws IOException {
+   TaskInProgress tip = tasks.get(taskid);
+   if (tip == null) {
+     throw new IOException("Unknown task " + taskid
+         + "; cannot increment its superstep count.");
+   }
+   tip.getStatus().incrementSuperstepCount();
+ }
+
+ // Not implemented yet: always answers false, whatever the task id.
+ @Override
+ public boolean ping(TaskAttemptID taskid) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ // Not implemented yet: the call is accepted and ignored.
+ @Override
+ public void done(TaskAttemptID taskid, boolean shouldBePromoted)
+ throws IOException {
+ // TODO Auto-generated method stub
+ }
+
+ // Not implemented yet: the reported file system error is ignored. Child.main
+ // calls this method through its proxy when a task raises an FSError.
+ @Override
+ public void fsError(TaskAttemptID taskId, String message) throws IOException {
+ // TODO Auto-generated method stub
+ }
+
+ /**
+  * @return the peer names taken from the most recent dispatch directive, or
+  *         <code>null</code> if none has been handled yet.
+  */
+ @Override
+ public PeerNames getAllPeerNames() {
+   return this.allPeerNames;
+ }
+}
\ No newline at end of file
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A generic directive from the {@link org.apache.hama.bsp.BSPMaster}
+ * to the {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
+ */
+abstract class GroomServerAction implements Writable {
+
+  /**
+   * Enumeration of the 'actions' that the {@link BSPMaster} directs the
+   * {@link GroomServer} to perform.
+   */
+  public static enum ActionType {
+    /** Launch a new task. */
+    LAUNCH_TASK,
+
+    /** Kill a task. */
+    KILL_TASK,
+
+    /** Kill any tasks of this job and cleanup. */
+    KILL_JOB,
+
+    /** Reinitialize the groomserver. */
+    REINIT_GROOM,
+
+    /** Ask a task to save its output. */
+    COMMIT_TASK
+  };
+
+  /**
+   * Factory method that creates the action object matching an
+   * {@link ActionType}.
+   *
+   * @param actionType the {@link ActionType} of the object to create.
+   * @return a new action of the matching concrete class.
+   */
+  public static GroomServerAction createAction(ActionType actionType) {
+    switch (actionType) {
+      case LAUNCH_TASK:
+        return new LaunchTaskAction();
+      case KILL_TASK:
+        return new KillTaskAction();
+      case KILL_JOB:
+        return new KillJobAction();
+      case REINIT_GROOM:
+        return new ReinitGroomAction();
+      case COMMIT_TASK:
+        return new CommitTaskAction();
+    }
+    return null;
+  }
+
+  private ActionType actionType;
+
+  protected GroomServerAction(ActionType actionType) {
+    this.actionType = actionType;
+  }
+
+  /**
+   * @return the {@link ActionType} of this action.
+   */
+  ActionType getActionType() {
+    return this.actionType;
+  }
+
+  /** Serializes the action type as an enum value. */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeEnum(out, this.actionType);
+  }
+
+  /** Restores the action type written by {@link #write(DataOutput)}. */
+  public void readFields(DataInput in) throws IOException {
+    this.actionType = WritableUtils.readEnum(in, ActionType.class);
+  }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hama.ipc.GroomProtocol;
+
+/**
+ * Manages information about the {@link GroomServer}s in the cluster
+ * environment. This interface is not intended to be implemented by users.
+ */
+interface GroomServerManager {
+
+ /**
+ * Get the current status of the cluster.
+ *
+ * @param detailed if true then report groom names as well
+ * @return summary of the state of the cluster
+ */
+ ClusterStatus getClusterStatus(boolean detailed);
+
+ /**
+ * Find the {@link GroomProtocol} that corresponds to the given groom server
+ * status.
+ *
+ * @param status The {@link GroomServerStatus} identifying the GroomServer
+ * @return the {@link GroomProtocol} associated with {@code status}
+ */
+ GroomProtocol findGroomServer(GroomServerStatus status);
+
+ /**
+ * Find the collection of groom servers.
+ *
+ * @return Collection of {@link GroomProtocol}s, one per groom server.
+ */
+ Collection<GroomProtocol> findGroomServers();
+
+ /**
+ * Collection of GroomServerStatus as the key set.
+ *
+ * @return Collection of GroomServerStatus.
+ */
+ Collection<GroomServerStatus> groomServerStatusKeySet();
+
+ /**
+ * Registers a JobInProgressListener to GroomServerManager. Therefore,
+ * adding a JobInProgress will trigger the jobAdded function.
+ *
+ * @param listener the JobInProgressListener to be added.
+ */
+ void addJobInProgressListener(JobInProgressListener listener);
+
+ /**
+ * Unregisters a JobInProgressListener from GroomServerManager.
+ * NOTE(review): the previous comment stated that the removal of a
+ * JobInProgress will trigger the jobRemoved action; confirm the exact
+ * notification behavior against the implementation.
+ *
+ * @param listener the JobInProgressListener to be removed.
+ */
+ void removeJobInProgressListener(JobInProgressListener listener);
+
+ /**
+ * Current GroomServer Peers.
+ *
+ * @return GroomName and PeerName(host:port) in pair.
+ */
+ Map<String, String> currentGroomServerPeers();
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * A GroomServerStatus is a BSP primitive. It keeps info on a single
+ * GroomServer: its groom name, peer name, RPC server, failure count, maximum
+ * number of tasks and the {@link TaskStatus} reports of its tasks. The
+ * BSPMaster maintains a set of the most recent GroomServerStatus objects for
+ * each unique GroomServer it knows about.
+ * <p>
+ * Only the groom name, peer name, RPC server, failure count, maximum task
+ * count and task reports are serialized; {@code lastSeen} is not written by
+ * {@link #write(DataOutput)}.
+ */
+public class GroomServerStatus implements Writable {
+  public static final Log LOG = LogFactory.getLog(GroomServerStatus.class);
+
+  static {
+    // Register a factory that creates empty instances via the no-arg
+    // constructor.
+    WritableFactories.setFactory(GroomServerStatus.class,
+        new WritableFactory() {
+          public Writable newInstance() {
+            return new GroomServerStatus();
+          }
+        });
+  }
+
+  String groomName;
+  String peerName;
+  String rpcServer;
+  int failures;
+  List<TaskStatus> taskReports;
+
+  volatile long lastSeen;
+  private int maxTasks;
+
+  /**
+   * Creates an empty status whose name fields are {@code null} until they are
+   * populated by {@link #readFields(DataInput)}.
+   */
+  public GroomServerStatus() {
+    taskReports = new CopyOnWriteArrayList<TaskStatus>();
+  }
+
+  /**
+   * Creates a status with an empty RPC server string.
+   *
+   * @param groomName the name of the groom server.
+   * @param peerName the peer name of the groom server.
+   * @param taskReports the task reports to copy into this status.
+   * @param failures the failure count.
+   * @param maxTasks the maximum number of tasks.
+   */
+  public GroomServerStatus(String groomName, String peerName,
+      List<TaskStatus> taskReports, int failures, int maxTasks) {
+    this(groomName, peerName, taskReports, failures, maxTasks, "");
+  }
+
+  /**
+   * Creates a fully specified status. The given task reports are copied into
+   * a new list, so later changes to the argument list are not reflected here.
+   *
+   * @param groomName the name of the groom server.
+   * @param peerName the peer name of the groom server.
+   * @param taskReports the task reports to copy into this status.
+   * @param failures the failure count.
+   * @param maxTasks the maximum number of tasks.
+   * @param rpc the RPC server string.
+   */
+  public GroomServerStatus(String groomName, String peerName,
+      List<TaskStatus> taskReports, int failures, int maxTasks, String rpc) {
+    this.groomName = groomName;
+    this.peerName = peerName;
+    this.taskReports = new ArrayList<TaskStatus>(taskReports);
+    this.failures = failures;
+    this.maxTasks = maxTasks;
+    this.rpcServer = rpc;
+  }
+
+  /**
+   * @return the name of the groom server.
+   */
+  public String getGroomName() {
+    return groomName;
+  }
+
+  /**
+   * The host (and port) from where the groom server can be reached.
+   *
+   * @return The groom server address in the form of "hostname:port"
+   */
+  public String getPeerName() {
+    return peerName;
+  }
+
+  /**
+   * @return the RPC server string of the groom server.
+   */
+  public String getRpcServer() {
+    return rpcServer;
+  }
+
+  /**
+   * Get the current tasks at the GroomServer. Tasks are tracked by a
+   * {@link TaskStatus} object.
+   *
+   * @return the internal list of {@link TaskStatus} representing the current
+   *         tasks at the GroomServer (not a copy).
+   */
+  public List<TaskStatus> getTaskReports() {
+    return taskReports;
+  }
+
+  /**
+   * @return the failure count.
+   */
+  public int getFailures() {
+    return failures;
+  }
+
+  /**
+   * @return the last-seen value previously stored with
+   *         {@link #setLastSeen(long)}.
+   */
+  public long getLastSeen() {
+    return lastSeen;
+  }
+
+  /**
+   * @param lastSeen the new last-seen value.
+   */
+  public void setLastSeen(long lastSeen) {
+    this.lastSeen = lastSeen;
+  }
+
+  /**
+   * @return the maximum number of tasks.
+   */
+  public int getMaxTasks() {
+    return maxTasks;
+  }
+
+  /**
+   * Return the number of reported tasks whose run state is either
+   * {@code RUNNING} or {@code UNASSIGNED}.
+   *
+   * @return the count of running and unassigned tasks.
+   */
+  public int countTasks() {
+    int taskCount = 0;
+    for (TaskStatus ts : taskReports) {
+      TaskStatus.State state = ts.getRunState();
+      if (state == TaskStatus.State.RUNNING
+          || state == TaskStatus.State.UNASSIGNED) {
+        taskCount++;
+      }
+    }
+    return taskCount;
+  }
+
+  /**
+   * For BSPMaster to distinguish between different GroomServers, because
+   * BSPMaster stores using GroomServerStatus as key.
+   * <p>
+   * Only groomName, peerName and rpcServer contribute; failures, taskReports,
+   * lastSeen and maxTasks are deliberately left out so that the hash stays
+   * stable while those values change. Null fields (as found on an instance
+   * created by the no-arg constructor) contribute 0 instead of throwing.
+   */
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 37 * result + hashOf(groomName);
+    result = 37 * result + hashOf(peerName);
+    result = 37 * result + hashOf(rpcServer);
+    return result;
+  }
+
+  /**
+   * Two statuses are equal when they are of the same class and their
+   * groomName, peerName and rpcServer are equal. The remaining fields are
+   * deliberately ignored, consistent with {@link #hashCode()}. Null fields are
+   * tolerated: two null values are considered equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    GroomServerStatus other = (GroomServerStatus) o;
+    return sameString(groomName, other.groomName)
+        && sameString(peerName, other.peerName)
+        && sameString(rpcServer, other.rpcServer);
+  }
+
+  /** Null-safe hash of a string: 0 for null, otherwise String.hashCode(). */
+  private static int hashOf(String s) {
+    return s == null ? 0 : s.hashCode();
+  }
+
+  /** Null-safe string equality: true when both are null or both are equal. */
+  private static boolean sameString(String a, String b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.groomName = Text.readString(in);
+    this.peerName = Text.readString(in);
+    this.rpcServer = Text.readString(in);
+    this.failures = in.readInt();
+    this.maxTasks = in.readInt();
+    // Discard any previously held reports before reading the new ones.
+    taskReports.clear();
+    int numTasks = in.readInt();
+
+    TaskStatus status;
+    for (int i = 0; i < numTasks; i++) {
+      status = new TaskStatus();
+      status.readFields(in);
+      taskReports.add(status);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, groomName);
+    Text.writeString(out, peerName);
+    Text.writeString(out, rpcServer);
+    out.writeInt(failures);
+    out.writeInt(maxTasks);
+    // The report count precedes the reports so readFields knows how many to read.
+    out.writeInt(taskReports.size());
+    for (TaskStatus taskStatus : taskReports) {
+      taskStatus.write(out);
+    }
+  }
+
+  /**
+   * @return an iterator over the task reports held by this status.
+   */
+  public Iterator<TaskStatus> taskReports() {
+    return taskReports.iterator();
+  }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ID.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ID.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ID.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A general identifier, which internally stores the id as an integer. This is
+ * the super class of {@link BSPJobID}, {@link TaskID} and
+ * {@link TaskAttemptID}.
+ */
+public abstract class ID implements WritableComparable<ID> {
+  protected static final char SEPARATOR = '_';
+  protected int id;
+
+  /**
+   * Creates an identifier with the given integer value.
+   *
+   * @param id the integer value of this identifier.
+   */
+  public ID(int id) {
+    this.id = id;
+  }
+
+  /**
+   * Creates an identifier whose value is left at the default (0) until it is
+   * set, e.g. by {@link #readFields(DataInput)}.
+   */
+  protected ID() {
+  }
+
+  /**
+   * @return the integer value of this identifier.
+   */
+  public int getId() {
+    return id;
+  }
+
+  /**
+   * @return the decimal string form of the integer id.
+   */
+  @Override
+  public String toString() {
+    return String.valueOf(id);
+  }
+
+  /**
+   * @return the integer id itself, which is the same value that
+   *         {@code Integer.hashCode()} yields for it.
+   */
+  @Override
+  public int hashCode() {
+    return id;
+  }
+
+  /**
+   * Two identifiers are equal when they are of exactly the same class and
+   * hold the same integer id.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || o.getClass() != this.getClass()) {
+      return false;
+    }
+    ID that = (ID) o;
+    return this.id == that.id;
+  }
+
+  /**
+   * Orders identifiers by their integer id.
+   *
+   * @param that the identifier to compare with.
+   * @return a negative value, zero or a positive value when this id is less
+   *         than, equal to or greater than the other id.
+   */
+  public int compareTo(ID that) {
+    // Compare explicitly instead of subtracting: "this.id - that.id" can
+    // overflow (e.g. Integer.MIN_VALUE - 1) and yield the wrong sign.
+    if (this.id < that.id) {
+      return -1;
+    }
+    return (this.id == that.id) ? 0 : 1;
+  }
+
+  /**
+   * Reads the integer id from the input.
+   *
+   * @param in the input to read from.
+   * @throws IOException if reading from {@code in} fails.
+   */
+  public void readFields(DataInput in) throws IOException {
+    this.id = in.readInt();
+  }
+
+  /**
+   * Writes the integer id to the output.
+   *
+   * @param out the output to write to.
+   * @throws IOException if writing to {@code out} fails.
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(id);
+  }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerDoubleMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerDoubleMessage.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerDoubleMessage.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerDoubleMessage.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A {@link BSPMessage} made of an int tag and a double payload.
+ */
+public class IntegerDoubleMessage extends BSPMessage {
+
+  int tag;
+  double data;
+
+  /**
+   * Creates a message whose tag and data keep their default values until
+   * {@link #readFields(DataInput)} fills them in.
+   */
+  public IntegerDoubleMessage() {
+  }
+
+  /**
+   * Creates a message carrying the given tag and data.
+   *
+   * @param tag the int tag of the message.
+   * @param data the double payload of the message.
+   */
+  public IntegerDoubleMessage(int tag, double data) {
+    this.tag = tag;
+    this.data = data;
+  }
+
+  /** Writes the tag as an int followed by the data as a double. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(this.tag);
+    out.writeDouble(this.data);
+  }
+
+  /** Reads the tag and then the data, in the order {@code write} emits them. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.tag = in.readInt();
+    this.data = in.readDouble();
+  }
+
+  /** @return the int tag, boxed. */
+  @Override
+  public Integer getTag() {
+    return Integer.valueOf(this.tag);
+  }
+
+  /** @return the double payload, boxed. */
+  @Override
+  public Double getData() {
+    return Double.valueOf(this.data);
+  }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerMessage.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerMessage.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerMessage.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A {@link BSPMessage} made of a string tag and an int payload.
+ */
+public class IntegerMessage extends BSPMessage {
+
+  String tag;
+  int data;
+
+  /**
+   * Creates a message whose tag and data keep their default values until
+   * {@link #readFields(DataInput)} fills them in.
+   */
+  public IntegerMessage() {
+  }
+
+  /**
+   * Creates a message carrying the given tag and data.
+   *
+   * @param tag the string tag of the message.
+   * @param data the int payload of the message.
+   */
+  public IntegerMessage(String tag, int data) {
+    this.tag = tag;
+    this.data = data;
+  }
+
+  /** Writes the tag with {@code writeUTF} followed by the data as an int. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(this.tag);
+    out.writeInt(this.data);
+  }
+
+  /** Reads the tag and then the data, in the order {@code write} emits them. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.tag = in.readUTF();
+    this.data = in.readInt();
+  }
+
+  /** @return the string tag. */
+  @Override
+  public String getTag() {
+    return this.tag;
+  }
+
+  /** @return the int payload, boxed. */
+  @Override
+  public Integer getData() {
+    return Integer.valueOf(this.data);
+  }
+
+}
\ No newline at end of file
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobChangeEvent.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobChangeEvent.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobChangeEvent.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobChangeEvent.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * Base type for events that capture a state change of a job, e.g. with
+ * respect to priority, progress or run-state. Each {@link JobChangeEvent}
+ * carries the {@link JobInProgress} it refers to.
+ */
+abstract class JobChangeEvent {
+  /** The job whose change this event reports. */
+  private JobInProgress job;
+
+  /**
+   * Creates an event for the given job.
+   *
+   * @param jip the job for which the change is reported.
+   */
+  JobChangeEvent(JobInProgress jip) {
+    job = jip;
+  }
+
+  /**
+   * Get the job object for which the change is reported.
+   *
+   * @return the job passed to the constructor.
+   */
+  JobInProgress getJobInProgress() {
+    return this.job;
+  }
+}