You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:22:11 UTC
[28/47] git commit: updated refs/heads/release-1.1 to 4c139ee
GIRAPH-933: Checkpointing improvements (edunov via majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5adca63d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5adca63d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5adca63d
Branch: refs/heads/release-1.1
Commit: 5adca63deca25d84f4fdea053c35a85efc8bbb3d
Parents: bc9f823
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri Aug 15 15:03:19 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Aug 15 15:03:19 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../java/org/apache/giraph/bsp/BspService.java | 108 ++++++----
.../apache/giraph/bsp/CentralizedService.java | 9 -
.../giraph/bsp/CentralizedServiceMaster.java | 4 +-
.../giraph/bsp/CentralizedServiceWorker.java | 8 +
.../org/apache/giraph/bsp/CheckpointStatus.java | 31 +++
.../org/apache/giraph/bsp/SuperstepState.java | 30 ++-
.../java/org/apache/giraph/comm/ServerData.java | 10 +
.../org/apache/giraph/conf/GiraphConstants.java | 10 +-
.../giraph/graph/FinishedSuperstepStats.java | 20 +-
.../org/apache/giraph/graph/GlobalStats.java | 27 ++-
.../apache/giraph/graph/GraphTaskManager.java | 44 ++--
.../job/DefaultGiraphJobRetryChecker.java | 5 +
.../java/org/apache/giraph/job/GiraphJob.java | 23 ++
.../giraph/job/GiraphJobRetryChecker.java | 6 +
.../java/org/apache/giraph/job/HadoopUtils.java | 15 ++
.../apache/giraph/master/BspServiceMaster.java | 147 +++++++++----
.../org/apache/giraph/master/MasterThread.java | 10 +-
.../apache/giraph/utils/CheckpointingUtils.java | 62 ++++++
.../org/apache/giraph/utils/WritableUtils.java | 63 ++++++
.../apache/giraph/worker/BspServiceWorker.java | 47 ++++-
.../apache/giraph/utils/TestWritableUtils.java | 70 +++++++
.../org/apache/giraph/TestCheckpointing.java | 208 +++++++++++++++----
pom.xml | 2 +-
24 files changed, 787 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 66136b2..b64ce2c 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-933: Checkpointing improvements (edunov via majakabiljo)
+
GIRAPH-943: Perf regression due to netty 4.0.21 (pavanka)
GIRAPH-935: Loosen modifiers when needed (ikabiljo via majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 02577b9..c418a89 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -24,13 +24,16 @@ import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.InputSplitEvents;
import org.apache.giraph.graph.InputSplitPaths;
import org.apache.giraph.partition.GraphPartitionerFactory;
+import org.apache.giraph.utils.CheckpointingUtils;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.giraph.zk.ZooKeeperManager;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -50,10 +53,10 @@ import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.security.InvalidParameterException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
import static org.apache.giraph.conf.GiraphConstants.RESTART_JOB_ID;
/**
@@ -162,6 +165,8 @@ public abstract class BspService<I extends WritableComparable,
public static final String WORKER_PROGRESSES = "/_workerProgresses";
/** Denotes that computation should be halted */
public static final String HALT_COMPUTATION_NODE = "/_haltComputation";
+ /** User sets this flag to checkpoint and stop the job */
+ public static final String FORCE_CHECKPOINT_USER_FLAG = "/_checkpointAndStop";
/** Denotes which workers have been cleaned up */
public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
/** JSON partition stats key */
@@ -283,8 +288,6 @@ public abstract class BspService<I extends WritableComparable,
private final GraphTaskManager<I, V, E> graphTaskManager;
/** File system */
private final FileSystem fs;
- /** Checkpoint frequency */
- private final int checkpointFrequency;
/**
* Constructor.
@@ -325,13 +328,6 @@ public abstract class BspService<I extends WritableComparable,
this.taskPartition = conf.getTaskPartition();
this.restartedSuperstep = conf.getLong(
GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP);
- this.cachedSuperstep = restartedSuperstep;
- if ((restartedSuperstep != UNSET_SUPERSTEP) &&
- (restartedSuperstep < 0)) {
- throw new IllegalArgumentException(
- "BspService: Invalid superstep to restart - " +
- restartedSuperstep);
- }
try {
this.hostname = conf.getLocalHostname();
} catch (UnknownHostException e) {
@@ -340,8 +336,6 @@ public abstract class BspService<I extends WritableComparable,
this.hostnamePartitionId = hostname + "_" + getTaskPartition();
this.graphPartitionerFactory = conf.createGraphPartitioner();
- this.checkpointFrequency = conf.getCheckpointFrequency();
-
basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
basePath);
@@ -360,13 +354,14 @@ public abstract class BspService<I extends WritableComparable,
cleanedUpPath = basePath + CLEANED_UP_DIR;
String restartJobId = RESTART_JOB_ID.get(conf);
+
savedCheckpointBasePath =
- CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
- CHECKPOINT_DIRECTORY.getDefaultValue() + "/" +
- (restartJobId == null ? getJobId() : restartJobId));
- checkpointBasePath =
- CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
- CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId());
+ CheckpointingUtils.getCheckpointBasePath(getConfiguration(),
+ restartJobId == null ? getJobId() : restartJobId);
+
+ checkpointBasePath = CheckpointingUtils.
+ getCheckpointBasePath(getConfiguration(), getJobId());
+
masterElectionPath = basePath + MASTER_ELECTION_DIR;
myProgressPath = basePath + WORKER_PROGRESSES + "/" + taskPartition;
String serverPortList = conf.getZookeeperList();
@@ -392,6 +387,24 @@ public abstract class BspService<I extends WritableComparable,
} catch (IOException e) {
throw new RuntimeException(e);
}
+
+ //Trying to restart from the latest superstep
+ if (restartJobId != null &&
+ restartedSuperstep == UNSET_SUPERSTEP) {
+ try {
+ restartedSuperstep = getLastCheckpointedSuperstep();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ this.cachedSuperstep = restartedSuperstep;
+ if ((restartedSuperstep != UNSET_SUPERSTEP) &&
+ (restartedSuperstep < 0)) {
+ throw new IllegalArgumentException(
+ "BspService: Invalid superstep to restart - " +
+ restartedSuperstep);
+ }
+
}
/**
@@ -643,28 +656,6 @@ public abstract class BspService<I extends WritableComparable,
}
/**
- * Should checkpoint on this superstep? If checkpointing, always
- * checkpoint the first user superstep. If restarting, the first
- * checkpoint is after the frequency has been met.
- *
- * @param superstep Decide if checkpointing no this superstep
- * @return True if this superstep should be checkpointed, false otherwise
- */
- public final boolean checkpointFrequencyMet(long superstep) {
- if (checkpointFrequency == 0) {
- return false;
- }
- long firstCheckpoint = INPUT_SUPERSTEP + 1;
- if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
- firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
- }
- if (superstep < firstCheckpoint) {
- return false;
- }
- return ((superstep - firstCheckpoint) % checkpointFrequency) == 0;
- }
-
- /**
* Get the file system
*
* @return file system
@@ -1241,4 +1232,41 @@ public abstract class BspService<I extends WritableComparable,
}
return eventProcessed;
}
+
+ /**
+ * Get the last saved superstep.
+ *
+ * @return Last good superstep number
+ * @throws IOException
+ */
+ protected long getLastCheckpointedSuperstep() throws IOException {
+ FileStatus[] fileStatusArray =
+ getFs().listStatus(new Path(savedCheckpointBasePath),
+ new FinalizedCheckpointPathFilter());
+ if (fileStatusArray == null) {
+ return -1;
+ }
+ Arrays.sort(fileStatusArray);
+ long lastCheckpointedSuperstep = getCheckpoint(
+ fileStatusArray[fileStatusArray.length - 1].getPath());
+ if (LOG.isInfoEnabled()) {
+ LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
+ lastCheckpointedSuperstep + " from " +
+ fileStatusArray[fileStatusArray.length - 1].
+ getPath().toString());
+ }
+ return lastCheckpointedSuperstep;
+ }
+
+ /**
+ * Only get the finalized checkpoint files
+ */
+ private static class FinalizedCheckpointPathFilter implements PathFilter {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().endsWith(BspService.CHECKPOINT_FINALIZED_POSTFIX);
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
index ff3e427..560f1fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
@@ -51,15 +51,6 @@ public interface CentralizedService<I extends WritableComparable,
long getRestartedSuperstep();
/**
- * Given a superstep, should it be checkpointed based on the
- * checkpoint frequency?
- *
- * @param superstep superstep to check against frequency
- * @return true if checkpoint frequency met or superstep is 1.
- */
- boolean checkpointFrequencyMet(long superstep);
-
- /**
* Get list of workers
*
* @return List of workers
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index e5b7cf3..9b4f9d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -175,7 +175,9 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
*
* @throws IOException
* @throws InterruptedException
+ * @param superstepState what was the state
+ * of the last complete superstep?
*/
- void cleanup()
+ void cleanup(SuperstepState superstepState)
throws IOException, InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index e5d0ae1..37aed45 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -21,6 +21,7 @@ package org.apache.giraph.bsp;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.graph.FinishedSuperstepStats;
+import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
@@ -237,4 +238,11 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
*/
void cleanup(FinishedSuperstepStats finishedSuperstepStats)
throws IOException, InterruptedException;
+
+ /**
+ * Loads Global stats from zookeeper.
+ * @return global stats stored in zookeeper for
+ * previous superstep.
+ */
+ GlobalStats getGlobalStats();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java
new file mode 100644
index 0000000..74db490
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+/**
+ * Enum represents possible checkpoint state.
+ */
+public enum CheckpointStatus {
+ /** Do nothing, no checkpoint required */
+ NONE,
+ /** Regular checkpoint */
+ CHECKPOINT,
+ /** Do checkpoint and then halt further computation */
+ CHECKPOINT_AND_HALT
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java b/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
index c384fbf..768278b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
@@ -23,11 +23,33 @@ package org.apache.giraph.bsp;
*/
public enum SuperstepState {
/** Nothing happened yet */
- INITIAL,
+ INITIAL(false),
/** A worker died during this superstep */
- WORKER_FAILURE,
+ WORKER_FAILURE(false),
/** This superstep completed correctly */
- THIS_SUPERSTEP_DONE,
+ THIS_SUPERSTEP_DONE(false),
/** All supersteps are complete */
- ALL_SUPERSTEPS_DONE,
+ ALL_SUPERSTEPS_DONE(true),
+ /** Execution halted */
+ CHECKPOINT_AND_HALT(true);
+
+ /** Should we stop execution after this superstep? */
+ private boolean executionComplete;
+
+ /**
+ * Enum constructor
+ * @param executionComplete is final state?
+ */
+ SuperstepState(boolean executionComplete) {
+ this.executionComplete = executionComplete;
+ }
+
+ /**
+ * Returns true if execution has to be stopped after this
+ * superstep.
+ * @return whether execution is complete.
+ */
+ public boolean isExecutionComplete() {
+ return executionComplete;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index a92cd1c..1fd85e4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -269,4 +269,14 @@ public class ServerData<I extends WritableComparable,
public void addIncomingWorkerToWorkerMessage(Writable message) {
incomingWorkerToWorkerMessages.add(message);
}
+
+
+ /**
+ * Get worker to worker messages received in previous superstep.
+ * @return list of current worker to worker messages.
+ */
+ public List<Writable> getCurrentWorkerToWorkerMessages() {
+ return currentWorkerToWorkerMessages;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 0424a47..da0a8db 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -1137,12 +1137,16 @@ public interface GiraphConstants {
new IntConfOption("giraph.checkpoint.io.threads", 8,
"Number of threads for writing and reading checkpoints");
- /** Compression algorithm to be used for checkpointing */
+ /**
+ * Compression algorithm to be used for checkpointing.
+ * Defined by extension for hadoop compatibility reasons.
+ */
StrConfOption CHECKPOINT_COMPRESSION_CODEC =
new StrConfOption("giraph.checkpoint.compression.codec",
- "org.apache.hadoop.io.compress.DefaultCodec",
+ ".deflate",
"Defines compression algorithm we will be using for " +
- "storing checkpoint");
+ "storing checkpoint. Available options include but " +
+ "not restricted to: .deflate, .gz, .bz2, .lzo");
/** Number of threads to use in async message store, 0 means
* we should not use async message processing */
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
index c351778..f7895a9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
@@ -17,6 +17,8 @@
*/
package org.apache.giraph.graph;
+import org.apache.giraph.bsp.CheckpointStatus;
+
/**
* Immutable graph stats after the completion of a superstep
*/
@@ -27,6 +29,11 @@ public class FinishedSuperstepStats extends VertexEdgeCount {
private final boolean allVerticesHalted;
/** Needs to load a checkpoint */
private final boolean mustLoadCheckpoint;
+ /**
+ * Master decides when we need to checkpoint and what should
+ * we do next.
+ */
+ private final CheckpointStatus checkpointStatus;
/**
* Constructor.
@@ -36,16 +43,19 @@ public class FinishedSuperstepStats extends VertexEdgeCount {
* @param numVertices Number of vertices
* @param numEdges Number of edges
* @param mustLoadCheckpoint Has to load a checkpoint?
+ * @param checkpointStatus Should we checkpoint after this superstep?
*/
public FinishedSuperstepStats(long numLocalVertices,
boolean allVerticesHalted,
long numVertices,
long numEdges,
- boolean mustLoadCheckpoint) {
+ boolean mustLoadCheckpoint,
+ CheckpointStatus checkpointStatus) {
super(numVertices, numEdges);
this.localVertexCount = numLocalVertices;
this.allVerticesHalted = allVerticesHalted;
this.mustLoadCheckpoint = mustLoadCheckpoint;
+ this.checkpointStatus = checkpointStatus;
}
public long getLocalVertexCount() {
@@ -69,4 +79,12 @@ public class FinishedSuperstepStats extends VertexEdgeCount {
public boolean mustLoadCheckpoint() {
return mustLoadCheckpoint;
}
+
+ /**
+ * What master thinks about checkpointing after this superstep.
+ * @return CheckpointStatus that reflects master decision.
+ */
+ public CheckpointStatus getCheckpointStatus() {
+ return checkpointStatus;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
index bc56c9c..e11f02c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.giraph.bsp.CheckpointStatus;
import org.apache.giraph.partition.PartitionStats;
import org.apache.hadoop.io.Writable;
@@ -41,6 +42,12 @@ public class GlobalStats implements Writable {
private long messageBytesCount = 0;
/** Whether the computation should be halted */
private boolean haltComputation = false;
+ /**
+ * Master's decision on whether we should checkpoint and
+ * what to do next.
+ */
+ private CheckpointStatus checkpointStatus =
+ CheckpointStatus.NONE;
/**
* Add the stats of a partition to the global stats.
@@ -81,6 +88,14 @@ public class GlobalStats implements Writable {
haltComputation = value;
}
+ public CheckpointStatus getCheckpointStatus() {
+ return checkpointStatus;
+ }
+
+ public void setCheckpointStatus(CheckpointStatus checkpointStatus) {
+ this.checkpointStatus = checkpointStatus;
+ }
+
/**
* Add messages to the global stats.
*
@@ -107,6 +122,11 @@ public class GlobalStats implements Writable {
messageCount = input.readLong();
messageBytesCount = input.readLong();
haltComputation = input.readBoolean();
+ if (input.readBoolean()) {
+ checkpointStatus = CheckpointStatus.values()[input.readInt()];
+ } else {
+ checkpointStatus = null;
+ }
}
@Override
@@ -117,6 +137,10 @@ public class GlobalStats implements Writable {
output.writeLong(messageCount);
output.writeLong(messageBytesCount);
output.writeBoolean(haltComputation);
+ output.writeBoolean(checkpointStatus != null);
+ if (checkpointStatus != null) {
+ output.writeInt(checkpointStatus.ordinal());
+ }
}
@Override
@@ -124,6 +148,7 @@ public class GlobalStats implements Writable {
return "(vtx=" + vertexCount + ",finVtx=" +
finishedVertexCount + ",edges=" + edgeCount + ",msgCount=" +
messageCount + ",msgBytesCount=" +
- messageBytesCount + ",haltComputation=" + haltComputation + ")";
+ messageBytesCount + ",haltComputation=" + haltComputation +
+ ", checkpointStatus=" + checkpointStatus + ')';
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 6ebb002..8a97939 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -21,6 +21,7 @@ package org.apache.giraph.graph;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.bsp.CheckpointStatus;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -120,7 +121,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
private GraphFunctions graphFunctions = GraphFunctions.UNKNOWN;
/** Superstep stats */
private FinishedSuperstepStats finishedSuperstepStats =
- new FinishedSuperstepStats(0, false, 0, 0, false);
+ new FinishedSuperstepStats(0, false, 0, 0, false, CheckpointStatus.NONE);
// Per-Job Metrics
/** Timer for WorkerContext#preApplication() */
@@ -281,7 +282,18 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
context.progress();
serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners);
context.progress();
- graphState = checkSuperstepRestarted(superstep, graphState);
+ boolean hasBeenRestarted = checkSuperstepRestarted(superstep);
+
+ GlobalStats globalStats = serviceWorker.getGlobalStats();
+
+ if (hasBeenRestarted) {
+ graphState = new GraphState(superstep,
+ finishedSuperstepStats.getVertexCount(),
+ finishedSuperstepStats.getEdgeCount(),
+ context);
+ } else if (storeCheckpoint(globalStats.getCheckpointStatus())) {
+ break;
+ }
prepareForSuperstep(graphState);
context.progress();
MessageStore<I, Writable> messageStore =
@@ -735,11 +747,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
/**
* Handle the event that this superstep is a restart of a failed one.
* @param superstep current superstep
- * @param graphState the BSP graph state
* @return the graph state, updated if this is a restart superstep
*/
- private GraphState checkSuperstepRestarted(long superstep,
- GraphState graphState) throws IOException {
+ private boolean checkSuperstepRestarted(long superstep) throws IOException {
// Might need to restart from another superstep
// (manually or automatic), or store a checkpoint
if (serviceWorker.getRestartedSuperstep() == superstep) {
@@ -750,15 +760,25 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
serviceWorker.getRestartedSuperstep());
finishedSuperstepStats = new FinishedSuperstepStats(0, false,
vertexEdgeCount.getVertexCount(), vertexEdgeCount.getEdgeCount(),
- false);
- graphState = new GraphState(superstep,
- finishedSuperstepStats.getVertexCount(),
- finishedSuperstepStats.getEdgeCount(),
- context);
- } else if (serviceWorker.checkpointFrequencyMet(superstep)) {
+ false, CheckpointStatus.NONE);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Check if it's time to checkpoint and actually does checkpointing
+ * if it is.
+ * @param checkpointStatus master's decision
+ * @return true if we need to stop computation after checkpoint
+ * @throws IOException
+ */
+ private boolean storeCheckpoint(CheckpointStatus checkpointStatus)
+ throws IOException {
+ if (checkpointStatus != CheckpointStatus.NONE) {
serviceWorker.storeCheckpoint();
}
- return graphState;
+ return checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT;
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
index 0cab86c..edf6bce 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java
@@ -30,4 +30,9 @@ public class DefaultGiraphJobRetryChecker implements GiraphJobRetryChecker {
// By default, don't retry failed jobs
return false;
}
+
+ @Override
+ public boolean shouldRestartCheckpoint() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index 4a1f02e..436126b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -24,9 +24,13 @@ import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphMapper;
+import org.apache.giraph.utils.CheckpointingUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.log4j.Logger;
import java.io.IOException;
@@ -261,6 +265,25 @@ public class GiraphJob {
jobProgressTracker.stop();
}
jobObserver.jobFinished(submittedJob, passed);
+
+ FileSystem fs = FileSystem.get(conf);
+ JobID jobID = HadoopUtils.getJobID(submittedJob);
+ if (jobID != null) {
+ Path checkpointMark =
+ CheckpointingUtils.getCheckpointMarkPath(conf, jobID.toString());
+
+ if (fs.exists(checkpointMark)) {
+ if (retryChecker.shouldRestartCheckpoint()) {
+ GiraphConstants.RESTART_JOB_ID.set(conf, jobID.toString());
+ continue;
+ }
+ }
+ } else {
+ LOG.warn("jobID is null, are you using hadoop 0.20.203? " +
+ "Please report this issue here " +
+ "https://issues.apache.org/jira/browse/GIRAPH-933");
+ }
+
if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) {
return passed;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
index 53a800e..556b128 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java
@@ -33,4 +33,10 @@ public interface GiraphJobRetryChecker {
* @return True iff job should be retried
*/
boolean shouldRetry(Job submittedJob, int tryCount);
+
+ /**
+ * The job has been checkpointed and halted. Should we now restart it?
+ * @return true if checkpointed job should be automatically restarted.
+ */
+ boolean shouldRestartCheckpoint();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java b/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java
index 9530fd6..f2c673b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java
@@ -18,6 +18,7 @@
package org.apache.giraph.job;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -102,6 +103,20 @@ public class HadoopUtils {
}
/**
+ * Get Job ID from job.
+ * May return null for hadoop 0.20.203
+ * @param job submitted job
+ * @return JobId for submitted job.
+ */
+ public static JobID getJobID(Job job) {
+ /*if[HADOOP_JOB_ID_AVAILABLE]
+ return job.getID();
+ else[HADOOP_JOB_ID_AVAILABLE]*/
+ return job.getJobID();
+ /*end[HADOOP_JOB_ID_AVAILABLE]*/
+ }
+
+ /**
* Create a JobContext, supporting many Hadoops.
*
* @param conf Configuration
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index e129390..671df23 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FilenameUtils;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspInputFormat;
import org.apache.giraph.bsp.CentralizedServiceMaster;
+import org.apache.giraph.bsp.CheckpointStatus;
import org.apache.giraph.bsp.SuperstepState;
import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.MasterServer;
@@ -56,6 +57,7 @@ import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.metrics.WorkerSuperstepMetrics;
+import org.apache.giraph.utils.CheckpointingUtils;
import org.apache.giraph.utils.JMapHistoDumper;
import org.apache.giraph.utils.ReactiveJMapHistoDumper;
import org.apache.giraph.utils.ProgressableUtils;
@@ -67,10 +69,8 @@ import org.apache.giraph.worker.WorkerInfo;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -99,7 +99,6 @@ import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -189,6 +188,11 @@ public class BspServiceMaster<I extends WritableComparable,
/** MasterCompute time */
private GiraphTimer masterComputeTimer;
+ /** Checkpoint frequency */
+ private final int checkpointFrequency;
+ /** Current checkpoint status */
+ private CheckpointStatus checkpointStatus;
+
/**
* Constructor for setting up the master.
*
@@ -224,6 +228,9 @@ public class BspServiceMaster<I extends WritableComparable,
}
observers = conf.createMasterObservers();
+ this.checkpointFrequency = conf.getCheckpointFrequency();
+ this.checkpointStatus = CheckpointStatus.NONE;
+
GiraphMetrics.get().addSuperstepResetObserver(this);
GiraphStats.init((Mapper.Context) context);
}
@@ -365,7 +372,11 @@ public class BspServiceMaster<I extends WritableComparable,
@SuppressWarnings("deprecation")
JobID jobId = JobID.forName(getJobId());
RunningJob job = jobClient.getJob(jobId);
- job.killJob();
+ if (job != null) {
+ job.killJob();
+ } else {
+ LOG.error("Jon not found for jobId=" + getJobId());
+ }
}
} catch (IOException ioe) {
throw new RuntimeException(ioe);
@@ -1196,11 +1207,11 @@ public class BspServiceMaster<I extends WritableComparable,
*
* @param chosenWorkerInfoHealthPath Path to the healthy workers in ZooKeeper
* @param chosenWorkerInfoList List of the healthy workers
- * @return true if they are all alive, false otherwise.
+ * @return a list of dead workers. Empty list if all workers are alive.
* @throws InterruptedException
* @throws KeeperException
*/
- private boolean superstepChosenWorkerAlive(
+ private Collection<WorkerInfo> superstepChosenWorkerAlive(
String chosenWorkerInfoHealthPath,
List<WorkerInfo> chosenWorkerInfoList)
throws KeeperException, InterruptedException {
@@ -1208,16 +1219,13 @@ public class BspServiceMaster<I extends WritableComparable,
getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false);
Set<WorkerInfo> chosenWorkerInfoHealthySet =
new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList);
- boolean allChosenWorkersHealthy = true;
+ List<WorkerInfo> deadWorkers = new ArrayList<>();
for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) {
- allChosenWorkersHealthy = false;
- LOG.error("superstepChosenWorkerAlive: Missing chosen " +
- "worker " + chosenWorkerInfo +
- " on superstep " + getSuperstep());
+ deadWorkers.add(chosenWorkerInfo);
}
}
- return allChosenWorkersHealthy;
+ return deadWorkers;
}
@Override
@@ -1257,37 +1265,13 @@ public class BspServiceMaster<I extends WritableComparable,
}
}
- /**
- * Only get the finalized checkpoint files
- */
- public static class FinalizedCheckpointPathFilter implements PathFilter {
- @Override
- public boolean accept(Path path) {
- return path.getName().endsWith(BspService.CHECKPOINT_FINALIZED_POSTFIX);
- }
- }
-
@Override
public long getLastGoodCheckpoint() throws IOException {
// Find the last good checkpoint if none have been written to the
// knowledge of this master
if (lastCheckpointedSuperstep == -1) {
try {
- FileStatus[] fileStatusArray =
- getFs().listStatus(new Path(savedCheckpointBasePath),
- new FinalizedCheckpointPathFilter());
- if (fileStatusArray == null) {
- return -1;
- }
- Arrays.sort(fileStatusArray);
- lastCheckpointedSuperstep = getCheckpoint(
- fileStatusArray[fileStatusArray.length - 1].getPath());
- if (LOG.isInfoEnabled()) {
- LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
- lastCheckpointedSuperstep + " from " +
- fileStatusArray[fileStatusArray.length - 1].
- getPath().toString());
- }
+ lastCheckpointedSuperstep = getLastCheckpointedSuperstep();
} catch (IOException e) {
LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " +
"found, killing the job.", e);
@@ -1306,12 +1290,15 @@ public class BspServiceMaster<I extends WritableComparable,
* hostname and id
* @param workerInfoList List of the workers to wait for
* @param event Event to wait on for a chance to be done.
+ * @param ignoreDeath In case if worker died after making it through
+ * barrier, we will ignore death if set to true.
* @return True if barrier was successful, false if there was a worker
* failure
*/
private boolean barrierOnWorkerList(String finishedWorkerPath,
List<WorkerInfo> workerInfoList,
- BspEvent event) {
+ BspEvent event,
+ boolean ignoreDeath) {
try {
getZkExt().createOnceExt(finishedWorkerPath,
null,
@@ -1339,6 +1326,7 @@ public class BspServiceMaster<I extends WritableComparable,
final int defaultTaskTimeoutMsec = 10 * 60 * 1000; // from TaskTracker
final int taskTimeoutMsec = getContext().getConfiguration().getInt(
"mapred.task.timeout", defaultTaskTimeoutMsec);
+ List<WorkerInfo> deadWorkers = new ArrayList<>();
while (true) {
try {
finishedHostnameIdList =
@@ -1389,6 +1377,15 @@ public class BspServiceMaster<I extends WritableComparable,
break;
}
+ for (WorkerInfo deadWorker : deadWorkers) {
+ if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) {
+ LOG.error("barrierOnWorkerList: no results arived from " +
+ "worker that was pronounced dead: " + deadWorker +
+ " on superstep " + getSuperstep());
+ return false;
+ }
+ }
+
// Wait for a signal or timeout
event.waitMsecs(taskTimeoutMsec / 2);
event.reset();
@@ -1396,9 +1393,13 @@ public class BspServiceMaster<I extends WritableComparable,
// Did a worker die?
try {
- if (!superstepChosenWorkerAlive(
+ deadWorkers.addAll(superstepChosenWorkerAlive(
workerInfoHealthyPath,
- workerInfoList)) {
+ workerInfoList));
+ if (!ignoreDeath && deadWorkers.size() > 0) {
+ LOG.error("barrierOnWorkerList: Missing chosen " +
+ "workers " + deadWorkers +
+ " on superstep " + getSuperstep());
return false;
}
} catch (KeeperException e) {
@@ -1462,7 +1463,8 @@ public class BspServiceMaster<I extends WritableComparable,
String logPrefix = "coordinate" + inputSplitsType + "InputSplits";
if (!barrierOnWorkerList(inputSplitPaths.getDonePath(),
chosenWorkerInfoList,
- inputSplitEvents.getDoneStateChanged())) {
+ inputSplitEvents.getDoneStateChanged(),
+ false)) {
throw new IllegalStateException(logPrefix + ": Worker failed during " +
"input split (currently not supported)");
}
@@ -1589,14 +1591,15 @@ public class BspServiceMaster<I extends WritableComparable,
// Finalize the valid checkpoint file prefixes and possibly
// the aggregators.
- if (checkpointFrequencyMet(getSuperstep())) {
+ if (checkpointStatus != CheckpointStatus.NONE) {
String workerWroteCheckpointPath =
getWorkerWroteCheckpointPath(getApplicationAttempt(),
getSuperstep());
// first wait for all the workers to write their checkpoint data
if (!barrierOnWorkerList(workerWroteCheckpointPath,
chosenWorkerInfoList,
- getWorkerWroteCheckpointEvent())) {
+ getWorkerWroteCheckpointEvent(),
+ checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT)) {
return SuperstepState.WORKER_FAILURE;
}
try {
@@ -1606,6 +1609,9 @@ public class BspServiceMaster<I extends WritableComparable,
"coordinateSuperstep: IOException on finalizing checkpoint",
e);
}
+ if (checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT) {
+ return SuperstepState.CHECKPOINT_AND_HALT;
+ }
}
if (getSuperstep() == INPUT_SUPERSTEP) {
@@ -1630,7 +1636,8 @@ public class BspServiceMaster<I extends WritableComparable,
getWorkerFinishedPath(getApplicationAttempt(), getSuperstep());
if (!barrierOnWorkerList(finishedWorkerPath,
chosenWorkerInfoList,
- getSuperstepStateChangedEvent())) {
+ getSuperstepStateChangedEvent(),
+ false)) {
return SuperstepState.WORKER_FAILURE;
}
@@ -1677,10 +1684,14 @@ public class BspServiceMaster<I extends WritableComparable,
}
getConfiguration().updateSuperstepClasses(superstepClasses);
+ //Signal workers that we want to checkpoint
+ checkpointStatus = getCheckpointStatus(getSuperstep() + 1);
+ globalStats.setCheckpointStatus(checkpointStatus);
// Let everyone know the aggregated application state through the
// superstep finishing znode.
String superstepFinishedNode =
getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
+
WritableUtils.writeToZnode(
getZkExt(), superstepFinishedNode, -1, globalStats, superstepClasses);
updateCounters(globalStats);
@@ -1703,6 +1714,43 @@ public class BspServiceMaster<I extends WritableComparable,
}
/**
+ * Should checkpoint on this superstep? If checkpointing, always
+ * checkpoint the first user superstep. If restarting, the first
+ * checkpoint is after the frequency has been met.
+ *
+ * @param superstep Decide if checkpointing no this superstep
+ * @return True if this superstep should be checkpointed, false otherwise
+ */
+ private CheckpointStatus getCheckpointStatus(long superstep) {
+ try {
+ if (getZkExt().
+ exists(basePath + FORCE_CHECKPOINT_USER_FLAG, false) != null) {
+ return CheckpointStatus.CHECKPOINT_AND_HALT;
+ }
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "cleanupZooKeeper: Got KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "cleanupZooKeeper: Got IllegalStateException", e);
+ }
+ if (checkpointFrequency == 0) {
+ return CheckpointStatus.NONE;
+ }
+ long firstCheckpoint = INPUT_SUPERSTEP + 1;
+ if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
+ firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
+ }
+ if (superstep < firstCheckpoint) {
+ return CheckpointStatus.NONE;
+ }
+ if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) {
+ return CheckpointStatus.CHECKPOINT;
+ }
+ return CheckpointStatus.NONE;
+ }
+
+ /**
* This doMasterCompute is only called
* after masterCompute is initialized
*/
@@ -1837,7 +1885,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
@Override
- public void cleanup() throws IOException {
+ public void cleanup(SuperstepState superstepState) throws IOException {
ImmutableClassesGiraphConfiguration conf = getConfiguration();
// All master processes should denote they are done by adding special
@@ -1872,7 +1920,8 @@ public class BspServiceMaster<I extends WritableComparable,
getGraphTaskManager().setIsMaster(true);
cleanUpZooKeeper();
// If desired, cleanup the checkpoint directory
- if (GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
+ if (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE &&
+ GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) {
boolean success =
getFs().delete(new Path(checkpointBasePath), true);
if (LOG.isInfoEnabled()) {
@@ -1882,6 +1931,12 @@ public class BspServiceMaster<I extends WritableComparable,
" succeeded ");
}
}
+ if (superstepState == SuperstepState.CHECKPOINT_AND_HALT) {
+ getFs().create(CheckpointingUtils.getCheckpointMarkPath(conf,
+ getJobId()), true);
+ failJob(new Exception("Checkpoint and halt requested. " +
+ "Killing this job."));
+ }
aggregatorHandler.close();
masterClient.closeConnections();
masterServer.close();
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
index 0635210..8e4e0b8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -96,6 +96,8 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
long initializeMillis = 0;
long endMillis = 0;
bspServiceMaster.setup();
+ SuperstepState superstepState = SuperstepState.INITIAL;
+
if (bspServiceMaster.becomeMaster()) {
// First call to checkWorkers waits for all pending resources.
// If these resources are still available at subsequent calls it just
@@ -113,11 +115,9 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
long setupMillis = System.currentTimeMillis() - initializeMillis;
GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
setupSecs = setupMillis / 1000.0d;
- SuperstepState superstepState = SuperstepState.INITIAL;
- long cachedSuperstep = BspService.UNSET_SUPERSTEP;
- while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
+ while (!superstepState.isExecutionComplete()) {
long startSuperstepMillis = System.currentTimeMillis();
- cachedSuperstep = bspServiceMaster.getSuperstep();
+ long cachedSuperstep = bspServiceMaster.getSuperstep();
GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
Class<? extends Computation> computationClass =
bspServiceMaster.getMasterCompute().getComputation();
@@ -153,7 +153,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
}
}
- bspServiceMaster.cleanup();
+ bspServiceMaster.cleanup(superstepState);
if (!superstepSecsMap.isEmpty()) {
GiraphTimers.getInstance().getShutdownMs().
increment(System.currentTimeMillis() - endMillis);
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java
new file mode 100644
index 0000000..11d5e4f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
+
+/**
+ * Holds useful functions to get checkpoint paths
+ * in hdfs.
+ */
+public class CheckpointingUtils {
+
+ /**
+ * Do not call constructor.
+ */
+ private CheckpointingUtils() {
+ }
+
+ /**
+ * Path to the checkpoint's root (including job id)
+ * @param conf Immutable configuration of the job
+ * @param jobId job ID
+ * @return checkpoint's root
+ */
+ public static String getCheckpointBasePath(Configuration conf,
+ String jobId) {
+ return CHECKPOINT_DIRECTORY.getWithDefault(conf,
+ CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + jobId);
+ }
+
+ /**
+ * Path to checkpoint&halt node in hdfs.
+ * It is set to let client know that master has
+ * successfully finished checkpointing and job can be restarted.
+ * @param conf Immutable configuration of the job
+ * @param jobId job ID
+ * @return path to checkpoint&halt node in hdfs.
+ */
+ public static Path getCheckpointMarkPath(Configuration conf,
+ String jobId) {
+ return new Path(getCheckpointBasePath(conf, jobId), "halt");
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 763f59d..3c5cbad 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -676,4 +676,67 @@ public class WritableUtils {
return null;
}
}
+
+ /**
+ * Writes a list of Writable objects into output stream.
+ * This method is trying to optimize space occupied by class information only
+ * storing class object if it is different from the previous one
+ * as in most cases arrays tend to have objects of the same type inside.
+ * @param list serialized object
+ * @param output the output stream
+ * @throws IOException
+ */
+ public static void writeList(List<Writable> list, DataOutput output)
+ throws IOException {
+ output.writeInt(list.size());
+ Class<? extends Writable> clazz = null;
+ for (Writable element : list) {
+ output.writeBoolean(element == null);
+ if (element != null) {
+ if (element.getClass() != clazz) {
+ clazz = element.getClass();
+ output.writeBoolean(true);
+ writeClass(clazz, output);
+ } else {
+ output.writeBoolean(false);
+ }
+ element.write(output);
+ }
+ }
+ }
+
+ /**
+ * Reads list of Writable objects from data input stream.
+ * Input stream should have class information along with object data.
+ * @param input input stream
+ * @return deserialized list
+ * @throws IOException
+ */
+ public static List<Writable> readList(DataInput input) throws IOException {
+ try {
+
+ int size = input.readInt();
+ List<Writable> res = new ArrayList<>(size);
+ Class<? extends Writable> clazz = null;
+ for (int i = 0; i < size; i++) {
+ boolean isNull = input.readBoolean();
+ if (isNull) {
+ res.add(null);
+ } else {
+ boolean hasClassInfo = input.readBoolean();
+ if (hasClassInfo) {
+ clazz = readClass(input);
+ }
+ Writable element = clazz.newInstance();
+ element.readFields(input);
+ res.add(element);
+ }
+ }
+ return res;
+
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new IllegalStateException("unable to instantiate object", e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index d2d24ee..447bb6f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -21,6 +21,7 @@ package org.apache.giraph.worker;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.bsp.CheckpointStatus;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
@@ -540,7 +541,8 @@ public class BspServiceWorker<I extends WritableComparable,
// 6. Wait for superstep INPUT_SUPERSTEP to complete.
if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
setCachedSuperstep(getRestartedSuperstep());
- return new FinishedSuperstepStats(0, false, 0, 0, true);
+ return new FinishedSuperstepStats(0, false, 0, 0, true,
+ CheckpointStatus.NONE);
}
JSONObject jobState = getJobState();
@@ -557,7 +559,8 @@ public class BspServiceWorker<I extends WritableComparable,
getApplicationAttempt());
}
setRestartedSuperstep(getSuperstep());
- return new FinishedSuperstepStats(0, false, 0, 0, true);
+ return new FinishedSuperstepStats(0, false, 0, 0, true,
+ CheckpointStatus.NONE);
}
} catch (JSONException e) {
throw new RuntimeException(
@@ -946,7 +949,8 @@ public class BspServiceWorker<I extends WritableComparable,
globalStats.getHaltComputation(),
globalStats.getVertexCount(),
globalStats.getEdgeCount(),
- false);
+ false,
+ globalStats.getCheckpointStatus());
}
/**
@@ -1314,8 +1318,11 @@ public class BspServiceWorker<I extends WritableComparable,
throws IOException, InterruptedException {
workerClient.closeConnections();
setCachedSuperstep(getSuperstep() - 1);
- saveVertices(finishedSuperstepStats.getLocalVertexCount());
- saveEdges();
+ if (finishedSuperstepStats.getCheckpointStatus() !=
+ CheckpointStatus.CHECKPOINT_AND_HALT) {
+ saveVertices(finishedSuperstepStats.getLocalVertexCount());
+ saveEdges();
+ }
WorkerProgress.get().finishStoring();
if (workerProgressWriter != null) {
workerProgressWriter.stop();
@@ -1414,6 +1421,10 @@ public class BspServiceWorker<I extends WritableComparable,
}
+ List<Writable> w2wMessages =
+ getServerData().getCurrentWorkerToWorkerMessages();
+ WritableUtils.writeList(w2wMessages, checkpointOutputStream);
+
checkpointOutputStream.close();
getFs().createNewFile(validFilePath);
@@ -1488,9 +1499,9 @@ public class BspServiceWorker<I extends WritableComparable,
final CompressionCodec codec =
new CompressionCodecFactory(getConfiguration())
- .getCodecByClassName(
+ .getCodec(new Path(
GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
- .get(getConfiguration()));
+ .get(getConfiguration())));
long t0 = System.currentTimeMillis();
@@ -1559,9 +1570,9 @@ public class BspServiceWorker<I extends WritableComparable,
final CompressionCodec codec =
new CompressionCodecFactory(getConfiguration())
- .getCodecByClassName(
+ .getCodec(new Path(
GiraphConstants.CHECKPOINT_COMPRESSION_CODEC
- .get(getConfiguration()));
+ .get(getConfiguration())));
long t0 = System.currentTimeMillis();
@@ -1660,6 +1671,10 @@ public class BspServiceWorker<I extends WritableComparable,
getServerData().getCurrentMessageStore().readFieldsForPartition(
checkpointStream, partitionId);
}
+
+ List<Writable> w2wMessages = WritableUtils.readList(checkpointStream);
+ getServerData().getCurrentWorkerToWorkerMessages().addAll(w2wMessages);
+
checkpointStream.close();
if (LOG.isInfoEnabled()) {
@@ -1920,4 +1935,18 @@ else[HADOOP_NON_SECURE]*/
public SuperstepOutput<I, V, E> getSuperstepOutput() {
return superstepOutput;
}
+
+ @Override
+ public GlobalStats getGlobalStats() {
+ GlobalStats globalStats = new GlobalStats();
+ if (getSuperstep() > Math.max(INPUT_SUPERSTEP, getRestartedSuperstep())) {
+ String superstepFinishedNode =
+ getSuperstepFinishedPath(getApplicationAttempt(),
+ getSuperstep() - 1);
+ WritableUtils.readFieldsFromZnode(
+ getZkExt(), superstepFinishedNode, false, null,
+ globalStats);
+ }
+ return globalStats;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java
new file mode 100644
index 0000000..c712b5a
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test case for WritableUtils.
+ */
+public class TestWritableUtils {
+
+ /**
+ * Tests readList and writeList functions in writable utils.
+ * @throws IOException
+ */
+ @Test
+ public void testListSerialization() throws IOException {
+ List<Writable> list = new ArrayList<>();
+ list.add(new LongWritable(1));
+ list.add(new LongWritable(2));
+ list.add(null);
+ list.add(new FloatWritable(3));
+ list.add(new FloatWritable(4));
+ list.add(new LongWritable(5));
+ list.add(new LongWritable(6));
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ WritableUtils.writeList(list, dos);
+ dos.close();
+
+ byte[] data = bos.toByteArray();
+
+ DataInputStream input =
+ new DataInputStream(new ByteArrayInputStream(data));
+
+ List<Writable> result = WritableUtils.readList(input);
+
+ Assert.assertEquals(list, result);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
index 2939af7..9502557 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
@@ -19,8 +19,10 @@
package org.apache.giraph;
import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.bsp.BspService;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.examples.SimpleSuperstepComputation;
@@ -29,17 +31,25 @@ import org.apache.giraph.graph.Vertex;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.zk.ZooKeeperManager;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.junit.Assert;
import org.junit.Test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -56,13 +66,8 @@ public class TestCheckpointing extends BspCase {
Logger.getLogger(TestCheckpointing.class);
/** ID to be used with test job */
public static final String TEST_JOB_ID = "test_job";
- /**
- * Compute will double check that we don't run supersteps
- * lesser than specified by this key. That way we ensure that
- * computation actually restarted and not recalculated from the
- * beginning.
- */
- public static final String KEY_MIN_SUPERSTEP = "minimum.superstep";
+
+ private static SuperstepCallback SUPERSTEP_CALLBACK;
/**
* Create the test case
@@ -84,49 +89,45 @@ public class TestCheckpointing extends BspCase {
public void testBspCheckpoint(boolean useAsyncMessageStore)
throws IOException, InterruptedException, ClassNotFoundException {
Path checkpointsDir = getTempPath("checkpointing");
- Path outputPath = getTempPath(getCallingMethodName());
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setComputationClass(
- CheckpointComputation.class);
- conf.setWorkerContextClass(
- CheckpointVertexWorkerContext.class);
- conf.setMasterComputeClass(
- CheckpointVertexMasterCompute.class);
- conf.setVertexInputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat.class);
- conf.setVertexOutputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class);
- conf.set("mapred.job.id", TEST_JOB_ID);
- conf.set(KEY_MIN_SUPERSTEP, "0");
if (useAsyncMessageStore) {
GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.set(conf, 2);
}
- GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
- GiraphConfiguration configuration = job.getConfiguration();
- GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
- GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(configuration, false);
- configuration.setCheckpointFrequency(2);
+ SUPERSTEP_CALLBACK = null;
- assertTrue(job.run(true));
+ GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);
+ conf.setCheckpointFrequency(2);
- long idSum = 0;
- if (!runningInDistributedMode()) {
- FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(),
- outputPath);
- idSum = CheckpointVertexWorkerContext
- .getFinalSum();
- LOG.info("testBspCheckpoint: idSum = " + idSum +
- " fileLen = " + fileStatus.getLen());
- }
+ long idSum = runOriginalJob(checkpointsDir, conf);
+ assertEquals(10, idSum);
+
+ SUPERSTEP_CALLBACK = new SuperstepCallback() {
+ @Override
+ public void superstep(long superstep,
+ ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) {
+ if (superstep < 2) {
+ Assert.fail("Restarted JOB should not be executed on superstep " + superstep);
+ }
+ }
+ };
+
+ runRestartedJob(checkpointsDir, conf, idSum, 2);
+
+
+ }
- // Restart the test from superstep 2
- LOG.info("testBspCheckpoint: Restarting from superstep 2" +
- " with checkpoint path = " + checkpointsDir);
+ private void runRestartedJob(Path checkpointsDir, GiraphConfiguration conf, long idSum, long restartFrom) throws IOException, InterruptedException, ClassNotFoundException {
+ Path outputPath;
+ LOG.info("testBspCheckpoint: Restarting from the latest superstep " +
+ "with checkpoint path = " + checkpointsDir);
outputPath = getTempPath("checkpointing_restarted");
GiraphConstants.RESTART_JOB_ID.set(conf, TEST_JOB_ID);
conf.set("mapred.job.id", "restarted_test_job");
- conf.set(GiraphConstants.RESTART_SUPERSTEP, "2");
- conf.set(KEY_MIN_SUPERSTEP, "2");
+ if (restartFrom >= 0) {
+ conf.set(GiraphConstants.RESTART_SUPERSTEP, Long.toString(restartFrom));
+ }
GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
conf, outputPath);
@@ -135,6 +136,8 @@ public class TestCheckpointing extends BspCase {
checkpointsDir.toString());
assertTrue(restartedJob.run(true));
+
+
if (!runningInDistributedMode()) {
long idSumRestarted =
CheckpointVertexWorkerContext
@@ -145,6 +148,36 @@ public class TestCheckpointing extends BspCase {
}
}
+ /**
+ * Configures and runs the original (pre-restart) checkpointing job and
+ * asserts that it completes successfully.
+ *
+ * @param checkpointsDir directory the job is told to write checkpoints to
+ * @param conf configuration to populate; it is then passed to prepareJob
+ * @return CheckpointVertexWorkerContext.getFinalSum() when not running in
+ *         distributed mode, otherwise 0
+ */
+ private long runOriginalJob(Path checkpointsDir, GiraphConfiguration conf) throws IOException, InterruptedException, ClassNotFoundException {
+ Path outputPath = getTempPath("checkpointing_original");
+ conf.setComputationClass(
+ CheckpointComputation.class);
+ conf.setWorkerContextClass(
+ CheckpointVertexWorkerContext.class);
+ conf.setMasterComputeClass(
+ CheckpointVertexMasterCompute.class);
+ conf.setVertexInputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat.class);
+ conf.setVertexOutputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class);
+ // The restarted job refers back to this id via RESTART_JOB_ID.
+ conf.set("mapred.job.id", TEST_JOB_ID);
+ GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
+
+ GiraphConfiguration configuration = job.getConfiguration();
+ // NOTE(review): set on the job's configuration rather than on conf,
+ // presumably because prepareJob copies conf -- confirm.
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
+
+ assertTrue(job.run(true));
+
+ long idSum = 0;
+ if (!runningInDistributedMode()) {
+ FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(),
+ outputPath);
+ idSum = CheckpointVertexWorkerContext
+ .getFinalSum();
+ LOG.info("testBspCheckpoint: idSum = " + idSum +
+ " fileLen = " + fileStatus.getLen());
+ }
+ return idSum;
+ }
+
/**
* Actual computation.
@@ -159,10 +192,6 @@ public class TestCheckpointing extends BspCase {
CheckpointVertexWorkerContext workerContext = getWorkerContext();
assertEquals(getSuperstep() + 1, workerContext.testValue);
- if (getSuperstep() < getConf().getInt(KEY_MIN_SUPERSTEP, Integer.MAX_VALUE)){
- fail("Should not be running compute on superstep " + getSuperstep());
- }
-
if (getSuperstep() > 4) {
vertex.voteToHalt();
return;
@@ -186,10 +215,76 @@ public class TestCheckpointing extends BspCase {
EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
vertex.addEdge(newEdge);
sendMessage(edge.getTargetVertexId(), newEdgeValue);
+
}
}
}
+ /**
+ * Requests a manual (user-forced) checkpoint at superstep 0 and checks
+ * that the job restarted from it behaves correctly.
+ */
+ @Test
+ public void testManualCheckpointAtTheBeginning()
+ throws InterruptedException, IOException, ClassNotFoundException {
+ testManualCheckpoint(0);
+ }
+
+ /**
+ * Requests a manual (user-forced) checkpoint at superstep 2 and checks
+ * that the job restarted from it behaves correctly.
+ */
+ @Test
+ public void testManualCheckpoint()
+ throws InterruptedException, IOException, ClassNotFoundException {
+ testManualCheckpoint(2);
+ }
+
+
+ /**
+ * Runs a job that creates the user force-checkpoint flag in ZooKeeper at
+ * {@code checkpointSuperstep} and expects that job to fail afterwards.
+ * It then restarts the job without an explicit restart superstep and
+ * asserts that no superstep before {@code checkpointSuperstep} is
+ * executed again.
+ *
+ * @param checkpointSuperstep superstep at which the checkpoint is requested
+ */
+ private void testManualCheckpoint(final int checkpointSuperstep)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Path checkpointsDir = getTempPath("checkpointing");
+ GiraphConfiguration conf = new GiraphConfiguration();
+
+ SUPERSTEP_CALLBACK = new SuperstepCallback() {
+
+ @Override
+ public void superstep(long superstep, ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) {
+ if (superstep == checkpointSuperstep) {
+ try {
+ ZooKeeperExt zooKeeperExt = new ZooKeeperExt(conf.getZookeeperList(),
+ conf.getZooKeeperSessionTimeout(),
+ conf.getZookeeperOpsMaxAttempts(),
+ conf.getZookeeperOpsRetryWaitMsecs(),
+ TestCheckpointing.this);
+ // Always close the session so it does not leak. The flag znode is
+ // PERSISTENT, so it outlives the session.
+ try {
+ String basePath = ZooKeeperManager.getBasePath(conf) + BspService.BASE_DIR + "/" + conf.get("mapred.job.id");
+ zooKeeperExt.createExt(
+ basePath + BspService.FORCE_CHECKPOINT_USER_FLAG,
+ null,
+ ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ } finally {
+ zooKeeperExt.close();
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupt status before rethrowing unchecked.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (IOException | KeeperException e) {
+ throw new RuntimeException(e);
+ }
+ } else if (superstep > checkpointSuperstep) {
+ Assert.fail("Job should be stopped by now " + superstep);
+ }
+ }
+ };
+
+ // NOTE(review): an AssertionError (e.g. from assertTrue inside
+ // runOriginalJob) is not an Exception and escapes this catch. The test
+ // therefore relies on the failed job surfacing as an Exception -- confirm.
+ try {
+ runOriginalJob(checkpointsDir, conf);
+ fail("Original job should fail after checkpointing");
+ } catch (Exception e) {
+ LOG.info("Original job failed, that's OK " + e);
+ }
+
+ SUPERSTEP_CALLBACK = new SuperstepCallback() {
+ @Override
+ public void superstep(long superstep,
+ ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) {
+ if (superstep < checkpointSuperstep) {
+ Assert.fail("Restarted JOB should not be executed on superstep " + superstep);
+ }
+ }
+ };
+
+ // restartFrom = -1: RESTART_SUPERSTEP is left unset for the restarted job.
+ runRestartedJob(checkpointsDir, conf, 10, -1);
+ }
+
/**
* Worker context associated.
*/
@@ -205,6 +300,21 @@ public class TestCheckpointing extends BspCase {
}
@Override
+ public void postSuperstep() {
+ super.postSuperstep();
+ // Send this superstep's number to this same worker; preSuperstep of the
+ // next superstep asserts that it arrives.
+ sendMessageToMyself(new LongWritable(getSuperstep()));
+ }
+
+ /**
+ * Send message to this worker itself (the worker at
+ * {@code getMyWorkerIndex()}), not to any other worker.
+ *
+ * @param message Message to send
+ */
+ private void sendMessageToMyself(Writable message) {
+ sendMessageToWorker(message, getMyWorkerIndex());
+ }
+
+ @Override
public void postApplication() {
setFinalSum(this.<LongWritable>getAggregatedValue(
LongSumAggregator.class.getName()).get());
@@ -223,6 +333,11 @@ public class TestCheckpointing extends BspCase {
@Override
public void preSuperstep() {
assertEquals(getSuperstep(), testValue++);
+ // From superstep 1 on, expect exactly one worker message: the previous
+ // superstep number that postSuperstep sent to this worker.
+ // NOTE(review): relies on getAndClearMessagesFromOtherWorkers also
+ // returning messages a worker sent to itself -- confirm.
+ if (getSuperstep() > 0) {
+ List<Writable> messages = getAndClearMessagesFromOtherWorkers();
+ assertEquals(1, messages.size());
+ assertEquals(getSuperstep() - 1, ((LongWritable)(messages.get(0))).get());
+ }
}
@Override
@@ -249,6 +364,9 @@ public class TestCheckpointing extends BspCase {
@Override
public void compute() {
long superstep = getSuperstep();
+ // Give the test-installed hook, if any, a chance to act on this superstep.
+ if (SUPERSTEP_CALLBACK != null) {
+ SUPERSTEP_CALLBACK.superstep(getSuperstep(), getConf());
+ }
assertEquals(superstep, testValue++);
}
@@ -272,6 +390,12 @@ public class TestCheckpointing extends BspCase {
}
}
+ /**
+ * Test hook called from compute() with the current superstep number and
+ * the job configuration, so individual tests can act on or assert about
+ * specific supersteps.
+ */
+ private interface SuperstepCallback {
+ /**
+ * @param superstep current superstep number
+ * @param conf configuration of the running job
+ */
+ void superstep(long superstep,
+ ImmutableClassesGiraphConfiguration<LongWritable,
+ IntWritable, FloatWritable> conf);
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 672ec44..b4d78ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1059,7 +1059,7 @@ under the License.
</modules>
<properties>
<hadoop.version>0.20.0</hadoop.version>
- <munge.symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</munge.symbols>
+ <munge.symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_JOB_ID_AVAILABLE</munge.symbols>
</properties>
<dependencies>
<!-- sorted lexicographically -->