You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC
svn commit: r1245205 [8/18] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/examples...
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Thu Feb 16 22:12:31 2012
@@ -21,7 +21,6 @@ package org.apache.giraph.graph;
import net.iharder.Base64;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspInputFormat;
-import org.apache.giraph.bsp.CentralizedService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.SuperstepState;
import org.apache.giraph.graph.GraphMapper.MapFunctions;
@@ -75,1663 +74,1678 @@ import org.apache.giraph.utils.WritableU
/**
* ZooKeeper-based implementation of {@link CentralizedService}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public class BspServiceMaster<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable, M extends Writable>
- extends BspService<I, V, E, M>
- implements CentralizedServiceMaster<I, V, E, M> {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
- /** Superstep counter */
- private Counter superstepCounter = null;
- /** Vertex counter */
- private Counter vertexCounter = null;
- /** Finished vertex counter */
- private Counter finishedVertexCounter = null;
- /** Edge counter */
- private Counter edgeCounter = null;
- /** Sent messages counter */
- private Counter sentMessagesCounter = null;
- /** Workers on this superstep */
- private Counter currentWorkersCounter = null;
- /** Current master task partition */
- private Counter currentMasterTaskPartitionCounter = null;
- /** Last checkpointed superstep */
- private Counter lastCheckpointedSuperstepCounter = null;
- /** Am I the master? */
- private boolean isMaster = false;
- /** Max number of workers */
- private final int maxWorkers;
- /** Min number of workers */
- private final int minWorkers;
- /** Min % responded workers */
- private final float minPercentResponded;
- /** Poll period in msecs */
- private final int msecsPollPeriod;
- /** Max number of poll attempts */
- private final int maxPollAttempts;
- /** Min number of long tails before printing */
- private final int partitionLongTailMinPrint;
- /** Last finalized checkpoint */
- private long lastCheckpointedSuperstep = -1;
- /** State of the superstep changed */
- private final BspEvent superstepStateChanged =
- new PredicateLock();
- /** Master graph partitioner */
- private final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
- /** All the partition stats from the last superstep */
- private final List<PartitionStats> allPartitionStatsList =
- new ArrayList<PartitionStats>();
- /** Counter group name for the Giraph statistics */
- public String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats";
- /** Aggregator writer */
- public AggregatorWriter aggregatorWriter;
-
- public BspServiceMaster(
- String serverPortList,
- int sessionMsecTimeout,
- Mapper<?, ?, ?, ?>.Context context,
- GraphMapper<I, V, E, M> graphMapper) {
- super(serverPortList, sessionMsecTimeout, context, graphMapper);
- registerBspEvent(superstepStateChanged);
-
- maxWorkers =
- getConfiguration().getInt(GiraphJob.MAX_WORKERS, -1);
- minWorkers =
- getConfiguration().getInt(GiraphJob.MIN_WORKERS, -1);
- minPercentResponded =
- getConfiguration().getFloat(GiraphJob.MIN_PERCENT_RESPONDED,
- 100.0f);
- msecsPollPeriod =
- getConfiguration().getInt(GiraphJob.POLL_MSECS,
- GiraphJob.POLL_MSECS_DEFAULT);
- maxPollAttempts =
- getConfiguration().getInt(GiraphJob.POLL_ATTEMPTS,
- GiraphJob.POLL_ATTEMPTS_DEFAULT);
- partitionLongTailMinPrint = getConfiguration().getInt(
- GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT,
- GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
- masterGraphPartitioner =
- getGraphPartitionerFactory().createMasterGraphPartitioner();
- }
-
- @Override
- public void setJobState(ApplicationState state,
- long applicationAttempt,
- long desiredSuperstep) {
- JSONObject jobState = new JSONObject();
- try {
- jobState.put(JSONOBJ_STATE_KEY, state.toString());
- jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt);
- jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep);
- } catch (JSONException e) {
- throw new RuntimeException("setJobState: Coudn't put " +
- state.toString());
- }
+public class BspServiceMaster<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends BspService<I, V, E, M>
+ implements CentralizedServiceMaster<I, V, E, M> {
+ /** Counter group name for the Giraph statistics */
+ public static final String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats";
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
+ /** Superstep counter */
+ private Counter superstepCounter = null;
+ /** Vertex counter */
+ private Counter vertexCounter = null;
+ /** Finished vertex counter */
+ private Counter finishedVertexCounter = null;
+ /** Edge counter */
+ private Counter edgeCounter = null;
+ /** Sent messages counter */
+ private Counter sentMessagesCounter = null;
+ /** Workers on this superstep */
+ private Counter currentWorkersCounter = null;
+ /** Current master task partition */
+ private Counter currentMasterTaskPartitionCounter = null;
+ /** Last checkpointed superstep */
+ private Counter lastCheckpointedSuperstepCounter = null;
+ /** Am I the master? */
+ private boolean isMaster = false;
+ /** Max number of workers */
+ private final int maxWorkers;
+ /** Min number of workers */
+ private final int minWorkers;
+ /** Min % responded workers */
+ private final float minPercentResponded;
+ /** Poll period in msecs */
+ private final int msecsPollPeriod;
+ /** Max number of poll attempts */
+ private final int maxPollAttempts;
+ /** Min number of long tails before printing */
+ private final int partitionLongTailMinPrint;
+ /** Last finalized checkpoint */
+ private long lastCheckpointedSuperstep = -1;
+ /** State of the superstep changed */
+ private final BspEvent superstepStateChanged =
+ new PredicateLock();
+ /** Master graph partitioner */
+ private final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
+ /** All the partition stats from the last superstep */
+ private final List<PartitionStats> allPartitionStatsList =
+ new ArrayList<PartitionStats>();
+ /** Aggregator writer */
+ private AggregatorWriter aggregatorWriter;
+
+ /**
+ * Constructor for setting up the master.
+ *
+ * @param serverPortList ZooKeeper server port list
+ * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
+ * @param context Mapper context
+ * @param graphMapper Graph mapper
+ */
+ public BspServiceMaster(
+ String serverPortList,
+ int sessionMsecTimeout,
+ Mapper<?, ?, ?, ?>.Context context,
+ GraphMapper<I, V, E, M> graphMapper) {
+ super(serverPortList, sessionMsecTimeout, context, graphMapper);
+ registerBspEvent(superstepStateChanged);
+
+ maxWorkers =
+ getConfiguration().getInt(GiraphJob.MAX_WORKERS, -1);
+ minWorkers =
+ getConfiguration().getInt(GiraphJob.MIN_WORKERS, -1);
+ minPercentResponded =
+ getConfiguration().getFloat(GiraphJob.MIN_PERCENT_RESPONDED,
+ 100.0f);
+ msecsPollPeriod =
+ getConfiguration().getInt(GiraphJob.POLL_MSECS,
+ GiraphJob.POLL_MSECS_DEFAULT);
+ maxPollAttempts =
+ getConfiguration().getInt(GiraphJob.POLL_ATTEMPTS,
+ GiraphJob.POLL_ATTEMPTS_DEFAULT);
+ partitionLongTailMinPrint = getConfiguration().getInt(
+ GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT,
+ GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
+ masterGraphPartitioner =
+ getGraphPartitionerFactory().createMasterGraphPartitioner();
+ }
+
+ @Override
+ public void setJobState(ApplicationState state,
+ long applicationAttempt,
+ long desiredSuperstep) {
+ JSONObject jobState = new JSONObject();
+ try {
+ jobState.put(JSONOBJ_STATE_KEY, state.toString());
+ jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt);
+ jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep);
+ } catch (JSONException e) {
+ throw new RuntimeException("setJobState: Coudn't put " +
+ state.toString());
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("setJobState: " + jobState.toString() + " on superstep " +
+ getSuperstep());
+ }
+ try {
+ getZkExt().createExt(masterJobStatePath + "/jobState",
+ jobState.toString().getBytes(),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL,
+ true);
+ } catch (KeeperException.NodeExistsException e) {
+ throw new IllegalStateException(
+ "setJobState: Imposible that " +
+ masterJobStatePath + " already exists!", e);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "setJobState: Unknown KeeperException for " +
+ masterJobStatePath, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "setJobState: Unknown InterruptedException for " +
+ masterJobStatePath, e);
+ }
+
+ if (state == ApplicationState.FAILED) {
+ failJob();
+ }
+ }
+
+ /**
+ * Master uses this to calculate the {@link VertexInputFormat}
+ * input splits and write it to ZooKeeper.
+ *
+ * @param numWorkers Number of available workers
+ * @return List of input splits
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private List<InputSplit> generateInputSplits(int numWorkers) {
+ VertexInputFormat<I, V, E, M> vertexInputFormat =
+ BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
+ List<InputSplit> splits;
+ try {
+ splits = vertexInputFormat.getSplits(getContext(), numWorkers);
+ float samplePercent =
+ getConfiguration().getFloat(
+ GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT,
+ GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
+ if (samplePercent != GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
+ int lastIndex = (int) (samplePercent * splits.size() / 100f);
+ List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
+ LOG.warn("generateInputSplits: Using sampling - Processing " +
+ "only " + sampleSplits.size() + " instead of " +
+ splits.size() + " expected splits.");
+ return sampleSplits;
+ } else {
if (LOG.isInfoEnabled()) {
- LOG.info("setJobState: " + jobState.toString() + " on superstep " +
- getSuperstep());
+ LOG.info("generateInputSplits: Got " + splits.size() +
+ " input splits for " + numWorkers + " workers");
}
- try {
- getZkExt().createExt(MASTER_JOB_STATE_PATH + "/jobState",
- jobState.toString().getBytes(),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL,
- true);
- } catch (KeeperException.NodeExistsException e) {
- throw new IllegalStateException(
- "setJobState: Imposible that " +
- MASTER_JOB_STATE_PATH + " already exists!", e);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "setJobState: Unknown KeeperException for " +
- MASTER_JOB_STATE_PATH, e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "setJobState: Unknown InterruptedException for " +
- MASTER_JOB_STATE_PATH, e);
- }
-
- if (state == ApplicationState.FAILED) {
- failJob();
- }
- }
-
- /**
- * Master uses this to calculate the {@link VertexInputFormat}
- * input splits and write it to ZooKeeper.
- *
- * @param numWorkers Number of available workers
- * @throws InstantiationException
- * @throws IllegalAccessException
- * @throws IOException
- * @throws InterruptedException
- */
- private List<InputSplit> generateInputSplits(int numWorkers) {
- VertexInputFormat<I, V, E, M> vertexInputFormat =
- BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
- List<InputSplit> splits;
- try {
- splits = vertexInputFormat.getSplits(getContext(), numWorkers);
- float samplePercent =
- getConfiguration().getFloat(
- GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT,
- GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
- if (samplePercent != GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
- int lastIndex = (int) (samplePercent * splits.size() / 100f);
- List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
- LOG.warn("generateInputSplits: Using sampling - Processing " +
- "only " + sampleSplits.size() + " instead of " +
- splits.size() + " expected splits.");
- return sampleSplits;
+ return splits;
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "generateInputSplits: Got IOException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "generateInputSplits: Got InterruptedException", e);
+ }
+ }
+
+ /**
+ * When there is no salvaging this job, fail it.
+ *
+ * @throws IOException
+ */
+ private void failJob() {
+ LOG.fatal("failJob: Killing job " + getJobId());
+ try {
+ @SuppressWarnings("deprecation")
+ org.apache.hadoop.mapred.JobClient jobClient =
+ new org.apache.hadoop.mapred.JobClient(
+ (org.apache.hadoop.mapred.JobConf)
+ getConfiguration());
+ @SuppressWarnings("deprecation")
+ org.apache.hadoop.mapred.JobID jobId =
+ org.apache.hadoop.mapred.JobID.forName(getJobId());
+ RunningJob job = jobClient.getJob(jobId);
+ job.killJob();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Parse the {@link WorkerInfo} objects from a ZooKeeper path
+ * (and children).
+ *
+ * @param workerInfosPath Path where all the workers are children
+ * @param watch Watch or not?
+ * @return List of workers in that path
+ */
+ private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath,
+ boolean watch) {
+ List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>();
+ List<String> workerInfoPathList;
+ try {
+ workerInfoPathList =
+ getZkExt().getChildrenExt(workerInfosPath, watch, false, true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "getWorkers: Got KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "getWorkers: Got InterruptedStateException", e);
+ }
+ for (String workerInfoPath : workerInfoPathList) {
+ WorkerInfo workerInfo = new WorkerInfo();
+ WritableUtils.readFieldsFromZnode(
+ getZkExt(), workerInfoPath, true, null, workerInfo);
+ workerInfoList.add(workerInfo);
+ }
+ return workerInfoList;
+ }
+
+ /**
+ * Get the healthy and unhealthy {@link WorkerInfo} objects for
+ * a superstep
+ *
+ * @param superstep superstep to check
+ * @param healthyWorkerInfoList filled in with current data
+ * @param unhealthyWorkerInfoList filled in with current data
+ */
+ private void getAllWorkerInfos(
+ long superstep,
+ List<WorkerInfo> healthyWorkerInfoList,
+ List<WorkerInfo> unhealthyWorkerInfoList) {
+ String healthyWorkerInfoPath =
+ getWorkerInfoHealthyPath(getApplicationAttempt(), superstep);
+ String unhealthyWorkerInfoPath =
+ getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep);
+
+ try {
+ getZkExt().createOnceExt(healthyWorkerInfoPath,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException("getWorkers: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("getWorkers: IllegalStateException"
+ , e);
+ }
+
+ try {
+ getZkExt().createOnceExt(unhealthyWorkerInfoPath,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException("getWorkers: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("getWorkers: IllegalStateException"
+ , e);
+ }
+
+ List<WorkerInfo> currentHealthyWorkerInfoList =
+ getWorkerInfosFromPath(healthyWorkerInfoPath, true);
+ List<WorkerInfo> currentUnhealthyWorkerInfoList =
+ getWorkerInfosFromPath(unhealthyWorkerInfoPath, false);
+
+ healthyWorkerInfoList.clear();
+ if (currentHealthyWorkerInfoList != null) {
+ for (WorkerInfo healthyWorkerInfo :
+ currentHealthyWorkerInfoList) {
+ healthyWorkerInfoList.add(healthyWorkerInfo);
+ }
+ }
+
+ unhealthyWorkerInfoList.clear();
+ if (currentUnhealthyWorkerInfoList != null) {
+ for (WorkerInfo unhealthyWorkerInfo :
+ currentUnhealthyWorkerInfoList) {
+ unhealthyWorkerInfoList.add(unhealthyWorkerInfo);
+ }
+ }
+ }
+
+ /**
+ * Check all the {@link WorkerInfo} objects to ensure that a minimum
+ * number of good workers exists out of the total that have reported.
+ *
+   * @return List of healthy workers such that the minimum has been
+ * met, otherwise null
+ */
+ private List<WorkerInfo> checkWorkers() {
+ boolean failJob = true;
+ int pollAttempt = 0;
+ List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
+ List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
+ int totalResponses = -1;
+ while (pollAttempt < maxPollAttempts) {
+ getAllWorkerInfos(
+ getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
+ totalResponses = healthyWorkerInfoList.size() +
+ unhealthyWorkerInfoList.size();
+ if ((totalResponses * 100.0f / maxWorkers) >=
+ minPercentResponded) {
+ failJob = false;
+ break;
+ }
+ getContext().setStatus(getGraphMapper().getMapFunctions() + " " +
+ "checkWorkers: Only found " +
+ totalResponses +
+ " responses of " + maxWorkers +
+ " needed to start superstep " +
+ getSuperstep());
+ if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
+ msecsPollPeriod)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("checkWorkers: Got event that health " +
+ "registration changed, not using poll attempt");
+ }
+ getWorkerHealthRegistrationChangedEvent().reset();
+ continue;
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("checkWorkers: Only found " + totalResponses +
+ " responses of " + maxWorkers +
+ " needed to start superstep " +
+ getSuperstep() + ". Sleeping for " +
+ msecsPollPeriod + " msecs and used " + pollAttempt +
+ " of " + maxPollAttempts + " attempts.");
+ // Find the missing workers if there are only a few
+ if ((maxWorkers - totalResponses) <=
+ partitionLongTailMinPrint) {
+ Set<Integer> partitionSet = new TreeSet<Integer>();
+ for (WorkerInfo workerInfo : healthyWorkerInfoList) {
+ partitionSet.add(workerInfo.getPartitionId());
+ }
+ for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
+ partitionSet.add(workerInfo.getPartitionId());
+ }
+ for (int i = 1; i <= maxWorkers; ++i) {
+ if (partitionSet.contains(new Integer(i))) {
+ continue;
+ } else if (i == getTaskPartition()) {
+ continue;
} else {
- if (LOG.isInfoEnabled()) {
- LOG.info("generateInputSplits: Got " + splits.size() +
- " input splits for " + numWorkers + " workers");
- }
- return splits;
- }
- } catch (IOException e) {
- throw new IllegalStateException(
- "generateInputSplits: Got IOException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "generateInputSplits: Got InterruptedException", e);
- }
- }
-
- /**
- * When there is no salvaging this job, fail it.
- *
- * @throws IOException
- */
- private void failJob() {
- LOG.fatal("failJob: Killing job " + getJobId());
- try {
- @SuppressWarnings("deprecation")
- org.apache.hadoop.mapred.JobClient jobClient =
- new org.apache.hadoop.mapred.JobClient(
- (org.apache.hadoop.mapred.JobConf)
- getConfiguration());
- @SuppressWarnings("deprecation")
- org.apache.hadoop.mapred.JobID jobId =
- org.apache.hadoop.mapred.JobID.forName(getJobId());
- RunningJob job = jobClient.getJob(jobId);
- job.killJob();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Parse the {@link WorkerInfo} objects from a ZooKeeper path
- * (and children).
- *
- * @param workerInfosPath Path where all the workers are children
- * @param watch Watch or not?
- * @return List of workers in that path
- */
- private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath,
- boolean watch) {
- List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>();
- List<String> workerInfoPathList;
- try {
- workerInfoPathList =
- getZkExt().getChildrenExt(workerInfosPath, watch, false, true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "getWorkers: Got KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "getWorkers: Got InterruptedStateException", e);
- }
- for (String workerInfoPath : workerInfoPathList) {
- WorkerInfo workerInfo = new WorkerInfo();
- WritableUtils.readFieldsFromZnode(
- getZkExt(), workerInfoPath, true, null, workerInfo);
- workerInfoList.add(workerInfo);
- }
- return workerInfoList;
- }
-
- /**
- * Get the healthy and unhealthy {@link WorkerInfo} objects for
- * a superstep
- *
- * @param superstep superstep to check
- * @param healthyWorkerInfoList filled in with current data
- * @param unhealthyWorkerInfoList filled in with current data
- */
- private void getAllWorkerInfos(
- long superstep,
- List<WorkerInfo> healthyWorkerInfoList,
- List<WorkerInfo> unhealthyWorkerInfoList) {
- String healthyWorkerInfoPath =
- getWorkerInfoHealthyPath(getApplicationAttempt(), superstep);
- String unhealthyWorkerInfoPath =
- getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep);
-
- try {
- getZkExt().createOnceExt(healthyWorkerInfoPath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException e) {
- throw new IllegalStateException("getWorkers: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException("getWorkers: IllegalStateException"
- , e);
- }
-
- try {
- getZkExt().createOnceExt(unhealthyWorkerInfoPath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException e) {
- throw new IllegalStateException("getWorkers: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException("getWorkers: IllegalStateException"
- , e);
- }
-
- List<WorkerInfo> currentHealthyWorkerInfoList =
- getWorkerInfosFromPath(healthyWorkerInfoPath, true);
- List<WorkerInfo> currentUnhealthyWorkerInfoList =
- getWorkerInfosFromPath(unhealthyWorkerInfoPath, false);
-
- healthyWorkerInfoList.clear();
- if (currentHealthyWorkerInfoList != null) {
- for (WorkerInfo healthyWorkerInfo :
- currentHealthyWorkerInfoList) {
- healthyWorkerInfoList.add(healthyWorkerInfo);
- }
- }
-
- unhealthyWorkerInfoList.clear();
- if (currentUnhealthyWorkerInfoList != null) {
- for (WorkerInfo unhealthyWorkerInfo :
- currentUnhealthyWorkerInfoList) {
- unhealthyWorkerInfoList.add(unhealthyWorkerInfo);
- }
- }
- }
-
- /**
- * Check all the {@link WorkerInfo} objects to ensure that a minimum
- * number of good workers exists out of the total that have reported.
- *
- * @return List of of healthy workers such that the minimum has been
- * met, otherwise null
- */
- private List<WorkerInfo> checkWorkers() {
- boolean failJob = true;
- int pollAttempt = 0;
- List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
- List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
- int totalResponses = -1;
- while (pollAttempt < maxPollAttempts) {
- getAllWorkerInfos(
- getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
- totalResponses = healthyWorkerInfoList.size() +
- unhealthyWorkerInfoList.size();
- if ((totalResponses * 100.0f / maxWorkers) >=
- minPercentResponded) {
- failJob = false;
- break;
- }
- getContext().setStatus(getGraphMapper().getMapFunctions() + " " +
- "checkWorkers: Only found " +
- totalResponses +
- " responses of " + maxWorkers +
- " needed to start superstep " +
- getSuperstep());
- if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
- msecsPollPeriod)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("checkWorkers: Got event that health " +
- "registration changed, not using poll attempt");
- }
- getWorkerHealthRegistrationChangedEvent().reset();
- continue;
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("checkWorkers: Only found " + totalResponses +
- " responses of " + maxWorkers +
- " needed to start superstep " +
- getSuperstep() + ". Sleeping for " +
- msecsPollPeriod + " msecs and used " + pollAttempt +
- " of " + maxPollAttempts + " attempts.");
- // Find the missing workers if there are only a few
- if ((maxWorkers - totalResponses) <=
- partitionLongTailMinPrint) {
- Set<Integer> partitionSet = new TreeSet<Integer>();
- for (WorkerInfo workerInfo : healthyWorkerInfoList) {
- partitionSet.add(workerInfo.getPartitionId());
- }
- for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
- partitionSet.add(workerInfo.getPartitionId());
- }
- for (int i = 1; i <= maxWorkers; ++i) {
- if (partitionSet.contains(new Integer(i))) {
- continue;
- } else if (i == getTaskPartition()) {
- continue;
- } else {
- LOG.info("checkWorkers: No response from "+
- "partition " + i + " (could be master)");
- }
- }
- }
- }
- ++pollAttempt;
- }
- if (failJob) {
- LOG.error("checkWorkers: Did not receive enough processes in " +
- "time (only " + totalResponses + " of " +
- minWorkers + " required). This occurs if you do not " +
- "have enough map tasks available simultaneously on " +
- "your Hadoop instance to fulfill the number of " +
- "requested workers.");
- return null;
- }
-
- if (healthyWorkerInfoList.size() < minWorkers) {
- LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
- " available when " + minWorkers + " are required.");
- return null;
- }
-
- getContext().setStatus(getGraphMapper().getMapFunctions() + " " +
- "checkWorkers: Done - Found " + totalResponses +
- " responses of " + maxWorkers + " needed to start superstep " +
- getSuperstep());
-
- return healthyWorkerInfoList;
- }
-
- @Override
- public int createInputSplits() {
- // Only the 'master' should be doing this. Wait until the number of
- // processes that have reported health exceeds the minimum percentage.
- // If the minimum percentage is not met, fail the job. Otherwise
- // generate the input splits
- try {
- if (getZkExt().exists(INPUT_SPLIT_PATH, false) != null) {
- LOG.info(INPUT_SPLIT_PATH +
- " already exists, no need to create");
- return Integer.parseInt(
- new String(
- getZkExt().getData(INPUT_SPLIT_PATH, false, null)));
- }
- } catch (KeeperException.NoNodeException e) {
- if (LOG.isInfoEnabled()) {
- LOG.info("createInputSplits: Need to create the " +
- "input splits at " + INPUT_SPLIT_PATH);
- }
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "createInputSplits: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "createInputSplits: IllegalStateException", e);
- }
-
- // When creating znodes, in case the master has already run, resume
- // where it left off.
- List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
- if (healthyWorkerInfoList == null) {
- setJobState(ApplicationState.FAILED, -1, -1);
- return -1;
- }
-
- // Note that the input splits may only be a sample if
- // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
- List<InputSplit> splitList =
- generateInputSplits(healthyWorkerInfoList.size());
- if (healthyWorkerInfoList.size() > splitList.size()) {
- LOG.warn("createInputSplits: Number of inputSplits="
- + splitList.size() + " < " +
- healthyWorkerInfoList.size() +
- "=number of healthy processes, " +
- "some workers will be not used");
- }
- String inputSplitPath = null;
- for (int i = 0; i< splitList.size(); ++i) {
- try {
- ByteArrayOutputStream byteArrayOutputStream =
- new ByteArrayOutputStream();
- DataOutput outputStream =
- new DataOutputStream(byteArrayOutputStream);
- InputSplit inputSplit = splitList.get(i);
- Text.writeString(outputStream,
- inputSplit.getClass().getName());
- ((Writable) inputSplit).write(outputStream);
- inputSplitPath = INPUT_SPLIT_PATH + "/" + i;
- getZkExt().createExt(inputSplitPath,
- byteArrayOutputStream.toByteArray(),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- if (LOG.isDebugEnabled()) {
- LOG.debug("createInputSplits: Created input split " +
- "with index " + i + " serialized as " +
- byteArrayOutputStream.toString());
- }
- } catch (KeeperException.NodeExistsException e) {
- if (LOG.isInfoEnabled()) {
- LOG.info("createInputSplits: Node " +
- inputSplitPath + " already exists.");
- }
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "createInputSplits: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "createInputSplits: IllegalStateException", e);
- } catch (IOException e) {
- throw new IllegalStateException(
- "createInputSplits: IOException", e);
- }
- }
-
- // Let workers know they can start trying to load the input splits
- try {
- getZkExt().create(INPUT_SPLITS_ALL_READY_PATH,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException e) {
- LOG.info("createInputSplits: Node " +
- INPUT_SPLITS_ALL_READY_PATH + " already exists.");
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "createInputSplits: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "createInputSplits: IllegalStateException", e);
- }
-
- return splitList.size();
- }
-
- /**
- * Read the finalized checkpoint file and associated metadata files for the
- * checkpoint. Modifies the {@link PartitionOwner} objects to get the
- * checkpoint prefixes. It is an optimization to prevent all workers from
- * searching all the files. Also read in the aggregator data from the
- * finalized checkpoint file and setting it.
- *
- * @param superstep Checkpoint set to examine.
- * @param partitionOwners Partition owners to modify with checkpoint
- * prefixes
- * @throws IOException
- * @throws InterruptedException
- * @throws KeeperException
- */
- private void prepareCheckpointRestart(
- long superstep,
- Collection<PartitionOwner> partitionOwners)
- throws IOException, KeeperException, InterruptedException {
- FileSystem fs = getFs();
- List<Path> validMetadataPathList = new ArrayList<Path>();
- String finalizedCheckpointPath =
- getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
- DataInputStream finalizedStream =
- fs.open(new Path(finalizedCheckpointPath));
- int prefixFileCount = finalizedStream.readInt();
- for (int i = 0; i < prefixFileCount; ++i) {
- String metadataFilePath =
- finalizedStream.readUTF() + CHECKPOINT_METADATA_POSTFIX;
- validMetadataPathList.add(new Path(metadataFilePath));
- }
-
- // Set the merged aggregator data if it exists.
- int aggregatorDataSize = finalizedStream.readInt();
- if (aggregatorDataSize > 0) {
- byte [] aggregatorZkData = new byte[aggregatorDataSize];
- int actualDataRead =
- finalizedStream.read(aggregatorZkData, 0, aggregatorDataSize);
- if (actualDataRead != aggregatorDataSize) {
- throw new RuntimeException(
- "prepareCheckpointRestart: Only read " + actualDataRead +
- " of " + aggregatorDataSize + " aggregator bytes from " +
- finalizedCheckpointPath);
- }
- String mergedAggregatorPath =
- getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
- if (LOG.isInfoEnabled()) {
- LOG.info("prepareCheckpointRestart: Reloading merged " +
- "aggregator " + "data '" +
- Arrays.toString(aggregatorZkData) +
- "' to previous checkpoint in path " +
- mergedAggregatorPath);
- }
- if (getZkExt().exists(mergedAggregatorPath, false) == null) {
- getZkExt().createExt(mergedAggregatorPath,
- aggregatorZkData,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- }
- else {
- getZkExt().setData(mergedAggregatorPath, aggregatorZkData, -1);
- }
- }
- finalizedStream.close();
-
- Map<Integer, PartitionOwner> idOwnerMap =
- new HashMap<Integer, PartitionOwner>();
- for (PartitionOwner partitionOwner : partitionOwners) {
- if (idOwnerMap.put(partitionOwner.getPartitionId(),
- partitionOwner) != null) {
- throw new IllegalStateException(
- "prepareCheckpointRestart: Duplicate partition " +
- partitionOwner);
+ LOG.info("checkWorkers: No response from " +
+ "partition " + i + " (could be master)");
}
+ }
}
- // Reading the metadata files. Simply assign each partition owner
- // the correct file prefix based on the partition id.
- for (Path metadataPath : validMetadataPathList) {
- String checkpointFilePrefix = metadataPath.toString();
- checkpointFilePrefix =
- checkpointFilePrefix.substring(
- 0,
- checkpointFilePrefix.length() -
- CHECKPOINT_METADATA_POSTFIX.length());
- DataInputStream metadataStream = fs.open(metadataPath);
- long partitions = metadataStream.readInt();
- for (long i = 0; i < partitions; ++i) {
- long dataPos = metadataStream.readLong();
- int partitionId = metadataStream.readInt();
- PartitionOwner partitionOwner = idOwnerMap.get(partitionId);
- if (LOG.isInfoEnabled()) {
- LOG.info("prepareSuperstepRestart: File " + metadataPath +
- " with position " + dataPos +
- ", partition id = " + partitionId +
- " assigned to " + partitionOwner);
- }
- partitionOwner.setCheckpointFilesPrefix(checkpointFilePrefix);
- }
- metadataStream.close();
- }
+ }
+ ++pollAttempt;
}
-
- @Override
- public void setup() {
- // Might have to manually load a checkpoint.
- // In that case, the input splits are not set, they will be faked by
- // the checkpoint files. Each checkpoint file will be an input split
- // and the input split
- superstepCounter = getContext().getCounter(
- GIRAPH_STATS_COUNTER_GROUP_NAME, "Superstep");
- vertexCounter = getContext().getCounter(
- GIRAPH_STATS_COUNTER_GROUP_NAME, "Aggregate vertices");
- finishedVertexCounter = getContext().getCounter(
- GIRAPH_STATS_COUNTER_GROUP_NAME, "Aggregate finished vertices");
- edgeCounter = getContext().getCounter(
- GIRAPH_STATS_COUNTER_GROUP_NAME, "Aggregate edges");
- sentMessagesCounter = getContext().getCounter(
- GIRAPH_STATS_COUNTER_GROUP_NAME, "Sent messages");
- currentWorkersCounter = getContext().getCounter(
- GIRAPH_STATS_COUNTER_GROUP_NAME, "Current workers");
- currentMasterTaskPartitionCounter = getContext().getCounter(
- GIRAPH_STATS_COUNTER_GROUP_NAME, "Current master task partition");
- lastCheckpointedSuperstepCounter = getContext().getCounter(
- GIRAPH_STATS_COUNTER_GROUP_NAME, "Last checkpointed superstep");
- if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
- superstepCounter.increment(getRestartedSuperstep());
+ if (failJob) {
+ LOG.error("checkWorkers: Did not receive enough processes in " +
+ "time (only " + totalResponses + " of " +
+ minWorkers + " required). This occurs if you do not " +
+ "have enough map tasks available simultaneously on " +
+ "your Hadoop instance to fulfill the number of " +
+ "requested workers.");
+ return null;
+ }
+
+ if (healthyWorkerInfoList.size() < minWorkers) {
+ LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
+ " available when " + minWorkers + " are required.");
+ return null;
+ }
+
+ getContext().setStatus(getGraphMapper().getMapFunctions() + " " +
+ "checkWorkers: Done - Found " + totalResponses +
+ " responses of " + maxWorkers + " needed to start superstep " +
+ getSuperstep());
+
+ return healthyWorkerInfoList;
+ }
+
+ @Override
+ public int createInputSplits() {
+ // Only the 'master' should be doing this. Wait until the number of
+ // processes that have reported health exceeds the minimum percentage.
+ // If the minimum percentage is not met, fail the job. Otherwise
+ // generate the input splits
+ try {
+ if (getZkExt().exists(inputSplitsPath, false) != null) {
+ LOG.info(inputSplitsPath +
+ " already exists, no need to create");
+ return Integer.parseInt(
+ new String(
+ getZkExt().getData(inputSplitsPath, false, null)));
+ }
+ } catch (KeeperException.NoNodeException e) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("createInputSplits: Need to create the " +
+ "input splits at " + inputSplitsPath);
+ }
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "createInputSplits: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+        "createInputSplits: InterruptedException", e);
+ }
+
+ // When creating znodes, in case the master has already run, resume
+ // where it left off.
+ List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
+ if (healthyWorkerInfoList == null) {
+ setJobState(ApplicationState.FAILED, -1, -1);
+ return -1;
+ }
+
+ // Note that the input splits may only be a sample if
+ // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
+ List<InputSplit> splitList =
+ generateInputSplits(healthyWorkerInfoList.size());
+ if (healthyWorkerInfoList.size() > splitList.size()) {
+ LOG.warn("createInputSplits: Number of inputSplits=" +
+ splitList.size() + " < " +
+ healthyWorkerInfoList.size() +
+ "=number of healthy processes, " +
+          "some workers will not be used");
+ }
+ String inputSplitPath = null;
+ for (int i = 0; i < splitList.size(); ++i) {
+ try {
+ ByteArrayOutputStream byteArrayOutputStream =
+ new ByteArrayOutputStream();
+ DataOutput outputStream =
+ new DataOutputStream(byteArrayOutputStream);
+ InputSplit inputSplit = splitList.get(i);
+ Text.writeString(outputStream,
+ inputSplit.getClass().getName());
+ ((Writable) inputSplit).write(outputStream);
+ inputSplitPath = inputSplitsPath + "/" + i;
+ getZkExt().createExt(inputSplitPath,
+ byteArrayOutputStream.toByteArray(),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("createInputSplits: Created input split " +
+ "with index " + i + " serialized as " +
+ byteArrayOutputStream.toString());
}
- }
-
- @Override
- public boolean becomeMaster() {
- // Create my bid to become the master, then try to become the worker
- // or return false.
- String myBid = null;
- try {
- myBid =
- getZkExt().createExt(MASTER_ELECTION_PATH +
- "/" + getHostnamePartitionId(),
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL_SEQUENTIAL,
- true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "becomeMaster: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "becomeMaster: IllegalStateException", e);
- }
- while (true) {
- JSONObject jobState = getJobState();
- try {
- if ((jobState != null) &&
- ApplicationState.valueOf(
- jobState.getString(JSONOBJ_STATE_KEY)) ==
- ApplicationState.FINISHED) {
- LOG.info("becomeMaster: Job is finished, " +
- "give up trying to be the master!");
- isMaster = false;
- return isMaster;
- }
- } catch (JSONException e) {
- throw new IllegalStateException(
- "becomeMaster: Couldn't get state from " + jobState, e);
- }
- try {
- List<String> masterChildArr =
- getZkExt().getChildrenExt(
- MASTER_ELECTION_PATH, true, true, true);
- if (LOG.isInfoEnabled()) {
- LOG.info("becomeMaster: First child is '" +
- masterChildArr.get(0) + "' and my bid is '" +
- myBid + "'");
- }
- if (masterChildArr.get(0).equals(myBid)) {
- currentMasterTaskPartitionCounter.increment(
- getTaskPartition() -
- currentMasterTaskPartitionCounter.getValue());
- aggregatorWriter =
- BspUtils.createAggregatorWriter(getConfiguration());
- try {
- aggregatorWriter.initialize(getContext(),
- getApplicationAttempt());
- } catch (IOException e) {
- throw new IllegalStateException("becomeMaster: " +
- "Couldn't initialize aggregatorWriter", e);
- }
- LOG.info("becomeMaster: I am now the master!");
- isMaster = true;
- return isMaster;
- }
- LOG.info("becomeMaster: Waiting to become the master...");
- getMasterElectionChildrenChangedEvent().waitForever();
- getMasterElectionChildrenChangedEvent().reset();
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "becomeMaster: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "becomeMaster: IllegalStateException", e);
- }
- }
- }
-
- /**
- * Collect and aggregate the worker statistics for a particular superstep.
- *
- * @param superstep Superstep to aggregate on
- * @return Global statistics aggregated on all worker statistics
- */
- private GlobalStats aggregateWorkerStats(long superstep) {
- Class<? extends Writable> partitionStatsClass =
- masterGraphPartitioner.createPartitionStats().getClass();
- GlobalStats globalStats = new GlobalStats();
- // Get the stats from the all the worker selected nodes
- String workerFinishedPath =
- getWorkerFinishedPath(getApplicationAttempt(), superstep);
- List<String> workerFinishedPathList = null;
- try {
- workerFinishedPathList =
- getZkExt().getChildrenExt(
- workerFinishedPath, false, false, true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "aggregateWorkerStats: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "aggregateWorkerStats: InterruptedException", e);
- }
-
- allPartitionStatsList.clear();
- for (String finishedPath : workerFinishedPathList) {
- JSONObject workerFinishedInfoObj = null;
- try {
- byte [] zkData =
- getZkExt().getData(finishedPath, false, null);
- workerFinishedInfoObj = new JSONObject(new String(zkData));
- List<? extends Writable> writableList =
- WritableUtils.readListFieldsFromByteArray(
- Base64.decode(workerFinishedInfoObj.getString(
- JSONOBJ_PARTITION_STATS_KEY)),
- partitionStatsClass,
- getConfiguration());
- for (Writable writable : writableList) {
- globalStats.addPartitionStats((PartitionStats) writable);
- globalStats.addMessageCount(
- workerFinishedInfoObj.getLong(
- JSONOBJ_NUM_MESSAGES_KEY));
- allPartitionStatsList.add((PartitionStats) writable);
- }
- } catch (JSONException e) {
- throw new IllegalStateException(
- "aggregateWorkerStats: JSONException", e);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "aggregateWorkerStats: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "aggregateWorkerStats: InterruptedException", e);
- } catch (IOException e) {
- throw new IllegalStateException(
- "aggregateWorkerStats: IOException", e);
- }
- }
-
+ } catch (KeeperException.NodeExistsException e) {
if (LOG.isInfoEnabled()) {
- LOG.info("aggregateWorkerStats: Aggregation found " + globalStats +
- " on superstep = " + getSuperstep());
- }
- return globalStats;
- }
-
- /**
- * Get the aggregator values for a particular superstep,
- * aggregate and save them. Does nothing on the INPUT_SUPERSTEP.
- *
- * @param superstep superstep to check
- */
- private void collectAndProcessAggregatorValues(long superstep) {
- if (superstep == INPUT_SUPERSTEP) {
- // Nothing to collect on the input superstep
- return;
- }
- Map<String, Aggregator<? extends Writable>> aggregatorMap =
- new TreeMap<String, Aggregator<? extends Writable>>();
- String workerFinishedPath =
- getWorkerFinishedPath(getApplicationAttempt(), superstep);
- List<String> hostnameIdPathList = null;
- try {
- hostnameIdPathList =
- getZkExt().getChildrenExt(
- workerFinishedPath, false, false, true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: InterruptedException", e);
- }
-
- for (String hostnameIdPath : hostnameIdPathList) {
- JSONObject workerFinishedInfoObj = null;
- JSONArray aggregatorArray = null;
- try {
- byte [] zkData =
- getZkExt().getData(hostnameIdPath, false, null);
- workerFinishedInfoObj = new JSONObject(new String(zkData));
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: InterruptedException",
- e);
- } catch (JSONException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: JSONException", e);
- }
- try {
- aggregatorArray = workerFinishedInfoObj.getJSONArray(
- JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY);
- } catch (JSONException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("collectAndProcessAggregatorValues: " +
- "No aggregators" + " for " + hostnameIdPath);
- }
- continue;
- }
- for (int i = 0; i < aggregatorArray.length(); ++i) {
- try {
- if (LOG.isInfoEnabled()) {
- LOG.info("collectAndProcessAggregatorValues: " +
- "Getting aggregators from " +
- aggregatorArray.getJSONObject(i));
- }
- String aggregatorName =
- aggregatorArray.getJSONObject(i).getString(
- AGGREGATOR_NAME_KEY);
- String aggregatorClassName =
- aggregatorArray.getJSONObject(i).getString(
- AGGREGATOR_CLASS_NAME_KEY);
- @SuppressWarnings("unchecked")
- Aggregator<Writable> aggregator =
- (Aggregator<Writable>) aggregatorMap.get(aggregatorName);
- boolean firstTime = false;
- if (aggregator == null) {
- @SuppressWarnings("unchecked")
- Aggregator<Writable> aggregatorWritable =
- (Aggregator<Writable>) getAggregator(aggregatorName);
- aggregator = aggregatorWritable;
- if (aggregator == null) {
- @SuppressWarnings("unchecked")
- Class<? extends Aggregator<Writable>> aggregatorClass =
- (Class<? extends Aggregator<Writable>>)
- Class.forName(aggregatorClassName);
- aggregator = registerAggregator(
- aggregatorName,
- aggregatorClass);
- }
- aggregatorMap.put(aggregatorName, aggregator);
- firstTime = true;
- }
- Writable aggregatorValue =
- aggregator.createAggregatedValue();
- InputStream input =
- new ByteArrayInputStream(
- Base64.decode(
- aggregatorArray.getJSONObject(i).
- getString(AGGREGATOR_VALUE_KEY)));
- aggregatorValue.readFields(new DataInputStream(input));
- if (LOG.isDebugEnabled()) {
- LOG.debug("collectAndProcessAggregatorValues: " +
- "aggregator value size=" + input.available() +
- " for aggregator=" + aggregatorName +
- " value=" + aggregatorValue);
- }
- if (firstTime) {
- aggregator.setAggregatedValue(aggregatorValue);
- } else {
- aggregator.aggregate(aggregatorValue);
- }
- } catch (IOException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "IOException when reading aggregator data " +
- aggregatorArray, e);
- } catch (JSONException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "JSONException when reading aggregator data " +
- aggregatorArray, e);
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "ClassNotFoundException when reading aggregator data " +
- aggregatorArray, e);
- } catch (InstantiationException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "InstantiationException when reading aggregator data " +
- aggregatorArray, e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "IOException when reading aggregator data " +
- aggregatorArray, e);
- }
- }
+ LOG.info("createInputSplits: Node " +
+ inputSplitPath + " already exists.");
}
- if (aggregatorMap.size() > 0) {
- String mergedAggregatorPath =
- getMergedAggregatorPath(getApplicationAttempt(), superstep);
- byte [] zkData = null;
- JSONArray aggregatorArray = new JSONArray();
- for (Map.Entry<String, Aggregator<? extends Writable>> entry :
- aggregatorMap.entrySet()) {
- try {
- ByteArrayOutputStream outputStream =
- new ByteArrayOutputStream();
- DataOutput output = new DataOutputStream(outputStream);
- entry.getValue().getAggregatedValue().write(output);
-
- JSONObject aggregatorObj = new JSONObject();
- aggregatorObj.put(AGGREGATOR_NAME_KEY,
- entry.getKey());
- aggregatorObj.put(
- AGGREGATOR_VALUE_KEY,
- Base64.encodeBytes(outputStream.toByteArray()));
- aggregatorArray.put(aggregatorObj);
- if (LOG.isInfoEnabled()) {
- LOG.info("collectAndProcessAggregatorValues: " +
- "Trying to add aggregatorObj " +
- aggregatorObj + "(" +
- entry.getValue().getAggregatedValue() +
- ") to merged aggregator path " +
- mergedAggregatorPath);
- }
- } catch (IOException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
- "IllegalStateException", e);
- } catch (JSONException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: JSONException", e);
- }
- }
- try {
- zkData = aggregatorArray.toString().getBytes();
- getZkExt().createExt(mergedAggregatorPath,
- zkData,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException.NodeExistsException e) {
- LOG.warn("collectAndProcessAggregatorValues: " +
- mergedAggregatorPath+
- " already exists!");
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "collectAndProcessAggregatorValues: IllegalStateException",
- e);
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("collectAndProcessAggregatorValues: Finished " +
- "loading " +
- mergedAggregatorPath+ " with aggregator values " +
- aggregatorArray);
- }
- }
- }
-
- /**
- * Finalize the checkpoint file prefixes by taking the chosen workers and
- * writing them to a finalized file. Also write out the master
- * aggregated aggregator array from the previous superstep.
- *
- * @param superstep superstep to finalize
- * @param chosenWorkerList list of chosen workers that will be finalized
- * @throws IOException
- * @throws InterruptedException
- * @throws KeeperException
- */
- private void finalizeCheckpoint(
- long superstep,
- List<WorkerInfo> chosenWorkerInfoList)
- throws IOException, KeeperException, InterruptedException {
- Path finalizedCheckpointPath =
- new Path(getCheckpointBasePath(superstep) +
- CHECKPOINT_FINALIZED_POSTFIX);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "createInputSplits: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+          "createInputSplits: InterruptedException", e);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "createInputSplits: IOException", e);
+ }
+ }
+
+ // Let workers know they can start trying to load the input splits
+ try {
+ getZkExt().create(inputSplitsAllReadyPath,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } catch (KeeperException.NodeExistsException e) {
+ LOG.info("createInputSplits: Node " +
+ inputSplitsAllReadyPath + " already exists.");
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "createInputSplits: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+          "createInputSplits: InterruptedException", e);
+ }
+
+ return splitList.size();
+ }
+
+ /**
+ * Read the finalized checkpoint file and associated metadata files for the
+ * checkpoint. Modifies the {@link PartitionOwner} objects to get the
+ * checkpoint prefixes. It is an optimization to prevent all workers from
+ * searching all the files. Also read in the aggregator data from the
+ * finalized checkpoint file and sets it.
+ *
+ * @param superstep Checkpoint set to examine.
+ * @param partitionOwners Partition owners to modify with checkpoint
+ * prefixes
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ private void prepareCheckpointRestart(
+ long superstep,
+ Collection<PartitionOwner> partitionOwners)
+ throws IOException, KeeperException, InterruptedException {
+ FileSystem fs = getFs();
+ List<Path> validMetadataPathList = new ArrayList<Path>();
+ String finalizedCheckpointPath =
+ getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+ DataInputStream finalizedStream =
+ fs.open(new Path(finalizedCheckpointPath));
+ int prefixFileCount = finalizedStream.readInt();
+ for (int i = 0; i < prefixFileCount; ++i) {
+ String metadataFilePath =
+ finalizedStream.readUTF() + CHECKPOINT_METADATA_POSTFIX;
+ validMetadataPathList.add(new Path(metadataFilePath));
+ }
+
+ // Set the merged aggregator data if it exists.
+ int aggregatorDataSize = finalizedStream.readInt();
+ if (aggregatorDataSize > 0) {
+ byte [] aggregatorZkData = new byte[aggregatorDataSize];
+ int actualDataRead =
+ finalizedStream.read(aggregatorZkData, 0, aggregatorDataSize);
+ if (actualDataRead != aggregatorDataSize) {
+ throw new RuntimeException(
+ "prepareCheckpointRestart: Only read " + actualDataRead +
+ " of " + aggregatorDataSize + " aggregator bytes from " +
+ finalizedCheckpointPath);
+ }
+ String mergedAggregatorPath =
+ getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("prepareCheckpointRestart: Reloading merged " +
+ "aggregator " + "data '" +
+ Arrays.toString(aggregatorZkData) +
+ "' to previous checkpoint in path " +
+ mergedAggregatorPath);
+ }
+ if (getZkExt().exists(mergedAggregatorPath, false) == null) {
+ getZkExt().createExt(mergedAggregatorPath,
+ aggregatorZkData,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ } else {
+ getZkExt().setData(mergedAggregatorPath, aggregatorZkData, -1);
+ }
+ }
+ finalizedStream.close();
+
+ Map<Integer, PartitionOwner> idOwnerMap =
+ new HashMap<Integer, PartitionOwner>();
+ for (PartitionOwner partitionOwner : partitionOwners) {
+ if (idOwnerMap.put(partitionOwner.getPartitionId(),
+ partitionOwner) != null) {
+ throw new IllegalStateException(
+ "prepareCheckpointRestart: Duplicate partition " +
+ partitionOwner);
+ }
+ }
+ // Reading the metadata files. Simply assign each partition owner
+ // the correct file prefix based on the partition id.
+ for (Path metadataPath : validMetadataPathList) {
+ String checkpointFilePrefix = metadataPath.toString();
+ checkpointFilePrefix =
+ checkpointFilePrefix.substring(
+ 0,
+ checkpointFilePrefix.length() -
+ CHECKPOINT_METADATA_POSTFIX.length());
+ DataInputStream metadataStream = fs.open(metadataPath);
+ long partitions = metadataStream.readInt();
+ for (long i = 0; i < partitions; ++i) {
+ long dataPos = metadataStream.readLong();
+ int partitionId = metadataStream.readInt();
+ PartitionOwner partitionOwner = idOwnerMap.get(partitionId);
+ if (LOG.isInfoEnabled()) {
+          LOG.info("prepareCheckpointRestart: File " + metadataPath +
+ " with position " + dataPos +
+ ", partition id = " + partitionId +
+ " assigned to " + partitionOwner);
+ }
+ partitionOwner.setCheckpointFilesPrefix(checkpointFilePrefix);
+ }
+ metadataStream.close();
+ }
+ }
+
+ @Override
+ public void setup() {
+ // Might have to manually load a checkpoint.
+ // In that case, the input splits are not set, they will be faked by
+ // the checkpoint files. Each checkpoint file will be an input split
+ // and the input split
+ superstepCounter = getContext().getCounter(
+ GIRAPH_STATS_COUNTER_GROUP_NAME, "Superstep");
+ vertexCounter = getContext().getCounter(
+ GIRAPH_STATS_COUNTER_GROUP_NAME, "Aggregate vertices");
+ finishedVertexCounter = getContext().getCounter(
+ GIRAPH_STATS_COUNTER_GROUP_NAME, "Aggregate finished vertices");
+ edgeCounter = getContext().getCounter(
+ GIRAPH_STATS_COUNTER_GROUP_NAME, "Aggregate edges");
+ sentMessagesCounter = getContext().getCounter(
+ GIRAPH_STATS_COUNTER_GROUP_NAME, "Sent messages");
+ currentWorkersCounter = getContext().getCounter(
+ GIRAPH_STATS_COUNTER_GROUP_NAME, "Current workers");
+ currentMasterTaskPartitionCounter = getContext().getCounter(
+ GIRAPH_STATS_COUNTER_GROUP_NAME, "Current master task partition");
+ lastCheckpointedSuperstepCounter = getContext().getCounter(
+ GIRAPH_STATS_COUNTER_GROUP_NAME, "Last checkpointed superstep");
+ if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
+ superstepCounter.increment(getRestartedSuperstep());
+ }
+ }
+
+ @Override
+ public boolean becomeMaster() {
+ // Create my bid to become the master, then try to become the worker
+ // or return false.
+ String myBid = null;
+ try {
+ myBid =
+ getZkExt().createExt(masterElectionPath +
+ "/" + getHostnamePartitionId(),
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL_SEQUENTIAL,
+ true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "becomeMaster: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+          "becomeMaster: InterruptedException", e);
+ }
+ while (true) {
+ JSONObject jobState = getJobState();
+ try {
+ if ((jobState != null) &&
+ ApplicationState.valueOf(
+ jobState.getString(JSONOBJ_STATE_KEY)) ==
+ ApplicationState.FINISHED) {
+ LOG.info("becomeMaster: Job is finished, " +
+ "give up trying to be the master!");
+ isMaster = false;
+ return isMaster;
+ }
+ } catch (JSONException e) {
+ throw new IllegalStateException(
+ "becomeMaster: Couldn't get state from " + jobState, e);
+ }
+ try {
+ List<String> masterChildArr =
+ getZkExt().getChildrenExt(
+ masterElectionPath, true, true, true);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("becomeMaster: First child is '" +
+ masterChildArr.get(0) + "' and my bid is '" +
+ myBid + "'");
+ }
+ if (masterChildArr.get(0).equals(myBid)) {
+ currentMasterTaskPartitionCounter.increment(
+ getTaskPartition() -
+ currentMasterTaskPartitionCounter.getValue());
+ aggregatorWriter =
+ BspUtils.createAggregatorWriter(getConfiguration());
+ try {
+ aggregatorWriter.initialize(getContext(),
+ getApplicationAttempt());
+ } catch (IOException e) {
+ throw new IllegalStateException("becomeMaster: " +
+ "Couldn't initialize aggregatorWriter", e);
+ }
+ LOG.info("becomeMaster: I am now the master!");
+ isMaster = true;
+ return isMaster;
+ }
+ LOG.info("becomeMaster: Waiting to become the master...");
+ getMasterElectionChildrenChangedEvent().waitForever();
+ getMasterElectionChildrenChangedEvent().reset();
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "becomeMaster: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+            "becomeMaster: InterruptedException", e);
+ }
+ }
+ }
+
+ /**
+ * Collect and aggregate the worker statistics for a particular superstep.
+ *
+ * @param superstep Superstep to aggregate on
+ * @return Global statistics aggregated on all worker statistics
+ */
+ private GlobalStats aggregateWorkerStats(long superstep) {
+ Class<? extends Writable> partitionStatsClass =
+ masterGraphPartitioner.createPartitionStats().getClass();
+ GlobalStats globalStats = new GlobalStats();
+ // Get the stats from the all the worker selected nodes
+ String workerFinishedPath =
+ getWorkerFinishedPath(getApplicationAttempt(), superstep);
+ List<String> workerFinishedPathList = null;
+ try {
+ workerFinishedPathList =
+ getZkExt().getChildrenExt(
+ workerFinishedPath, false, false, true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "aggregateWorkerStats: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "aggregateWorkerStats: InterruptedException", e);
+ }
+
+ allPartitionStatsList.clear();
+ for (String finishedPath : workerFinishedPathList) {
+ JSONObject workerFinishedInfoObj = null;
+ try {
+ byte [] zkData =
+ getZkExt().getData(finishedPath, false, null);
+ workerFinishedInfoObj = new JSONObject(new String(zkData));
+ List<? extends Writable> writableList =
+ WritableUtils.readListFieldsFromByteArray(
+ Base64.decode(workerFinishedInfoObj.getString(
+ JSONOBJ_PARTITION_STATS_KEY)),
+ partitionStatsClass,
+ getConfiguration());
+ for (Writable writable : writableList) {
+ globalStats.addPartitionStats((PartitionStats) writable);
+ globalStats.addMessageCount(
+ workerFinishedInfoObj.getLong(
+ JSONOBJ_NUM_MESSAGES_KEY));
+ allPartitionStatsList.add((PartitionStats) writable);
+ }
+ } catch (JSONException e) {
+ throw new IllegalStateException(
+ "aggregateWorkerStats: JSONException", e);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "aggregateWorkerStats: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "aggregateWorkerStats: InterruptedException", e);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "aggregateWorkerStats: IOException", e);
+ }
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("aggregateWorkerStats: Aggregation found " + globalStats +
+ " on superstep = " + getSuperstep());
+ }
+ return globalStats;
+ }
+
+ /**
+ * Get the aggregator values for a particular superstep,
+ * aggregate and save them. Does nothing on the INPUT_SUPERSTEP.
+ *
+ * @param superstep superstep to check
+ */
+ private void collectAndProcessAggregatorValues(long superstep) {
+ if (superstep == INPUT_SUPERSTEP) {
+ // Nothing to collect on the input superstep
+ return;
+ }
+ Map<String, Aggregator<? extends Writable>> aggregatorMap =
+ new TreeMap<String, Aggregator<? extends Writable>>();
+ String workerFinishedPath =
+ getWorkerFinishedPath(getApplicationAttempt(), superstep);
+ List<String> hostnameIdPathList = null;
+ try {
+ hostnameIdPathList =
+ getZkExt().getChildrenExt(
+ workerFinishedPath, false, false, true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: InterruptedException", e);
+ }
+
+ for (String hostnameIdPath : hostnameIdPathList) {
+ JSONObject workerFinishedInfoObj = null;
+ JSONArray aggregatorArray = null;
+ try {
+ byte [] zkData =
+ getZkExt().getData(hostnameIdPath, false, null);
+ workerFinishedInfoObj = new JSONObject(new String(zkData));
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: InterruptedException",
+ e);
+ } catch (JSONException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: JSONException", e);
+ }
+ try {
+ aggregatorArray = workerFinishedInfoObj.getJSONArray(
+ JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY);
+ } catch (JSONException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("collectAndProcessAggregatorValues: " +
+ "No aggregators" + " for " + hostnameIdPath);
+ }
+ continue;
+ }
+ for (int i = 0; i < aggregatorArray.length(); ++i) {
try {
- getFs().delete(finalizedCheckpointPath, false);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("collectAndProcessAggregatorValues: " +
+ "Getting aggregators from " +
+ aggregatorArray.getJSONObject(i));
+ }
+ String aggregatorName =
+ aggregatorArray.getJSONObject(i).getString(
+ AGGREGATOR_NAME_KEY);
+ String aggregatorClassName =
+ aggregatorArray.getJSONObject(i).getString(
+ AGGREGATOR_CLASS_NAME_KEY);
+ @SuppressWarnings("unchecked")
+ Aggregator<Writable> aggregator =
+ (Aggregator<Writable>) aggregatorMap.get(aggregatorName);
+ boolean firstTime = false;
+ if (aggregator == null) {
+ @SuppressWarnings("unchecked")
+ Aggregator<Writable> aggregatorWritable =
+ (Aggregator<Writable>) getAggregator(aggregatorName);
+ aggregator = aggregatorWritable;
+ if (aggregator == null) {
+ @SuppressWarnings("unchecked")
+ Class<? extends Aggregator<Writable>> aggregatorClass =
+ (Class<? extends Aggregator<Writable>>)
+ Class.forName(aggregatorClassName);
+ aggregator = registerAggregator(
+ aggregatorName,
+ aggregatorClass);
+ }
+ aggregatorMap.put(aggregatorName, aggregator);
+ firstTime = true;
+ }
+ Writable aggregatorValue =
+ aggregator.createAggregatedValue();
+ InputStream input =
+ new ByteArrayInputStream(
+ Base64.decode(
+ aggregatorArray.getJSONObject(i).
+ getString(AGGREGATOR_VALUE_KEY)));
+ aggregatorValue.readFields(new DataInputStream(input));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("collectAndProcessAggregatorValues: " +
+ "aggregator value size=" + input.available() +
+ " for aggregator=" + aggregatorName +
+ " value=" + aggregatorValue);
+ }
+ if (firstTime) {
+ aggregator.setAggregatedValue(aggregatorValue);
+ } else {
+ aggregator.aggregate(aggregatorValue);
+ }
} catch (IOException e) {
- LOG.warn("finalizedValidCheckpointPrefixes: Removed old file " +
- finalizedCheckpointPath);
- }
-
- // Format:
- // <number of files>
- // <used file prefix 0><used file prefix 1>...
- // <aggregator data length><aggregators as a serialized JSON byte array>
- FSDataOutputStream finalizedOutputStream =
- getFs().create(finalizedCheckpointPath);
- finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
- for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
- String chosenWorkerInfoPrefix =
- getCheckpointBasePath(superstep) + "." +
- chosenWorkerInfo.getHostnameId();
- finalizedOutputStream.writeUTF(chosenWorkerInfoPrefix);
- }
- String mergedAggregatorPath =
- getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
- if (getZkExt().exists(mergedAggregatorPath, false) != null) {
- byte [] aggregatorZkData =
- getZkExt().getData(mergedAggregatorPath, false, null);
- finalizedOutputStream.writeInt(aggregatorZkData.length);
- finalizedOutputStream.write(aggregatorZkData);
- }
- else {
- finalizedOutputStream.writeInt(0);
- }
- finalizedOutputStream.close();
- lastCheckpointedSuperstep = superstep;
- lastCheckpointedSuperstepCounter.increment(superstep -
- lastCheckpointedSuperstepCounter.getValue());
- }
-
- /**
- * Assign the partitions for this superstep. If there are changes,
- * the workers will know how to do the exchange. If this was a restarted
- * superstep, then make sure to provide information on where to find the
- * checkpoint file.
- *
- * @param allPartitionStatsList All partition stats
- * @param chosenWorkerInfoList All the chosen worker infos
- * @param masterGraphPartitioner Master graph partitioner
- */
- private void assignPartitionOwners(
- List<PartitionStats> allPartitionStatsList,
- List<WorkerInfo> chosenWorkerInfoList,
- MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner) {
- Collection<PartitionOwner> partitionOwners;
- if (getSuperstep() == INPUT_SUPERSTEP ||
- getSuperstep() == getRestartedSuperstep()) {
- partitionOwners =
- masterGraphPartitioner.createInitialPartitionOwners(
- chosenWorkerInfoList, maxWorkers);
- if (partitionOwners.isEmpty()) {
- throw new IllegalStateException(
- "assignAndExchangePartitions: No partition owners set");
- }
- } else {
- partitionOwners =
- masterGraphPartitioner.generateChangedPartitionOwners(
- allPartitionStatsList,
- chosenWorkerInfoList,
- maxWorkers,
- getSuperstep());
-
- PartitionUtils.analyzePartitionStats(partitionOwners,
- allPartitionStatsList);
- }
-
- // If restarted, prepare the checkpoint restart
- if (getRestartedSuperstep() == getSuperstep()) {
- try {
- prepareCheckpointRestart(getSuperstep(), partitionOwners);
- } catch (IOException e) {
- throw new IllegalStateException(
- "assignPartitionOwners: IOException on preparing", e);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "assignPartitionOwners: KeeperException on preparing", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "assignPartitionOwners: InteruptedException on preparing",
- e);
- }
- }
-
- // There will be some exchange of partitions
- if (!partitionOwners.isEmpty()) {
- String vertexExchangePath =
- getPartitionExchangePath(getApplicationAttempt(),
- getSuperstep());
- try {
- getZkExt().createOnceExt(vertexExchangePath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "assignPartitionOwners: KeeperException creating " +
- vertexExchangePath);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "assignPartitionOwners: InterruptedException creating " +
- vertexExchangePath);
- }
- }
-
- // Workers are waiting for these assignments
- String partitionAssignmentsPath =
- getPartitionAssignmentsPath(getApplicationAttempt(),
- getSuperstep());
- WritableUtils.writeListToZnode(
- getZkExt(),
- partitionAssignmentsPath,
- -1,
- new ArrayList<Writable>(partitionOwners));
- }
-
- /**
- * Check whether the workers chosen for this superstep are still alive
- *
- * @param chosenWorkerHealthPath Path to the healthy workers in ZooKeeper
- * @param chosenWorkerList List of the healthy workers
- * @return true if they are all alive, false otherwise.
- * @throws InterruptedException
- * @throws KeeperException
- */
- private boolean superstepChosenWorkerAlive(
- String chosenWorkerInfoHealthPath,
- List<WorkerInfo> chosenWorkerInfoList)
- throws KeeperException, InterruptedException {
- List<WorkerInfo> chosenWorkerInfoHealthyList =
- getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false);
- Set<WorkerInfo> chosenWorkerInfoHealthySet =
- new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList);
- boolean allChosenWorkersHealthy = true;
- for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
- if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) {
- allChosenWorkersHealthy = false;
- LOG.error("superstepChosenWorkerAlive: Missing chosen " +
- "worker " + chosenWorkerInfo +
- " on superstep " + getSuperstep());
- }
- }
- return allChosenWorkersHealthy;
- }
-
- @Override
- public void restartFromCheckpoint(long checkpoint) {
- // Process:
- // 1. Remove all old input split data
- // 2. Increase the application attempt and set to the correct checkpoint
- // 3. Send command to all workers to restart their tasks
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: " +
+ "IOException when reading aggregator data " +
+ aggregatorArray, e);
+ } catch (JSONException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: " +
+ "JSONException when reading aggregator data " +
+ aggregatorArray, e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: " +
+ "ClassNotFoundException when reading aggregator data " +
+ aggregatorArray, e);
+ } catch (InstantiationException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: " +
+ "InstantiationException when reading aggregator data " +
+ aggregatorArray, e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException(
+ "collectAndProcessAggregatorValues: " +
+ "IllegalAccessException when reading aggregator data " +
+ aggregatorArray, e);
+ }
+ }
+ }
+ if (aggregatorMap.size() > 0) {
+ String mergedAggregatorPath =
+ getMergedAggregatorPath(getApplicationAttempt(), superstep);
+ byte [] zkData = null;
[... 1279 lines stripped ...]