You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this copy.
Posted to commits@giraph.apache.org by ed...@apache.org on 2017/08/28 17:20:00 UTC
git commit: updated refs/heads/trunk to cc48935
Repository: giraph
Updated Branches:
refs/heads/trunk 5ed0d65dd -> cc489350e
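GIRAPH-1139: make task ids unique across application attempts by deriving them from the task partition and the application attempt.
Closes #30.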
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/cc489350
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/cc489350
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/cc489350
Branch: refs/heads/trunk
Commit: cc489350eba8db8bc62a4fca77b7ad9aa14bcf3d
Parents: 5ed0d65
Author: Nicholas Eggert <ni...@target.com>
Authored: Mon Aug 28 10:19:34 2017 -0700
Committer: Sergey Edunov <ed...@fb.com>
Committed: Mon Aug 28 10:19:34 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/giraph/bsp/BspService.java | 27 ++++++++++++--------
.../apache/giraph/master/BspServiceMaster.java | 17 ++++++------
.../apache/giraph/worker/BspServiceWorker.java | 10 ++++----
3 files changed, 30 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/cc489350/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 976997f..c3fd141 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -188,12 +188,12 @@ public abstract class BspService<I extends WritableComparable,
private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT;
/** Job id, to ensure uniqueness */
private final String jobId;
- /** Task partition, to ensure uniqueness */
- private final int taskPartition;
+ /** Task id, from partition and application attempt to ensure uniqueness */
+ private final int taskId;
/** My hostname */
private final String hostname;
- /** Combination of hostname '_' partition (unique id) */
- private final String hostnamePartitionId;
+ /** Combination of hostname '_' task (unique id) */
+ private final String hostnameTaskId;
/** Graph partitioner */
private final GraphPartitionerFactory<I, V, E> graphPartitionerFactory;
/** Mapper that will do the graph computation */
@@ -231,8 +231,8 @@ public abstract class BspService<I extends WritableComparable,
this.context = context;
this.graphTaskManager = graphTaskManager;
this.conf = graphTaskManager.getConf();
+
this.jobId = conf.getJobId();
- this.taskPartition = conf.getTaskPartition();
this.restartedSuperstep = conf.getLong(
GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP);
try {
@@ -240,7 +240,6 @@ public abstract class BspService<I extends WritableComparable,
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
- this.hostnamePartitionId = hostname + "_" + getTaskPartition();
this.graphPartitionerFactory = conf.createGraphPartitioner();
basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
@@ -252,6 +251,8 @@ public abstract class BspService<I extends WritableComparable,
applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
cleanedUpPath = basePath + CLEANED_UP_DIR;
+
+
String restartJobId = RESTART_JOB_ID.get(conf);
savedCheckpointBasePath =
@@ -272,7 +273,7 @@ public abstract class BspService<I extends WritableComparable,
}
if (LOG.isInfoEnabled()) {
LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
- ", " + getTaskPartition() + " on " + serverPortList);
+ ", partition " + conf.getTaskPartition() + " on " + serverPortList);
}
try {
this.zk = new ZooKeeperExt(serverPortList,
@@ -288,6 +289,10 @@ public abstract class BspService<I extends WritableComparable,
throw new RuntimeException(e);
}
+ this.taskId = (int) getApplicationAttempt() * conf.getMaxWorkers() +
+ conf.getTaskPartition();
+ this.hostnameTaskId = hostname + "_" + getTaskId();
+
//Trying to restart from the latest superstep
if (restartJobId != null &&
restartedSuperstep == UNSET_SUPERSTEP) {
@@ -529,12 +534,12 @@ public abstract class BspService<I extends WritableComparable,
return hostname;
}
- public final String getHostnamePartitionId() {
- return hostnamePartitionId;
+ public final String getHostnameTaskId() {
+ return hostnameTaskId;
}
- public final int getTaskPartition() {
- return taskPartition;
+ public final int getTaskId() {
+ return taskId;
}
public final GraphTaskManager<I, V, E> getGraphTaskManager() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/cc489350/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 779bccb..d1dc79d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -579,15 +579,15 @@ public class BspServiceMaster<I extends WritableComparable,
if (LOG.isInfoEnabled()) {
Set<Integer> partitionSet = new TreeSet<Integer>();
for (WorkerInfo workerInfo : healthyWorkerInfoList) {
- partitionSet.add(workerInfo.getTaskId());
+ partitionSet.add(workerInfo.getTaskId() % maxWorkers);
}
for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
- partitionSet.add(workerInfo.getTaskId());
+ partitionSet.add(workerInfo.getTaskId() % maxWorkers);
}
for (int i = 1; i <= maxWorkers; ++i) {
if (partitionSet.contains(Integer.valueOf(i))) {
continue;
- } else if (i == getTaskPartition()) {
+ } else if (i == getTaskId() % maxWorkers) {
continue;
} else {
LOG.info("logMissingWorkersOnSuperstep: No response from " +
@@ -802,7 +802,7 @@ public class BspServiceMaster<I extends WritableComparable,
try {
myBid =
getZkExt().createExt(masterElectionPath +
- "/" + getHostnamePartitionId(),
+ "/" + getHostnameTaskId(),
null,
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL,
@@ -841,7 +841,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
if (masterChildArr.get(0).equals(myBid)) {
GiraphStats.getInstance().getCurrentMasterTaskPartition().
- setValue(getTaskPartition());
+ setValue(getTaskId());
globalCommHandler = new MasterGlobalCommHandler(
new MasterAggregatorHandler(getConfiguration(), getContext()),
@@ -860,7 +860,7 @@ public class BspServiceMaster<I extends WritableComparable,
getGraphTaskManager().createUncaughtExceptionHandler());
masterInfo.setInetSocketAddress(masterServer.getMyAddress(),
masterServer.getLocalHostOrIp());
- masterInfo.setTaskId(getTaskPartition());
+ masterInfo.setTaskId(getTaskId());
masterClient =
new NettyMasterClient(getContext(), getConfiguration(), this,
getGraphTaskManager().createUncaughtExceptionHandler());
@@ -1211,6 +1211,7 @@ public class BspServiceMaster<I extends WritableComparable,
setApplicationAttempt(getApplicationAttempt() + 1);
setCachedSuperstep(checkpoint);
setRestartedSuperstep(checkpoint);
+ checkpointStatus = CheckpointStatus.NONE;
setJobState(ApplicationState.START_SUPERSTEP,
getApplicationAttempt(),
checkpoint);
@@ -1740,7 +1741,7 @@ public class BspServiceMaster<I extends WritableComparable,
if (checkpointFrequency == 0) {
return CheckpointStatus.NONE;
}
- long firstCheckpoint = INPUT_SUPERSTEP + 1 + checkpointFrequency;
+ long firstCheckpoint = INPUT_SUPERSTEP + 1;
if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
}
@@ -1912,7 +1913,7 @@ public class BspServiceMaster<I extends WritableComparable,
// for workers and masters, the master will clean up the ZooKeeper
// znodes associated with this job.
String masterCleanedUpPath = cleanedUpPath + "/" +
- getTaskPartition() + MASTER_SUFFIX;
+ getTaskId() + MASTER_SUFFIX;
try {
String finalFinishedPath =
getZkExt().createExt(masterCleanedUpPath,
http://git-wip-us.apache.org/repos/asf/giraph/blob/cc489350/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index b6b9c12..6f02749 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -216,7 +216,7 @@ public class BspServiceWorker<I extends WritableComparable,
graphTaskManager.createUncaughtExceptionHandler());
workerInfo.setInetSocketAddress(workerServer.getMyAddress(),
workerServer.getLocalHostOrIp());
- workerInfo.setTaskId(getTaskPartition());
+ workerInfo.setTaskId(getTaskId());
workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
graphTaskManager.createUncaughtExceptionHandler());
workerServer.setFlowControl(workerClient.getFlowControl());
@@ -243,7 +243,7 @@ public class BspServiceWorker<I extends WritableComparable,
}
observers = conf.createWorkerObservers(context);
- WorkerProgress.get().setTaskId(getTaskPartition());
+ WorkerProgress.get().setTaskId(getTaskId());
workerProgressWriter = conf.trackJobProgressOnClient() ?
new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) :
null;
@@ -921,7 +921,7 @@ else[HADOOP_NON_SECURE]*/
String finishedWorkerPath =
getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
- "/" + getHostnamePartitionId();
+ "/" + workerInfo.getHostnameId();
try {
getZkExt().createExt(finishedWorkerPath,
workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()),
@@ -1202,7 +1202,7 @@ else[HADOOP_NON_SECURE]*/
// for workers and masters, the master will clean up the ZooKeeper
// znodes associated with this job.
String workerCleanedUpPath = cleanedUpPath + "/" +
- getTaskPartition() + WORKER_SUFFIX;
+ getTaskId() + WORKER_SUFFIX;
try {
String finalFinishedPath =
getZkExt().createExt(workerCleanedUpPath,
@@ -1303,7 +1303,7 @@ else[HADOOP_NON_SECURE]*/
// Notify master that checkpoint is stored
String workerWroteCheckpoint =
getWorkerWroteCheckpointPath(getApplicationAttempt(),
- getSuperstep()) + "/" + getHostnamePartitionId();
+ getSuperstep()) + "/" + workerInfo.getHostnameId();
try {
getZkExt().createExt(workerWroteCheckpoint,
new byte[0],