You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/04/28 05:47:19 UTC
svn commit: r1097314 - in /incubator/hama/trunk: ./ conf/
src/java/org/apache/hama/bsp/ src/java/org/apache/hama/ipc/
src/test/org/apache/hama/ src/test/org/apache/hama/bsp/ src/test/testjar/
Author: edwardyoon
Date: Thu Apr 28 03:47:18 2011
New Revision: 1097314
URL: http://svn.apache.org/viewvc?rev=1097314&view=rev
Log:
Refactor BSPMaster and GroomServer
Added:
incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveException.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveHandler.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/DispatchTasksDirective.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/ReportGroomStatusDirective.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java
incubator/hama/trunk/src/test/testjar/
incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java
incubator/hama/trunk/src/test/testjar/testjob.jar (with props)
Removed:
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestDirective.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/conf/hama-default.xml
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java
incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java
incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java
incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Apr 28 03:47:18 2011
@@ -10,6 +10,7 @@ Release 0.3 - Unreleased
IMPROVEMENTS
+ HAMA-376: Refactor BSPMaster and GroomServer (ChiaHung Lin via edwardyoon)
HAMA-382: Refactor HAMA POM (Tommaso Teofili)
HAMA-380: Send messages in batches to reduce RPC overhead (Miklos Erdelyi via edwardyoon)
HAMA-362: Re-design a new data structure of BSPMessage (Thomas Jungblut via edwardyoon)
Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Thu Apr 28 03:47:18 2011
@@ -24,7 +24,7 @@
<configuration>
<property>
<name>bsp.master.address</name>
- <value>localhost</value>
+ <value>local</value>
<description>The address of the bsp master server. Either the
literal string "local" or a host[:port] (where host is a name or
IP address) for distributed mode.
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Thu Apr 28 03:47:18 2011
@@ -28,6 +28,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -52,7 +54,7 @@ import org.apache.hama.ipc.WorkerProtoco
* BSPMaster is responsible to control all the groom servers and to manage bsp
* jobs.
*/
-public class BSPMaster implements JobSubmissionProtocol, MasterProtocol, // InterServerProtocol,
+public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
GroomServerManager {
public static final Log LOG = LogFactory.getLog(BSPMaster.class);
@@ -98,9 +100,114 @@ public class BSPMaster implements JobSub
private TaskScheduler taskScheduler;
// GroomServers cache
- protected ConcurrentMap<GroomServerStatus, WorkerProtocol> groomServers = new ConcurrentHashMap<GroomServerStatus, WorkerProtocol>();
+ protected ConcurrentMap<GroomServerStatus, WorkerProtocol> groomServers =
+ new ConcurrentHashMap<GroomServerStatus, WorkerProtocol>();
- private final List<JobInProgressListener> jobInProgressListeners = new CopyOnWriteArrayList<JobInProgressListener>();
+ private Instructor instructor;
+
+ private final List<JobInProgressListener> jobInProgressListeners =
+ new CopyOnWriteArrayList<JobInProgressListener>();
+
+ private class ReportGroomStatusHandler implements DirectiveHandler{ // Applies a GroomServer's reported status to BSPMaster's groom cache and job/task bookkeeping.
+
+ public void handle(Directive directive) throws DirectiveException{ // directive must be a ReportGroomStatusDirective; the cast below fails otherwise
+ // update GroomServerStatus held in the groomServers cache.
+ GroomServerStatus groomStatus =
+ ((ReportGroomStatusDirective)directive).getStatus();
+ // groomServers cache contains groom server status reported back
+ if (groomServers.containsKey(groomStatus)) {
+ GroomServerStatus tmpStatus = null;
+ for (GroomServerStatus old : groomServers.keySet()) { // locate the cached key that equals the reported status
+ if (old.equals(groomStatus)) {
+ tmpStatus = groomStatus;
+ updateGroomServersKey(old, tmpStatus); // presumably re-keys the cache entry with the fresh status - confirm against updateGroomServersKey
+ break;
+ }
+ }// for
+ if (null != tmpStatus) {
+ List<TaskStatus> tlist = tmpStatus.getTaskReports();
+ for (TaskStatus ts : tlist) { // apply every task report carried by this status
+ JobInProgress jip = whichJob(ts.getJobId()); // NOTE(review): jip is dereferenced below without a null check - confirm whichJob never returns null
+ TaskInProgress tip = jip.findTaskInProgress(((TaskAttemptID) ts
+ .getTaskId()).getTaskID());
+
+ if(ts.getRunState() == TaskStatus.State.SUCCEEDED) {
+ jip.completedTask(tip, ts);
+ } else if (ts.getRunState() == TaskStatus.State.RUNNING) {
+ // do nothing
+ } else if (ts.getRunState() == TaskStatus.State.FAILED) {
+ jip.status.setRunState(JobStatus.FAILED); // the job itself is flagged FAILED before the failed task is recorded
+ jip.failedTask(tip, ts);
+ }
+ if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) { // second stage: react to the job's run state after the task report was applied
+ for(JobInProgressListener listener: jobInProgressListeners){
+ try {
+ listener.jobRemoved(jip);
+ } catch (IOException ioe) { // a failing listener is logged and the remaining listeners are still notified
+ LOG.error("Fail to alter scheduler a job is moved.", ioe);
+ }
+ }
+ } else if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
+ jip.getStatus().setprogress(ts.getSuperstepCount()); // job progress is taken from the task's superstep count
+ } else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
+ WorkerProtocol worker = findGroomServer(tmpStatus); // job is KILLED: send the reporting groom a kill action for this task
+ Directive d1 = new DispatchTasksDirective(currentGroomServerPeers(),
+ new GroomServerAction[] {new KillTaskAction(ts.getTaskId()) });
+ try{
+ worker.dispatch(d1);
+ }catch(IOException ioe){ // surfaced as the unchecked DirectiveException so handle() keeps its signature
+ throw new DirectiveException("Error when dispatching kill task"+
+ " action.", ioe);
+ }
+ }
+ }
+ } else { // only reachable if no key equal to groomStatus was found in keySet() although containsKey() returned true
+ throw new RuntimeException("BSPMaster contains GroomServerSatus, "
+ + "but fail to retrieve it.");
+ }
+ } else { // a status was reported by a groom that is not in the groomServers cache
+ throw new RuntimeException("GroomServer not found." +
+ groomStatus.getGroomName());
+ }
+ }
+ }
+
+ /**
+  * Worker thread that drains directives queued by {@code report()} and hands
+  * each one to the {@link DirectiveHandler} bound to its type.
+  *
+  * The thread stops when it is interrupted. Failures raised by a handler are
+  * logged and do not stop the loop.
+  */
+ private class Instructor extends Thread{
+   private final BlockingQueue<Directive> buffer = new LinkedBlockingQueue<Directive>();
+   private final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers =
+     new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
+
+   /**
+    * Registers the handler for a directive type. The first binding for a
+    * type wins; later bindings for the same type are ignored.
+    *
+    * @param instruction directive class the handler is responsible for.
+    * @param handler handler invoked for directives of that class.
+    */
+   public void bind(Class<? extends Directive> instruction,
+       DirectiveHandler handler){
+     handlers.putIfAbsent(instruction, handler);
+   }
+
+   /**
+    * Queues a directive for asynchronous handling.
+    *
+    * @param directive directive to enqueue.
+    */
+   public void put(Directive directive){
+     try{
+       buffer.put(directive);
+     }catch(InterruptedException ie){
+       LOG.error("Fail to put directive into queue.", ie);
+       // Keep the interrupt visible to the calling thread instead of
+       // swallowing it.
+       Thread.currentThread().interrupt();
+     }
+   }
+
+   /**
+    * Finds the handler bound to the given directive class or, failing that,
+    * to the nearest superclass that has a binding.
+    *
+    * @param type runtime class of the directive.
+    * @return the bound handler, or null when none is registered.
+    */
+   private DirectiveHandler findHandler(Class<?> type){
+     for(Class<?> c = type; c != null; c = c.getSuperclass()){
+       DirectiveHandler handler = handlers.get(c);
+       if(null != handler){
+         return handler;
+       }
+     }
+     return null;
+   }
+
+   /**
+    * Takes directives off the queue until the thread is interrupted.
+    */
+   public void run(){
+     while(!Thread.currentThread().isInterrupted()){
+       try{
+         Directive directive = this.buffer.take();
+         DirectiveHandler handler = findHandler(directive.getClass());
+         if(null == handler){
+           throw new RuntimeException("Directive is not supported."+directive);
+         }
+         handler.handle(directive);
+       }catch(InterruptedException ie){
+         LOG.error("Unable to retrieve directive from the queue.", ie);
+         Thread.currentThread().interrupt();
+         // Leave the loop: with the flag set, take() would throw again
+         // immediately and the thread would spin logging errors.
+         return;
+       }catch(Exception e){
+         LOG.error("Fail to execute directive command.", e);
+       }
+     }
+   }
+ }
/**
* Start the BSPMaster process, listen on the indicated hostname/port
@@ -114,7 +221,6 @@ public class BSPMaster implements JobSub
InterruptedException {
this.conf = conf;
this.masterIdentifier = identifier;
- // expireLaunchingTaskThread.start();
// Create the scheduler and init scheduler services
Class<? extends TaskScheduler> schedulerClass = conf.getClass(
@@ -228,72 +334,7 @@ public class BSPMaster implements JobSub
@Override
public boolean report(Directive directive) throws IOException {
- // check returned directive type if equals response
- if (directive.getType().value() != Directive.Type.Response.value()) {
- throw new IllegalStateException("GroomServer should report()"
- + " with Response. Current report type:" + directive.getType());
- }
- // update GroomServerStatus hold in groomServers cache.
- GroomServerStatus fstus = directive.getStatus();
-
- // groomServers cache contains groom server status reported back
- if (groomServers.containsKey(fstus)) {
- GroomServerStatus ustus = null;
- for (GroomServerStatus old : groomServers.keySet()) {
- if (old.equals(fstus)) {
- ustus = fstus;
- updateGroomServersKey(old, ustus);
- break;
- }
- }// for
-
- if (null != ustus) {
- List<TaskStatus> tlist = ustus.getTaskReports();
- for (TaskStatus ts : tlist) {
- JobInProgress jip = whichJob(ts.getJobId());
-
- TaskInProgress tip = jip.findTaskInProgress(((TaskAttemptID) ts
- .getTaskId()).getTaskID());
-
- if(ts.getRunState() == TaskStatus.State.SUCCEEDED) {
- jip.completedTask(tip, ts);
- } else if (ts.getRunState() == TaskStatus.State.RUNNING) {
- // do nothing
- } else if (ts.getRunState() == TaskStatus.State.FAILED) {
- jip.status.setRunState(JobStatus.FAILED);
- jip.failedTask(tip, ts);
- }
-
- if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
- for (JobInProgressListener listener : jobInProgressListeners) {
- try {
- listener.jobRemoved(jip);
- } catch (IOException ioe) {
- LOG.error("Fail to alter scheduler a job is moved.", ioe);
- }
- }
-
- } else if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
- jip.getStatus().setprogress(ts.getSuperstepCount());
- } else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
-
- WorkerProtocol worker = findGroomServer(ustus);
- Directive d1 = new Directive(currentGroomServerPeers(),
- new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) });
-
- worker.dispatch(d1);
-
- }
-
- }
- } else {
- throw new RuntimeException("BSPMaster contains GroomServerSatus, "
- + "but fail to retrieve it.");
- }
- } else {
- throw new RuntimeException("GroomServer not found."
- + fstus.getGroomName());
- }
+ instructor.put(directive);
return true;
}
@@ -378,15 +419,20 @@ public class BSPMaster implements JobSub
}
public void offerService() throws InterruptedException, IOException {
- // this.interServer.start();
+
this.masterServer.start();
synchronized (this) {
state = State.RUNNING;
}
+
+ instructor = new Instructor();
+ instructor.bind(ReportGroomStatusDirective.class,
+ new ReportGroomStatusHandler());
+ instructor.start();
+
LOG.info("Starting RUNNING");
- // this.interServer.join();
this.masterServer.join();
LOG.info("Stopped RPC Master server.");
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java Thu Apr 28 03:47:18 2011
@@ -34,15 +34,10 @@ import org.apache.hadoop.io.WritableUtil
* A generic directive from the {@link org.apache.hama.bsp.BSPMaster} to the
* {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
*/
-public class Directive implements Writable {
+public class Directive implements Writable{
- public static final Log LOG = LogFactory.getLog(Directive.class);
-
- private long timestamp;
- private Directive.Type type;
- private Map<String, String> groomServerPeers;
- private GroomServerAction[] actions;
- private GroomServerStatus status;
+ protected long timestamp;
+ protected Directive.Type type;
public static enum Type {
Request(1), Response(2);
@@ -57,22 +52,11 @@ public class Directive implements Writab
}
};
- public Directive() {
- this.timestamp = System.currentTimeMillis();
- }
+ public Directive(){}
- public Directive(Map<String, String> groomServerPeers,
- GroomServerAction[] actions) {
- this();
- this.type = Directive.Type.Request;
- this.groomServerPeers = groomServerPeers;
- this.actions = actions;
- }
-
- public Directive(GroomServerStatus status) {
- this();
- this.type = Directive.Type.Response;
- this.status = status;
+ public Directive(Directive.Type type) {
+ this.timestamp = System.currentTimeMillis();
+ this.type = type;
}
public long getTimestamp() {
@@ -83,48 +67,14 @@ public class Directive implements Writab
return this.type;
}
- public Map<String, String> getGroomServerPeers() {
- return this.groomServerPeers;
- }
-
- public GroomServerAction[] getActions() {
- return this.actions;
- }
-
- public GroomServerStatus getStatus() {
- return this.status;
- }
+ /**
+ * Command for BSPMaster or GroomServer to execute.
+ public abstract void execute() throws Exception;
+ */
public void write(DataOutput out) throws IOException {
out.writeLong(this.timestamp);
out.writeInt(this.type.value());
- if (getType().value() == Directive.Type.Request.value()) {
- if (this.actions == null) {
- WritableUtils.writeVInt(out, 0);
- } else {
- WritableUtils.writeVInt(out, actions.length);
- for (GroomServerAction action : this.actions) {
- WritableUtils.writeEnum(out, action.getActionType());
- action.write(out);
- }
- }
- String[] groomServerNames = groomServerPeers.keySet().toArray(
- new String[0]);
- WritableUtils.writeCompressedStringArray(out, groomServerNames);
-
- List<String> groomServerAddresses = new ArrayList<String>(
- groomServerNames.length);
- for (String groomName : groomServerNames) {
- groomServerAddresses.add(groomServerPeers.get(groomName));
- }
- WritableUtils.writeCompressedStringArray(out, groomServerAddresses
- .toArray(new String[0]));
- } else if (getType().value() == Directive.Type.Response.value()) {
- this.status.write(out);
- } else {
- throw new IllegalStateException("Wrong directive type:" + getType());
- }
-
}
public void readFields(DataInput in) throws IOException {
@@ -132,32 +82,8 @@ public class Directive implements Writab
int t = in.readInt();
if (Directive.Type.Request.value() == t) {
this.type = Directive.Type.Request;
- int length = WritableUtils.readVInt(in);
- if (length > 0) {
- this.actions = new GroomServerAction[length];
- for (int i = 0; i < length; ++i) {
- GroomServerAction.ActionType actionType = WritableUtils.readEnum(in,
- GroomServerAction.ActionType.class);
- actions[i] = GroomServerAction.createAction(actionType);
- actions[i].readFields(in);
- }
- } else {
- this.actions = null;
- }
- String[] groomServerNames = WritableUtils.readCompressedStringArray(in);
- String[] groomServerAddresses = WritableUtils
- .readCompressedStringArray(in);
- groomServerPeers = new HashMap<String, String>(groomServerNames.length);
-
- for (int i = 0; i < groomServerNames.length; i++) {
- groomServerPeers.put(groomServerNames[i], groomServerAddresses[i]);
- }
- } else if (Directive.Type.Response.value() == t) {
+ }else{
this.type = Directive.Type.Response;
- this.status = new GroomServerStatus();
- this.status.readFields(in);
- } else {
- throw new IllegalStateException("Wrong directive type:" + t);
}
}
}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveException.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveException.java?rev=1097314&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveException.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveException.java Thu Apr 28 03:47:18 2011
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * Unchecked exception raised when a {@link Directive} cannot be handled,
+ * for example when dispatching or executing the action it carries fails.
+ */
+public class DirectiveException extends RuntimeException{
+
+  // RuntimeException is Serializable; pin the id so the serialized form does
+  // not depend on compiler-generated defaults.
+  private static final long serialVersionUID = 1L;
+
+  /** Creates an exception without a message or cause. */
+  public DirectiveException(){
+    super();
+  }
+
+  /**
+   * @param message description of the failure.
+   */
+  public DirectiveException(String message){
+    super(message);
+  }
+
+  /**
+   * @param message description of the failure.
+   * @param t underlying cause, kept for the stack trace.
+   */
+  public DirectiveException(String message, Throwable t){
+    super(message, t);
+  }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveHandler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveHandler.java?rev=1097314&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveHandler.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveHandler.java Thu Apr 28 03:47:18 2011
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+public interface DirectiveHandler{ // Callback that processes one Directive.
+
+ /**
+ * Handle directives on demand.
+ * Declares the unchecked DirectiveException so that implementations can
+ * report a directive they failed to process.
+ * @param directive to be handled.
+ * @throws DirectiveException if the directive cannot be processed.
+ */
+ void handle(Directive directive) throws DirectiveException;
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/DispatchTasksDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/DispatchTasksDirective.java?rev=1097314&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/DispatchTasksDirective.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/DispatchTasksDirective.java Thu Apr 28 03:47:18 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Request directive carrying the actions a GroomServer should perform
+ * together with the name-to-address map of the groom server peers.
+ *
+ * Wire format after the fields written by {@link Directive}: a vint action
+ * count, each action as (enum type, action payload), then two compressed
+ * string arrays holding peer names and peer addresses in matching order.
+ */
+public final class DispatchTasksDirective extends Directive implements Writable {
+
+  public static final Log LOG = LogFactory.getLog(DispatchTasksDirective.class);
+
+  private Map<String, String> groomServerPeers;
+  private GroomServerAction[] actions;
+
+  /** No-arg constructor; fields are populated by {@link #readFields}. */
+  public DispatchTasksDirective(){ super(); }
+
+  /**
+   * @param groomServerPeers groom server name to address map.
+   * @param actions actions for the receiving groom; may be null.
+   */
+  public DispatchTasksDirective(Map<String, String> groomServerPeers,
+      GroomServerAction[] actions) {
+    super(Directive.Type.Request);
+    this.groomServerPeers = groomServerPeers;
+    this.actions = actions;
+  }
+
+  /** @return groom server name to address map, as set or as last read. */
+  public Map<String, String> getGroomServerPeers() {
+    return this.groomServerPeers;
+  }
+
+  /** @return the actions to perform, or null when there are none. */
+  public GroomServerAction[] getActions() {
+    return this.actions;
+  }
+
+  /**
+   * Serializes the directive. A null peer map is written as two empty
+   * arrays instead of failing with a NullPointerException.
+   *
+   * @param out sink to write to.
+   * @throws IOException if writing fails.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    if (this.actions == null) {
+      WritableUtils.writeVInt(out, 0);
+    } else {
+      WritableUtils.writeVInt(out, actions.length);
+      for (GroomServerAction action : this.actions) {
+        WritableUtils.writeEnum(out, action.getActionType());
+        action.write(out);
+      }
+    }
+
+    // Collect names and addresses in one pass so both arrays are guaranteed
+    // to be in the same order.
+    List<String> groomServerNames = new ArrayList<String>();
+    List<String> groomServerAddresses = new ArrayList<String>();
+    if (this.groomServerPeers != null) {
+      for (Map.Entry<String, String> peer : this.groomServerPeers.entrySet()) {
+        groomServerNames.add(peer.getKey());
+        groomServerAddresses.add(peer.getValue());
+      }
+    }
+    WritableUtils.writeCompressedStringArray(out, groomServerNames
+        .toArray(new String[0]));
+    WritableUtils.writeCompressedStringArray(out, groomServerAddresses
+        .toArray(new String[0]));
+  }
+
+  /**
+   * Restores the directive from the format produced by {@link #write}.
+   *
+   * @param in source to read from.
+   * @throws IOException if reading fails or the peer name and address
+   *           arrays have different lengths.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    int length = WritableUtils.readVInt(in);
+    if (length > 0) {
+      this.actions = new GroomServerAction[length];
+      for (int i = 0; i < length; ++i) {
+        GroomServerAction.ActionType actionType = WritableUtils.readEnum(in,
+            GroomServerAction.ActionType.class);
+        actions[i] = GroomServerAction.createAction(actionType);
+        actions[i].readFields(in);
+      }
+    } else {
+      this.actions = null;
+    }
+
+    String[] groomServerNames = WritableUtils.readCompressedStringArray(in);
+    String[] groomServerAddresses = WritableUtils
+        .readCompressedStringArray(in);
+    // readCompressedStringArray yields null for an array written as null.
+    if (groomServerNames == null) {
+      groomServerNames = new String[0];
+    }
+    if (groomServerAddresses == null) {
+      groomServerAddresses = new String[0];
+    }
+    if (groomServerNames.length != groomServerAddresses.length) {
+      throw new IOException("Mismatched groom server peers: "
+          + groomServerNames.length + " names but "
+          + groomServerAddresses.length + " addresses.");
+    }
+
+    groomServerPeers = new HashMap<String, String>(groomServerNames.length);
+    for (int i = 0; i < groomServerNames.length; i++) {
+      groomServerPeers.put(groomServerNames[i], groomServerAddresses[i]);
+    }
+  }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Thu Apr 28 03:47:18 2011
@@ -32,6 +32,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
@@ -44,8 +46,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DiskChecker;
@@ -93,6 +93,7 @@ public class GroomServer implements Runn
String groomServerName;
String localHostname;
InetSocketAddress bspMasterAddr;
+ private Instructor instructor;
// Filesystem
// private LocalDirAllocator localDirAllocator;
@@ -122,15 +123,89 @@ public class GroomServer implements Runn
private BlockingQueue<GroomServerAction> tasksToCleanup = new LinkedBlockingQueue<GroomServerAction>();
+ /**
+  * Executes a DispatchTasksDirective on this GroomServer: refreshes the peer
+  * names, then launches or kills tasks according to the carried actions.
+  */
+ private class DispatchTasksHandler implements DirectiveHandler {
+
+   /**
+    * @param directive expected to be a DispatchTasksDirective.
+    * @throws DirectiveException if killing a task fails with an IOException.
+    */
+   public void handle(Directive directive) throws DirectiveException {
+     final DispatchTasksDirective dispatched = (DispatchTasksDirective) directive;
+     final GroomServerAction[] pending = dispatched.getActions();
+
+     synchronized (bspPeer) {
+       bspPeer.setAllPeerNames(dispatched.getGroomServerPeers().values());
+     }
+
+     if (LOG.isDebugEnabled()) {
+       int count = (pending == null) ? 0 : pending.length;
+       LOG.debug("Got Response from BSPMaster with " + count + " actions");
+     }
+
+     if (pending == null) {
+       return;
+     }
+
+     for (GroomServerAction current : pending) {
+       if (current instanceof LaunchTaskAction) {
+         startNewTask((LaunchTaskAction) current);
+         continue;
+       }
+
+       // TODO Use the cleanup thread
+       // tasksToCleanup.put(action);
+
+       // Everything that is not a launch is treated as a kill request.
+       KillTaskAction kill = (KillTaskAction) current;
+       if (!tasks.containsKey(kill.getTaskID())) {
+         continue;
+       }
+       TaskInProgress doomed = tasks.get(kill.getTaskID());
+       doomed.taskStatus.setRunState(TaskStatus.State.FAILED);
+       try {
+         doomed.killAndCleanup(true);
+       } catch (IOException ioe) {
+         throw new DirectiveException("Error when killing a "
+             + "TaskInProgress.", ioe);
+       }
+     }
+   }
+ }
+
+ /**
+  * Worker thread that drains directives queued by {@code dispatch()} and
+  * hands each one to the {@link DirectiveHandler} bound to its type.
+  *
+  * The thread stops when it is interrupted. Failures raised by a handler are
+  * logged and do not stop the loop.
+  */
+ private class Instructor extends Thread {
+   final BlockingQueue<Directive> buffer = new LinkedBlockingQueue<Directive>();
+   final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
+
+   /**
+    * Registers the handler for a directive type. The first binding for a
+    * type wins; later bindings for the same type are ignored.
+    *
+    * @param instruction directive class the handler is responsible for.
+    * @param handler handler invoked for directives of that class.
+    */
+   public void bind(Class<? extends Directive> instruction,
+       DirectiveHandler handler) {
+     handlers.putIfAbsent(instruction, handler);
+   }
+
+   /**
+    * Queues a directive for asynchronous handling.
+    *
+    * @param directive directive to enqueue.
+    */
+   public void put(Directive directive) {
+     try {
+       buffer.put(directive);
+     } catch (InterruptedException ie) {
+       LOG.error("Unable to put directive into queue.", ie);
+       Thread.currentThread().interrupt();
+     }
+   }
+
+   /**
+    * Finds the handler bound to the given directive class or, failing that,
+    * to the nearest superclass that has a binding.
+    *
+    * @param type runtime class of the directive.
+    * @return the bound handler, or null when none is registered.
+    */
+   private DirectiveHandler findHandler(Class<?> type) {
+     for (Class<?> c = type; c != null; c = c.getSuperclass()) {
+       DirectiveHandler handler = handlers.get(c);
+       if (handler != null) {
+         return handler;
+       }
+     }
+     return null;
+   }
+
+   /**
+    * Takes directives off the queue until the thread is interrupted.
+    */
+   public void run() {
+     while (!Thread.currentThread().isInterrupted()) {
+       try {
+         Directive directive = buffer.take();
+         DirectiveHandler handler = findHandler(directive.getClass());
+         if (handler == null) {
+           throw new RuntimeException("Directive is not supported."
+               + directive);
+         }
+         handler.handle(directive);
+       } catch (InterruptedException ie) {
+         LOG.error("Unable to retrieve directive from the queue.", ie);
+         Thread.currentThread().interrupt();
+         // Leave the loop: with the flag set, take() would throw again
+         // immediately and the thread would spin logging errors.
+         return;
+       } catch (Exception e) {
+         LOG.error("Fail to execute directive.", e);
+       }
+     }
+   }
+ }
+
public GroomServer(Configuration conf) throws IOException {
LOG.info("groom start");
this.conf = conf;
- String mode = conf.get("bsp.master.address");
- if (!mode.equals("local")) {
- bspMasterAddr = BSPMaster.getAddress(conf);
- }
-
+ bspMasterAddr = BSPMaster.getAddress(conf);
// FileSystem local = FileSystem.getLocal(conf);
// this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
}
@@ -141,9 +216,8 @@ public class GroomServer implements Runn
}
if (localHostname == null) {
- this.localHostname = DNS.getDefaultHost(
- conf.get("bsp.dns.interface", "default"),
- conf.get("bsp.dns.nameserver", "default"));
+ this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
+ "default"), conf.get("bsp.dns.nameserver", "default"));
}
// check local disk
checkLocalDirs(conf.getStrings("bsp.local.dir"));
@@ -217,6 +291,11 @@ public class GroomServer implements Runn
throw new IOException("There is a problem in establishing"
+ " communication link with BSPMaster.");
}
+
+ this.instructor = new Instructor();
+ this.instructor.bind(DispatchTasksDirective.class,
+ new DispatchTasksHandler());
+ instructor.start();
this.running = true;
this.initialized = true;
}
@@ -228,31 +307,10 @@ public class GroomServer implements Runn
@Override
public void dispatch(Directive directive) throws IOException {
- // update tasks status
- GroomServerAction[] actions = directive.getActions();
- bspPeer.setAllPeerNames(directive.getGroomServerPeers().values());
- LOG.debug("Got Response from BSPMaster with "
- + ((actions != null) ? actions.length : 0) + " actions");
- // perform actions
- if (actions != null) {
- for (GroomServerAction action : actions) {
- if (action instanceof LaunchTaskAction) {
- startNewTask((LaunchTaskAction) action);
- } else {
-
- // TODO Use the cleanup thread
- // tasksToCleanup.put(action);
-
- KillTaskAction killAction = (KillTaskAction) action;
- if (tasks.containsKey(killAction.getTaskID())) {
- TaskInProgress tip = tasks.get(killAction.getTaskID());
- tip.taskStatus.setRunState(TaskStatus.State.FAILED);
- tip.killAndCleanup(true);
- }
+ if (!instructor.isAlive())
+ throw new IOException();
- }
- }
- }
+ instructor.put(directive);
}
private static void checkLocalDirs(String[] localDirs)
@@ -392,15 +450,16 @@ public class GroomServer implements Runn
* Update and report refresh status back to BSPMaster.
*/
public void doReport(TaskStatus taskStatus) {
- GroomServerStatus gss = new GroomServerStatus(groomServerName,
+ GroomServerStatus groomStatus = new GroomServerStatus(groomServerName,
bspPeer.getPeerName(), updateTaskStatus(taskStatus), failures,
maxCurrentTasks, rpcServer);
try {
- boolean ret = masterClient.report(new Directive(gss));
+ boolean ret = masterClient.report(new ReportGroomStatusDirective(
+ groomStatus));
if (!ret) {
LOG.warn("Fail to renew BSPMaster's GroomServerStatus. "
- + " groom name: " + gss.getGroomName() + " peer name:"
- + gss.getPeerName() + " rpc server:" + rpcServer);
+ + " groom name: " + groomStatus.getGroomName() + " peer name:"
+ + groomStatus.getPeerName() + " rpc server:" + rpcServer);
}
} catch (IOException ioe) {
LOG.error("Fail to communicate with BSPMaster for reporting.", ioe);
@@ -798,8 +857,6 @@ public class GroomServer implements Runn
throwable.printStackTrace(new PrintStream(baos));
} finally {
RPC.stopProxy(umbilical);
- MetricsContext metricsContext = MetricsUtil.getContext("mapred");
- metricsContext.close();
// Shutting down log4j of the child-vm...
// This assumes that on return from Task.run()
// there is no more logging done.
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/ReportGroomStatusDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ReportGroomStatusDirective.java?rev=1097314&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ReportGroomStatusDirective.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ReportGroomStatusDirective.java Thu Apr 28 03:47:18 2011
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.ipc.WorkerProtocol;
+
+public class ReportGroomStatusDirective extends Directive implements Writable {
+
+ public static final Log LOG = LogFactory.getLog(ReportGroomStatusDirective.class);
+
+ private GroomServerStatus status;
+
+ public ReportGroomStatusDirective(){ super(); }
+
+ public ReportGroomStatusDirective(GroomServerStatus status) {
+ super(Directive.Type.Response);
+ this.status = status;
+ }
+
+ public GroomServerStatus getStatus() {
+ return this.status;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ this.status.write(out);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ this.status = new GroomServerStatus();
+ this.status.readFields(in);
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Thu Apr 28 03:47:18 2011
@@ -136,9 +136,9 @@ class SimpleTaskScheduler extends TaskSc
WorkerProtocol worker = groomServerManager.findGroomServer(this.stus);
try {
// dispatch() to the groom server
- Directive d1 = new Directive(groomServerManager
- .currentGroomServerPeers(),
- new GroomServerAction[] { new LaunchTaskAction(t) });
+ Directive d1 = new DispatchTasksDirective(groomServerManager
+ .currentGroomServerPeers(), new GroomServerAction[] {
+ new LaunchTaskAction(t)});
worker.dispatch(d1);
} catch (IOException ioe) {
LOG.error("Fail to dispatch tasks to GroomServer "
Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java Thu Apr 28 03:47:18 2011
@@ -22,8 +22,7 @@ import java.io.IOException;
import org.apache.hama.bsp.Directive;
/**
- * A protocol for BSPMaster talks to GroomServer. This protocol
- * allow BSPMaster dispatch tasks to a GroomServer.
+ * A protocol that lets BSPMaster talk to GroomServer.
*/
public interface WorkerProtocol extends HamaRPCProtocolVersion {
Modified: incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java Thu Apr 28 03:47:18 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
public abstract class HamaClusterTestCase extends HamaTestCase {
public static final Log LOG = LogFactory.getLog(HamaClusterTestCase.class);
protected MiniDFSCluster dfsCluster;
+ protected MiniBSPCluster bspCluster;
protected MiniZooKeeperCluster zooKeeperCluster;
protected boolean startDfs;
@@ -52,6 +53,8 @@ public abstract class HamaClusterTestCas
this.zooKeeperCluster = new MiniZooKeeperCluster();
int clientPort = this.zooKeeperCluster.startup(testDir);
conf.set("hama.zookeeper.property.clientPort", Integer.toString(clientPort));
+ bspCluster = new MiniBSPCluster(this.conf, 2);
+ bspCluster.startBSPCluster();
}
@Override
@@ -95,6 +98,7 @@ public abstract class HamaClusterTestCas
if (startDfs) {
shutdownDfs(dfsCluster);
}
+ bspCluster.shutdown();
} catch (Exception e) {
LOG.error(e);
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java Thu Apr 28 03:47:18 2011
@@ -62,6 +62,10 @@ public abstract class HamaTestCase exten
private void init() {
conf = new HamaConfiguration();
+ conf.setStrings("bsp.local.dir", "/tmp/hama-test");
+ conf.set("bsp.master.address", "localhost");
+ conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+ conf.set("bsp.groom.report.address", "127.0.0.1:0");
}
/**
Modified: incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java Thu Apr 28 03:47:18 2011
@@ -135,11 +135,24 @@ public class MiniBSPCluster {
int threadpool = conf.getInt("bsp.test.threadpool", 10);
LOG.info("Thread pool value "+threadpool);
scheduler = Executors.newScheduledThreadPool(threadpool);
+ }
+ public void startBSPCluster(){
startMaster();
startGroomServers();
}
+ /**
+  * Stops the master if one exists and is running, then stops every groom
+  * server runner that is still running.
+  */
+ public void shutdownBSPCluster() {
+   if (master != null && master.isRunning()) {
+     master.shutdown();
+   }
+   // Iterating an empty list is a no-op, so no explicit size check is needed.
+   for (GroomServerRunner runner : groomServerList) {
+     if (runner.isRunning()) {
+       runner.shutdown();
+     }
+   }
+ }
+
+
public void startMaster(){
if(null == this.scheduler)
throw new NullPointerException("No ScheduledExecutorService exists.");
@@ -203,6 +216,7 @@ public class MiniBSPCluster {
}
public void shutdown() {
+ shutdownBSPCluster();
scheduler.shutdown();
}
Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1097314&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java Thu Apr 28 03:47:18 2011
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+
+public class TestBSPMasterGroomServer extends HamaCluster {
+
+ private static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class);
+ private static String TMP_OUTPUT = "/tmp/test-example/";
+ private HamaConfiguration configuration;
+ private String TEST_JOB = "src/test/testjar/testjob.jar";
+
+ public TestBSPMasterGroomServer() {
+ configuration = new HamaConfiguration();
+ configuration.set("bsp.master.address", "localhost");
+ assertEquals("Make sure master addr is set to localhost:", "localhost",
+ configuration.get("bsp.master.address"));
+ configuration.setStrings("bsp.local.dir", "/tmp/hama-test");
+ }
+
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ public void testSubmitJob() throws Exception {
+ BSPJob bsp = new BSPJob(configuration);
+ bsp.setJobName("Test Serialize Printing");
+ bsp.setBspClass(testjar.ClassSerializePrinting.HelloBSP.class);
+ bsp.setJar(TEST_JOB);
+
+ // Set the task size as a number of GroomServer
+ BSPJobClient jobClient = new BSPJobClient(configuration);
+ ClusterStatus cluster = jobClient.getClusterStatus(false);
+ bsp.setNumBspTask(cluster.getGroomServers());
+ FileSystem fileSys = FileSystem.get(conf);
+
+ if (bsp.waitForCompletion(true)) {
+ checkOutput(fileSys, cluster, conf);
+ }
+ LOG.info("Client finishes execution job.");
+ }
+
+ private static void checkOutput(FileSystem fileSys, ClusterStatus cluster,
+ HamaConfiguration conf) throws Exception {
+ for (int i = 0; i < cluster.getGroomServers(); i++) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
+ TMP_OUTPUT + i), conf);
+ LongWritable timestamp = new LongWritable();
+ Text message = new Text();
+ reader.next(timestamp, message);
+ assertTrue("Check if `Hello BSP' gets printed.", message.toString()
+ .indexOf("Hello BSP from") >= 0);
+ reader.close();
+ }
+ }
+
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+}
Added: incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java?rev=1097314&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java (added)
+++ incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java Thu Apr 28 03:47:18 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package testjar;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Holder for the {@link HelloBSP} job class that TestBSPMasterGroomServer
+ * submits.
+ */
+public class ClassSerializePrinting {
+  private static final String TMP_OUTPUT = "/tmp/test-example/";
+
+  /**
+   * BSP job in which each peer writes one greeting record to its own sequence
+   * file under {@code /tmp/test-example/}.
+   */
+  public static class HelloBSP extends BSP {
+    public static final Log LOG = LogFactory.getLog(HelloBSP.class);
+    private Configuration conf;
+    private final static int PRINT_INTERVAL = 1000;
+    private FileSystem fileSys;
+    private int num;
+
+    /**
+     * Walks over all peer names; on the iteration whose name matches this peer,
+     * writes the greeting file named after that iteration index. Every
+     * iteration then sleeps for PRINT_INTERVAL milliseconds and calls sync().
+     */
+    public void bsp(BSPPeerProtocol bspPeer) throws IOException,
+        KeeperException, InterruptedException {
+
+      int i = 0;
+      for (String otherPeer : bspPeer.getAllPeerNames()) {
+        String peerName = bspPeer.getPeerName();
+        if (peerName.equals(otherPeer)) {
+          writeLogToFile(peerName, i);
+        }
+
+        Thread.sleep(PRINT_INTERVAL);
+        bspPeer.sync();
+        i++;
+      }
+    }
+
+    /**
+     * Writes a single (timestamp, greeting) record to the file TMP_OUTPUT + i.
+     *
+     * @param string peer name included in the greeting
+     * @param i index used both in the file name and in the greeting text
+     */
+    private void writeLogToFile(String string, int i) throws IOException {
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+          new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+          CompressionType.NONE);
+      // Release the writer even if append() fails.
+      try {
+        writer.append(new LongWritable(System.currentTimeMillis()), new Text(
+            "Hello BSP from " + (i + 1) + " of " + num + ": " + string));
+      } finally {
+        writer.close();
+      }
+    }
+
+    public Configuration getConf() {
+      return conf;
+    }
+
+    /**
+     * Stores the configuration, reads the peer count from
+     * {@code bsp.peers.num} and obtains the file system. A failure to obtain
+     * the file system is logged and leaves {@code fileSys} unset.
+     */
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      num = Integer.parseInt(conf.get("bsp.peers.num"));
+      try {
+        fileSys = FileSystem.get(conf);
+      } catch (IOException e) {
+        // Report through the class logger instead of printing to stderr.
+        LOG.error("Fail to obtain FileSystem from configuration.", e);
+      }
+    }
+
+  }
+
+}
Added: incubator/hama/trunk/src/test/testjar/testjob.jar
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/testjar/testjob.jar?rev=1097314&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/hama/trunk/src/test/testjar/testjob.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream