You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC
[19/23] GIRAPH-409: Refactor / cleanups (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
deleted file mode 100644
index 85a1da8..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
+++ /dev/null
@@ -1,1884 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.giraph.bsp.ApplicationState;
-import org.apache.giraph.bsp.BspInputFormat;
-import org.apache.giraph.bsp.CentralizedServiceMaster;
-import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.comm.MasterClient;
-import org.apache.giraph.comm.MasterServer;
-import org.apache.giraph.comm.netty.NettyMasterClient;
-import org.apache.giraph.comm.netty.NettyMasterServer;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.counters.GiraphStats;
-import org.apache.giraph.graph.partition.MasterGraphPartitioner;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.giraph.graph.partition.PartitionStats;
-import org.apache.giraph.graph.partition.PartitionUtils;
-import org.apache.giraph.master.MasterObserver;
-import org.apache.giraph.metrics.AggregatedMetrics;
-import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.giraph.metrics.GiraphTimer;
-import org.apache.giraph.metrics.GiraphTimerContext;
-import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
-import org.apache.giraph.metrics.SuperstepMetricsRegistry;
-import org.apache.giraph.metrics.WorkerSuperstepMetrics;
-import org.apache.giraph.utils.ProgressableUtils;
-import org.apache.giraph.utils.SystemTime;
-import org.apache.giraph.utils.Time;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import net.iharder.Base64;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-/**
- * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class BspServiceMaster<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends BspService<I, V, E, M>
- implements CentralizedServiceMaster<I, V, E, M>,
- ResetSuperstepMetricsObserver {
- /** Print worker names only if there are 10 workers left */
- public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
-  /** How many threads to use when writing input splits to zookeeper */
- public static final String INPUT_SPLIT_THREAD_COUNT =
- "giraph.inputSplitThreadCount";
- /** Default number of threads to use when writing input splits to zookeeper */
- public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1;
- /** Time instance to use for timing */
- private static final Time TIME = SystemTime.get();
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
- /** Am I the master? */
- private boolean isMaster = false;
- /** Max number of workers */
- private final int maxWorkers;
- /** Min number of workers */
- private final int minWorkers;
- /** Min % responded workers */
- private final float minPercentResponded;
- /** Msecs to wait for an event */
- private final int eventWaitMsecs;
- /** Max msecs to wait for a superstep to get enough workers */
- private final int maxSuperstepWaitMsecs;
- /** Min number of long tails before printing */
- private final int partitionLongTailMinPrint;
- /** Last finalized checkpoint */
- private long lastCheckpointedSuperstep = -1;
- /** Worker wrote checkpoint */
- private final BspEvent workerWroteCheckpoint;
- /** State of the superstep changed */
- private final BspEvent superstepStateChanged;
- /** Master graph partitioner */
- private final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
- /** All the partition stats from the last superstep */
- private final List<PartitionStats> allPartitionStatsList =
- new ArrayList<PartitionStats>();
- /** Handler for aggregators */
- private MasterAggregatorHandler aggregatorHandler;
- /** Master class */
- private MasterCompute masterCompute;
- /** IPC Client */
- private MasterClient masterClient;
- /** IPC Server */
- private MasterServer masterServer;
- /** Master info */
- private MasterInfo masterInfo;
- /** List of workers in current superstep */
- private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
- /** Limit locality information added to each InputSplit znode */
- private final int localityLimit = 5;
- /** Observers over master lifecycle. */
- private final MasterObserver[] observers;
-
- // Per-Superstep Metrics
- /** MasterCompute time */
- private GiraphTimer masterComputeTimer;
-
- /**
- * Constructor for setting up the master.
- *
- * @param serverPortList ZooKeeper server port list
- * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
- * @param context Mapper context
- * @param graphMapper Graph mapper
- */
- public BspServiceMaster(
- String serverPortList,
- int sessionMsecTimeout,
- Mapper<?, ?, ?, ?>.Context context,
- GraphMapper<I, V, E, M> graphMapper) {
- super(serverPortList, sessionMsecTimeout, context, graphMapper);
- workerWroteCheckpoint = new PredicateLock(context);
- registerBspEvent(workerWroteCheckpoint);
- superstepStateChanged = new PredicateLock(context);
- registerBspEvent(superstepStateChanged);
-
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf = getConfiguration();
-
- maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, -1);
- minWorkers = conf.getInt(GiraphConstants.MIN_WORKERS, -1);
- minPercentResponded = conf.getFloat(
- GiraphConstants.MIN_PERCENT_RESPONDED, 100.0f);
- eventWaitMsecs = conf.getEventWaitMsecs();
- maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs();
- partitionLongTailMinPrint = conf.getInt(
- GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT,
- GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
- masterGraphPartitioner =
- getGraphPartitionerFactory().createMasterGraphPartitioner();
- observers = getConfiguration().createMasterObservers();
-
- GiraphMetrics.get().addSuperstepResetObserver(this);
- GiraphStats.init(context);
- }
-
- @Override
- public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
- masterComputeTimer = new GiraphTimer(superstepMetrics,
- "master-compute-call", TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void setJobState(ApplicationState state,
- long applicationAttempt,
- long desiredSuperstep) {
- JSONObject jobState = new JSONObject();
- try {
- jobState.put(JSONOBJ_STATE_KEY, state.toString());
- jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt);
- jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep);
- } catch (JSONException e) {
- throw new RuntimeException("setJobState: Couldn't put " +
- state.toString());
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("setJobState: " + jobState.toString() + " on superstep " +
- getSuperstep());
- }
- try {
- getZkExt().createExt(masterJobStatePath + "/jobState",
- jobState.toString().getBytes(),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL,
- true);
- } catch (KeeperException.NodeExistsException e) {
- throw new IllegalStateException(
-          "setJobState: Impossible that " +
- masterJobStatePath + " already exists!", e);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "setJobState: Unknown KeeperException for " +
- masterJobStatePath, e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "setJobState: Unknown InterruptedException for " +
- masterJobStatePath, e);
- }
-
- if (state == ApplicationState.FAILED) {
- failJob();
- }
- }
-
- /**
- * Common method for generating vertex/edge input splits.
- *
- * @param inputFormat The vertex/edge input format
- * @param numWorkers Number of available workers
- * @param inputSplitType Type of input splits (for logging purposes)
- * @return List of input splits for the given format
- */
- private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
- int numWorkers,
- String inputSplitType) {
- String logPrefix = "generate" + inputSplitType + "InputSplits";
- List<InputSplit> splits;
- try {
- splits = inputFormat.getSplits(getContext(), numWorkers);
- } catch (IOException e) {
- throw new IllegalStateException(logPrefix + ": Got IOException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- logPrefix + ": Got InterruptedException", e);
- }
- float samplePercent =
- getConfiguration().getFloat(
- GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT,
- GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
- if (samplePercent !=
- GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
- int lastIndex = (int) (samplePercent * splits.size() / 100f);
- List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
- LOG.warn(logPrefix + ": Using sampling - Processing only " +
- sampleSplits.size() + " instead of " + splits.size() +
- " expected splits.");
- return sampleSplits;
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info(logPrefix + ": Got " + splits.size() +
- " input splits for " + numWorkers + " workers");
- }
- return splits;
- }
- }
-
- /**
- * When there is no salvaging this job, fail it.
- */
- private void failJob() {
- LOG.fatal("failJob: Killing job " + getJobId());
- try {
- @SuppressWarnings("deprecation")
- org.apache.hadoop.mapred.JobClient jobClient =
- new org.apache.hadoop.mapred.JobClient(
- (org.apache.hadoop.mapred.JobConf)
- getContext().getConfiguration());
- @SuppressWarnings("deprecation")
- JobID jobId = JobID.forName(getJobId());
- RunningJob job = jobClient.getJob(jobId);
- failureCleanup(null);
- job.killJob();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Parse the {@link WorkerInfo} objects from a ZooKeeper path
- * (and children).
- *
- * @param workerInfosPath Path where all the workers are children
- * @param watch Watch or not?
- * @return List of workers in that path
- */
- private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath,
- boolean watch) {
- List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>();
- List<String> workerInfoPathList;
- try {
- workerInfoPathList =
- getZkExt().getChildrenExt(workerInfosPath, watch, false, true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "getWorkers: Got KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
-          "getWorkers: Got InterruptedException", e);
- }
- for (String workerInfoPath : workerInfoPathList) {
- WorkerInfo workerInfo = new WorkerInfo();
- WritableUtils.readFieldsFromZnode(
- getZkExt(), workerInfoPath, true, null, workerInfo);
- workerInfoList.add(workerInfo);
- }
- return workerInfoList;
- }
-
- /**
- * Get the healthy and unhealthy {@link WorkerInfo} objects for
- * a superstep
- *
- * @param superstep superstep to check
- * @param healthyWorkerInfoList filled in with current data
- * @param unhealthyWorkerInfoList filled in with current data
- */
- private void getAllWorkerInfos(
- long superstep,
- List<WorkerInfo> healthyWorkerInfoList,
- List<WorkerInfo> unhealthyWorkerInfoList) {
- String healthyWorkerInfoPath =
- getWorkerInfoHealthyPath(getApplicationAttempt(), superstep);
- String unhealthyWorkerInfoPath =
- getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep);
-
- try {
- getZkExt().createOnceExt(healthyWorkerInfoPath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException e) {
- throw new IllegalStateException("getWorkers: KeeperException", e);
- } catch (InterruptedException e) {
-      throw new IllegalStateException("getWorkers: InterruptedException", e);
- }
-
- try {
- getZkExt().createOnceExt(unhealthyWorkerInfoPath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException e) {
- throw new IllegalStateException("getWorkers: KeeperException", e);
- } catch (InterruptedException e) {
-      throw new IllegalStateException("getWorkers: InterruptedException", e);
- }
-
- List<WorkerInfo> currentHealthyWorkerInfoList =
- getWorkerInfosFromPath(healthyWorkerInfoPath, true);
- List<WorkerInfo> currentUnhealthyWorkerInfoList =
- getWorkerInfosFromPath(unhealthyWorkerInfoPath, false);
-
- healthyWorkerInfoList.clear();
- if (currentHealthyWorkerInfoList != null) {
- for (WorkerInfo healthyWorkerInfo :
- currentHealthyWorkerInfoList) {
- healthyWorkerInfoList.add(healthyWorkerInfo);
- }
- }
-
- unhealthyWorkerInfoList.clear();
- if (currentUnhealthyWorkerInfoList != null) {
- for (WorkerInfo unhealthyWorkerInfo :
- currentUnhealthyWorkerInfoList) {
- unhealthyWorkerInfoList.add(unhealthyWorkerInfo);
- }
- }
- }
-
- /**
- * Check all the {@link WorkerInfo} objects to ensure that a minimum
- * number of good workers exists out of the total that have reported.
- *
- * @return List of of healthy workers such that the minimum has been
- * met, otherwise null
- */
- private List<WorkerInfo> checkWorkers() {
- boolean failJob = true;
- long failWorkerCheckMsecs =
- SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs;
- List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
- List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
- int totalResponses = -1;
- while (SystemTime.get().getMilliseconds() < failWorkerCheckMsecs) {
- getContext().progress();
- getAllWorkerInfos(
- getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
- totalResponses = healthyWorkerInfoList.size() +
- unhealthyWorkerInfoList.size();
- if ((totalResponses * 100.0f / maxWorkers) >=
- minPercentResponded) {
- failJob = false;
- break;
- }
- getContext().setStatus(getGraphMapper().getMapFunctions() + " " +
- "checkWorkers: Only found " +
- totalResponses +
- " responses of " + maxWorkers +
- " needed to start superstep " +
- getSuperstep());
- if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
- eventWaitMsecs)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("checkWorkers: Got event that health " +
- "registration changed, not using poll attempt");
- }
- getWorkerHealthRegistrationChangedEvent().reset();
- continue;
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("checkWorkers: Only found " + totalResponses +
- " responses of " + maxWorkers +
- " needed to start superstep " +
-            getSuperstep() + ". Reporting every " +
- eventWaitMsecs + " msecs, " +
- (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) +
- " more msecs left before giving up.");
- // Find the missing workers if there are only a few
- if ((maxWorkers - totalResponses) <=
- partitionLongTailMinPrint) {
- logMissingWorkersOnSuperstep(healthyWorkerInfoList,
- unhealthyWorkerInfoList);
- }
- }
- }
- if (failJob) {
- LOG.error("checkWorkers: Did not receive enough processes in " +
- "time (only " + totalResponses + " of " +
- minWorkers + " required) after waiting " + maxSuperstepWaitMsecs +
-          " msecs. This occurs if you do not have enough map tasks " +
- "available simultaneously on your Hadoop instance to fulfill " +
- "the number of requested workers.");
- return null;
- }
-
- if (healthyWorkerInfoList.size() < minWorkers) {
- LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
- " available when " + minWorkers + " are required.");
- logMissingWorkersOnSuperstep(healthyWorkerInfoList,
- unhealthyWorkerInfoList);
- return null;
- }
-
- getContext().setStatus(getGraphMapper().getMapFunctions() + " " +
- "checkWorkers: Done - Found " + totalResponses +
- " responses of " + maxWorkers + " needed to start superstep " +
- getSuperstep());
-
- return healthyWorkerInfoList;
- }
-
- /**
- * Log info level of the missing workers on the superstep
- *
- * @param healthyWorkerInfoList Healthy worker list
- * @param unhealthyWorkerInfoList Unhealthy worker list
- */
- private void logMissingWorkersOnSuperstep(
- List<WorkerInfo> healthyWorkerInfoList,
- List<WorkerInfo> unhealthyWorkerInfoList) {
- if (LOG.isInfoEnabled()) {
- Set<Integer> partitionSet = new TreeSet<Integer>();
- for (WorkerInfo workerInfo : healthyWorkerInfoList) {
- partitionSet.add(workerInfo.getTaskId());
- }
- for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
- partitionSet.add(workerInfo.getTaskId());
- }
- for (int i = 1; i <= maxWorkers; ++i) {
- if (partitionSet.contains(Integer.valueOf(i))) {
- continue;
- } else if (i == getTaskPartition()) {
- continue;
- } else {
- LOG.info("logMissingWorkersOnSuperstep: No response from " +
- "partition " + i + " (could be master)");
- }
- }
- }
- }
-
- /**
- * Common method for creating vertex/edge input splits.
- *
- * @param inputFormat The vertex/edge input format
- * @param inputSplitPaths ZooKeeper input split paths
- * @param inputSplitType Type of input split (for logging purposes)
- * @return Number of splits. Returns -1 on failure to create
- * valid input splits.
- */
- private int createInputSplits(GiraphInputFormat inputFormat,
- InputSplitPaths inputSplitPaths,
- String inputSplitType) {
- String logPrefix = "create" + inputSplitType + "InputSplits";
- // Only the 'master' should be doing this. Wait until the number of
- // processes that have reported health exceeds the minimum percentage.
- // If the minimum percentage is not met, fail the job. Otherwise
- // generate the input splits
- String inputSplitsPath = inputSplitPaths.getPath();
- try {
- if (getZkExt().exists(inputSplitsPath, false) != null) {
- LOG.info(inputSplitsPath + " already exists, no need to create");
- return Integer.parseInt(
- new String(getZkExt().getData(inputSplitsPath, false, null)));
- }
- } catch (KeeperException.NoNodeException e) {
- if (LOG.isInfoEnabled()) {
- LOG.info(logPrefix + ": Need to create the input splits at " +
- inputSplitsPath);
- }
- } catch (KeeperException e) {
- throw new IllegalStateException(logPrefix + ": KeeperException", e);
- } catch (InterruptedException e) {
-      throw new IllegalStateException(logPrefix + ": InterruptedException", e);
- }
-
- // When creating znodes, in case the master has already run, resume
- // where it left off.
- List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
- if (healthyWorkerInfoList == null) {
- setJobState(ApplicationState.FAILED, -1, -1);
- return -1;
- }
-
- // Note that the input splits may only be a sample if
- // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
- List<InputSplit> splitList = generateInputSplits(inputFormat,
- healthyWorkerInfoList.size(), inputSplitType);
-
- if (splitList.isEmpty()) {
- LOG.fatal(logPrefix + ": Failing job due to 0 input splits, " +
- "check input of " + inputFormat.getClass().getName() + "!");
- getContext().setStatus("Failing job due to 0 input splits, " +
- "check input of " + inputFormat.getClass().getName() + "!");
- failJob();
- }
- if (healthyWorkerInfoList.size() > splitList.size()) {
- LOG.warn(logPrefix + ": Number of inputSplits=" +
- splitList.size() + " < " +
- healthyWorkerInfoList.size() +
- "=number of healthy processes, " +
- "some workers will be not used");
- }
-
- // Write input splits to zookeeper in parallel
- int inputSplitThreadCount = getConfiguration().getInt(
- INPUT_SPLIT_THREAD_COUNT,
- DEFAULT_INPUT_SPLIT_THREAD_COUNT);
- if (LOG.isInfoEnabled()) {
- LOG.info(logPrefix + ": Starting to write input split data " +
- "to zookeeper with " + inputSplitThreadCount + " threads");
- }
- ExecutorService taskExecutor =
- Executors.newFixedThreadPool(inputSplitThreadCount);
- for (int i = 0; i < splitList.size(); ++i) {
- InputSplit inputSplit = splitList.get(i);
- taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i));
- }
- taskExecutor.shutdown();
- ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
- if (LOG.isInfoEnabled()) {
- LOG.info(logPrefix + ": Done writing input split data to zookeeper");
- }
-
- // Let workers know they can start trying to load the input splits
- try {
- getZkExt().createExt(inputSplitPaths.getAllReadyPath(),
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- false);
- } catch (KeeperException.NodeExistsException e) {
- LOG.info(logPrefix + ": Node " +
- inputSplitPaths.getAllReadyPath() + " already exists.");
- } catch (KeeperException e) {
- throw new IllegalStateException(logPrefix + ": KeeperException", e);
- } catch (InterruptedException e) {
-      throw new IllegalStateException(logPrefix + ": InterruptedException", e);
- }
-
- return splitList.size();
- }
-
- @Override
- public int createVertexInputSplits() {
- // Short-circuit if there is no vertex input format
- if (!getConfiguration().hasVertexInputFormat()) {
- return 0;
- }
- VertexInputFormat<I, V, E, M> vertexInputFormat =
- getConfiguration().createVertexInputFormat();
- return createInputSplits(vertexInputFormat, vertexInputSplitsPaths,
- "Vertex");
- }
-
- @Override
- public int createEdgeInputSplits() {
- // Short-circuit if there is no edge input format
- if (!getConfiguration().hasEdgeInputFormat()) {
- return 0;
- }
- EdgeInputFormat<I, E> edgeInputFormat =
- getConfiguration().createEdgeInputFormat();
- return createInputSplits(edgeInputFormat, edgeInputSplitsPaths,
- "Edge");
- }
-
- @Override
- public List<WorkerInfo> getWorkerInfoList() {
- return chosenWorkerInfoList;
- }
-
- @Override
- public MasterAggregatorHandler getAggregatorHandler() {
- return aggregatorHandler;
- }
-
- /**
- * Read the finalized checkpoint file and associated metadata files for the
- * checkpoint. Modifies the {@link PartitionOwner} objects to get the
- * checkpoint prefixes. It is an optimization to prevent all workers from
- * searching all the files. Also read in the aggregator data from the
- * finalized checkpoint file and setting it.
- *
- * @param superstep Checkpoint set to examine.
- * @param partitionOwners Partition owners to modify with checkpoint
- * prefixes
- * @throws IOException
- * @throws InterruptedException
- * @throws KeeperException
- */
- private void prepareCheckpointRestart(
- long superstep,
- Collection<PartitionOwner> partitionOwners)
- throws IOException, KeeperException, InterruptedException {
- FileSystem fs = getFs();
- List<Path> validMetadataPathList = new ArrayList<Path>();
- String finalizedCheckpointPath =
- getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
- DataInputStream finalizedStream =
- fs.open(new Path(finalizedCheckpointPath));
- GlobalStats globalStats = new GlobalStats();
- globalStats.readFields(finalizedStream);
- updateCounters(globalStats);
- int prefixFileCount = finalizedStream.readInt();
- for (int i = 0; i < prefixFileCount; ++i) {
- String metadataFilePath =
- finalizedStream.readUTF() + CHECKPOINT_METADATA_POSTFIX;
- validMetadataPathList.add(new Path(metadataFilePath));
- }
-
- aggregatorHandler.readFields(finalizedStream);
- masterCompute.readFields(finalizedStream);
- finalizedStream.close();
-
- Map<Integer, PartitionOwner> idOwnerMap =
- new HashMap<Integer, PartitionOwner>();
- for (PartitionOwner partitionOwner : partitionOwners) {
- if (idOwnerMap.put(partitionOwner.getPartitionId(),
- partitionOwner) != null) {
- throw new IllegalStateException(
- "prepareCheckpointRestart: Duplicate partition " +
- partitionOwner);
- }
- }
- // Reading the metadata files. Simply assign each partition owner
- // the correct file prefix based on the partition id.
- for (Path metadataPath : validMetadataPathList) {
- String checkpointFilePrefix = metadataPath.toString();
- checkpointFilePrefix =
- checkpointFilePrefix.substring(
- 0,
- checkpointFilePrefix.length() -
- CHECKPOINT_METADATA_POSTFIX.length());
- DataInputStream metadataStream = fs.open(metadataPath);
- long partitions = metadataStream.readInt();
- for (long i = 0; i < partitions; ++i) {
- long dataPos = metadataStream.readLong();
- int partitionId = metadataStream.readInt();
- PartitionOwner partitionOwner = idOwnerMap.get(partitionId);
- if (LOG.isInfoEnabled()) {
- LOG.info("prepareSuperstepRestart: File " + metadataPath +
- " with position " + dataPos +
- ", partition id = " + partitionId +
- " assigned to " + partitionOwner);
- }
- partitionOwner.setCheckpointFilesPrefix(checkpointFilePrefix);
- }
- metadataStream.close();
- }
- }
-
- @Override
- public void setup() {
- // Might have to manually load a checkpoint.
- // In that case, the input splits are not set, they will be faked by
- // the checkpoint files. Each checkpoint file will be an input split
- // and the input split
-
- if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
- GiraphStats.getInstance().getSuperstepCounter().
- setValue(getRestartedSuperstep());
- }
- for (MasterObserver observer : observers) {
- observer.preApplication();
- getContext().progress();
- }
- }
-
- @Override
- public boolean becomeMaster() {
- // Create my bid to become the master, then try to become the worker
- // or return false.
- String myBid = null;
- try {
- myBid =
- getZkExt().createExt(masterElectionPath +
- "/" + getHostnamePartitionId(),
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL_SEQUENTIAL,
- true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "becomeMaster: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
-          "becomeMaster: InterruptedException", e);
- }
- while (true) {
- JSONObject jobState = getJobState();
- try {
- if ((jobState != null) &&
- ApplicationState.valueOf(
- jobState.getString(JSONOBJ_STATE_KEY)) ==
- ApplicationState.FINISHED) {
- LOG.info("becomeMaster: Job is finished, " +
- "give up trying to be the master!");
- isMaster = false;
- return isMaster;
- }
- } catch (JSONException e) {
- throw new IllegalStateException(
- "becomeMaster: Couldn't get state from " + jobState, e);
- }
- try {
- List<String> masterChildArr =
- getZkExt().getChildrenExt(
- masterElectionPath, true, true, true);
- if (LOG.isInfoEnabled()) {
- LOG.info("becomeMaster: First child is '" +
- masterChildArr.get(0) + "' and my bid is '" +
- myBid + "'");
- }
- if (masterChildArr.get(0).equals(myBid)) {
- GiraphStats.getInstance().getCurrentMasterTaskPartition().
- setValue(getTaskPartition());
- masterCompute = getConfiguration().createMasterCompute();
- aggregatorHandler = new MasterAggregatorHandler(getConfiguration(),
- getContext());
- aggregatorHandler.initialize(this);
-
- masterInfo = new MasterInfo();
- masterServer =
- new NettyMasterServer(getConfiguration(), this, getContext());
- masterInfo.setInetSocketAddress(masterServer.getMyAddress());
- masterInfo.setTaskId(getTaskPartition());
- masterClient =
- new NettyMasterClient(getContext(), getConfiguration(), this);
-
- if (LOG.isInfoEnabled()) {
- LOG.info("becomeMaster: I am now the master!");
- }
- isMaster = true;
- return isMaster;
- }
- LOG.info("becomeMaster: Waiting to become the master...");
- getMasterElectionChildrenChangedEvent().waitForever();
- getMasterElectionChildrenChangedEvent().reset();
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "becomeMaster: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
-            "becomeMaster: InterruptedException", e);
- }
- }
- }
-
- @Override
- public MasterInfo getMasterInfo() {
- return masterInfo;
- }
-
- /**
- * Collect and aggregate the worker statistics for a particular superstep.
- *
- * @param superstep Superstep to aggregate on
- * @return Global statistics aggregated on all worker statistics
- */
- private GlobalStats aggregateWorkerStats(long superstep) {
- ImmutableClassesGiraphConfiguration conf = getConfiguration();
-
- // Concrete stats class comes from the partitioner so the per-partition
- // stats serialized by workers can be deserialized into the right type.
- Class<? extends PartitionStats> partitionStatsClass =
- masterGraphPartitioner.createPartitionStats().getClass();
- GlobalStats globalStats = new GlobalStats();
- // Get the stats from the all the worker selected nodes
- String workerFinishedPath =
- getWorkerFinishedPath(getApplicationAttempt(), superstep);
- List<String> workerFinishedPathList = null;
- try {
- workerFinishedPathList =
- getZkExt().getChildrenExt(
- workerFinishedPath, false, false, true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "aggregateWorkerStats: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "aggregateWorkerStats: InterruptedException", e);
- }
-
- AggregatedMetrics aggregatedMetrics = new AggregatedMetrics();
-
- // Rebuild the global partition stats list from scratch each superstep.
- allPartitionStatsList.clear();
- for (String finishedPath : workerFinishedPathList) {
- String hostnamePartitionId = FilenameUtils.getName(finishedPath);
- JSONObject workerFinishedInfoObj = null;
- try {
- // Each worker publishes a JSON blob containing Base64-encoded
- // partition stats, a message count, and (optionally) metrics.
- byte [] zkData =
- getZkExt().getData(finishedPath, false, null);
- workerFinishedInfoObj = new JSONObject(new String(zkData));
- List<PartitionStats> statsList =
- WritableUtils.readListFieldsFromByteArray(
- Base64.decode(workerFinishedInfoObj.getString(
- JSONOBJ_PARTITION_STATS_KEY)),
- partitionStatsClass,
- conf);
- for (PartitionStats partitionStats : statsList) {
- globalStats.addPartitionStats(partitionStats);
- allPartitionStatsList.add(partitionStats);
- }
- globalStats.addMessageCount(
- workerFinishedInfoObj.getLong(
- JSONOBJ_NUM_MESSAGES_KEY));
- if (conf.metricsEnabled() &&
- workerFinishedInfoObj.has(JSONOBJ_METRICS_KEY)) {
- WorkerSuperstepMetrics workerMetrics = new WorkerSuperstepMetrics();
- WritableUtils.readFieldsFromByteArray(
- Base64.decode(
- workerFinishedInfoObj.getString(
- JSONOBJ_METRICS_KEY)),
- workerMetrics);
- aggregatedMetrics.add(workerMetrics, hostnamePartitionId);
- }
- } catch (JSONException e) {
- throw new IllegalStateException(
- "aggregateWorkerStats: JSONException", e);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "aggregateWorkerStats: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "aggregateWorkerStats: InterruptedException", e);
- }
- }
-
- if (conf.metricsEnabled()) {
- aggregatedMetrics.print(superstep);
- }
-
- if (LOG.isInfoEnabled()) {
- // NOTE(review): aggregates for the "superstep" parameter but logs
- // getSuperstep(); identical for the current caller — confirm if
- // this is ever invoked with a different superstep.
- LOG.info("aggregateWorkerStats: Aggregation found " + globalStats +
- " on superstep = " + getSuperstep());
- }
- return globalStats;
- }
-
- /**
- * Finalize the checkpoint file prefixes by taking the chosen workers and
- * writing them to a finalized file. Also write out the master
- * aggregated aggregator array from the previous superstep.
- *
- * @param superstep superstep to finalize
- * @param chosenWorkerInfoList list of chosen workers that will be finalized
- * @throws IOException if the finalized file cannot be written
- * @throws InterruptedException if ZooKeeper access is interrupted
- * @throws KeeperException on other ZooKeeper failures
- */
- private void finalizeCheckpoint(long superstep,
- List<WorkerInfo> chosenWorkerInfoList)
- throws IOException, KeeperException, InterruptedException {
- Path finalizedCheckpointPath =
- new Path(getCheckpointBasePath(superstep) +
- CHECKPOINT_FINALIZED_POSTFIX);
- // Best-effort removal of a stale finalized file from a prior attempt.
- try {
- getFs().delete(finalizedCheckpointPath, false);
- } catch (IOException e) {
- // Fixed: this branch means the delete FAILED; the previous message
- // claimed the old file was "Removed" and used a wrong method name.
- LOG.warn("finalizeCheckpoint: Failed to remove old file " +
- finalizedCheckpointPath, e);
- }
-
- // Format:
- // <global statistics>
- // <number of files>
- // <used file prefix 0><used file prefix 1>...
- // <aggregator data>
- // <masterCompute data>
- FSDataOutputStream finalizedOutputStream =
- getFs().create(finalizedCheckpointPath);
-
- // Global statistics come from the superstep-finished znode of the
- // superstep that was actually checkpointed (superstep - 1).
- String superstepFinishedNode =
- getSuperstepFinishedPath(getApplicationAttempt(), superstep - 1);
- finalizedOutputStream.write(
- getZkExt().getData(superstepFinishedNode, false, null));
-
- finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
- for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
- String chosenWorkerInfoPrefix =
- getCheckpointBasePath(superstep) + "." +
- chosenWorkerInfo.getHostnameId();
- finalizedOutputStream.writeUTF(chosenWorkerInfoPrefix);
- }
- aggregatorHandler.write(finalizedOutputStream);
- masterCompute.write(finalizedOutputStream);
- finalizedOutputStream.close();
- lastCheckpointedSuperstep = superstep;
- GiraphStats.getInstance().
- getLastCheckpointedSuperstep().setValue(superstep);
- }
-
- /**
- * Assign the partitions for this superstep. If there are changes,
- * the workers will know how to do the exchange. If this was a restarted
- * superstep, then make sure to provide information on where to find the
- * checkpoint file.
- *
- * @param allPartitionStatsList All partition stats
- * @param chosenWorkerInfoList All the chosen worker infos
- * @param masterGraphPartitioner Master graph partitioner
- */
- private void assignPartitionOwners(
- List<PartitionStats> allPartitionStatsList,
- List<WorkerInfo> chosenWorkerInfoList,
- MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner) {
- Collection<PartitionOwner> partitionOwners;
- // Fresh assignment on the input superstep or a checkpoint restart;
- // otherwise let the partitioner rebalance using the latest stats.
- if (getSuperstep() == INPUT_SUPERSTEP ||
- getSuperstep() == getRestartedSuperstep()) {
- partitionOwners =
- masterGraphPartitioner.createInitialPartitionOwners(
- chosenWorkerInfoList, maxWorkers);
- if (partitionOwners.isEmpty()) {
- throw new IllegalStateException(
- "assignAndExchangePartitions: No partition owners set");
- }
- } else {
- partitionOwners =
- masterGraphPartitioner.generateChangedPartitionOwners(
- allPartitionStatsList,
- chosenWorkerInfoList,
- maxWorkers,
- getSuperstep());
-
- PartitionUtils.analyzePartitionStats(partitionOwners,
- allPartitionStatsList);
- }
- checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners());
-
- // If restarted, prepare the checkpoint restart
- if (getRestartedSuperstep() == getSuperstep()) {
- try {
- prepareCheckpointRestart(getSuperstep(), partitionOwners);
- } catch (IOException e) {
- throw new IllegalStateException(
- "assignPartitionOwners: IOException on preparing", e);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "assignPartitionOwners: KeeperException on preparing", e);
- } catch (InterruptedException e) {
- // Fixed: "InteruptedException" typo in the message.
- throw new IllegalStateException(
- "assignPartitionOwners: InterruptedException on preparing",
- e);
- }
- }
-
- // There will be some exchange of partitions
- if (!partitionOwners.isEmpty()) {
- String vertexExchangePath =
- getPartitionExchangePath(getApplicationAttempt(),
- getSuperstep());
- try {
- getZkExt().createOnceExt(vertexExchangePath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException e) {
- // Fixed: chain the caught exception as the cause (was dropped).
- throw new IllegalStateException(
- "assignPartitionOwners: KeeperException creating " +
- vertexExchangePath, e);
- } catch (InterruptedException e) {
- // Fixed: chain the caught exception as the cause (was dropped).
- throw new IllegalStateException(
- "assignPartitionOwners: InterruptedException creating " +
- vertexExchangePath, e);
- }
- }
-
- // Workers are waiting for these assignments
- AddressesAndPartitionsWritable addressesAndPartitions =
- new AddressesAndPartitionsWritable(masterInfo, chosenWorkerInfoList,
- partitionOwners);
- String addressesAndPartitionsPath =
- getAddressesAndPartitionsPath(getApplicationAttempt(),
- getSuperstep());
- WritableUtils.writeToZnode(
- getZkExt(),
- addressesAndPartitionsPath,
- -1,
- addressesAndPartitions);
- }
-
- /**
- * Check if partition ids are valid
- *
- * @param partitionOwners List of partition ids for current superstep
- */
- private void checkPartitions(Collection<PartitionOwner> partitionOwners) {
- for (PartitionOwner partitionOwner : partitionOwners) {
- int partitionId = partitionOwner.getPartitionId();
- if (partitionId < 0 || partitionId >= partitionOwners.size()) {
- throw new IllegalStateException("checkPartitions: " +
- "Invalid partition id " + partitionId +
- " - partition ids must be values from 0 to (numPartitions - 1)");
- }
- }
- }
-
- /**
- * Check whether the workers chosen for this superstep are still alive.
- * Every missing worker is logged, not just the first one found.
- *
- * @param chosenWorkerInfoHealthPath Path to the healthy workers in ZooKeeper
- * @param chosenWorkerInfoList List of the healthy workers
- * @return true if they are all alive, false otherwise.
- * @throws InterruptedException
- * @throws KeeperException
- */
- private boolean superstepChosenWorkerAlive(
- String chosenWorkerInfoHealthPath,
- List<WorkerInfo> chosenWorkerInfoList)
- throws KeeperException, InterruptedException {
- // Snapshot of the workers currently registered as healthy.
- Set<WorkerInfo> healthyNow = new HashSet<WorkerInfo>(
- getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false));
- boolean allAlive = true;
- for (WorkerInfo chosen : chosenWorkerInfoList) {
- if (healthyNow.contains(chosen)) {
- continue;
- }
- allAlive = false;
- LOG.error("superstepChosenWorkerAlive: Missing chosen " +
- "worker " + chosen +
- " on superstep " + getSuperstep());
- }
- return allAlive;
- }
-
- @Override
- public void restartFromCheckpoint(long checkpoint) {
- // Process:
- // 1. Remove all old input split data
- // 2. Increase the application attempt and set to the correct checkpoint
- // 3. Send command to all workers to restart their tasks
- try {
- getZkExt().deleteExt(vertexInputSplitsPaths.getPath(), -1,
- true);
- getZkExt().deleteExt(edgeInputSplitsPaths.getPath(), -1,
- true);
- } catch (InterruptedException e) {
- // NOTE(review): sibling methods wrap these as IllegalStateException;
- // RuntimeException here is inconsistent but caller-compatible.
- throw new RuntimeException(
- "restartFromCheckpoint: InterruptedException", e);
- } catch (KeeperException e) {
- throw new RuntimeException(
- "restartFromCheckpoint: KeeperException", e);
- }
- // Bumping the attempt invalidates znode paths from the failed attempt;
- // workers observe the new state via the job state znode.
- setApplicationAttempt(getApplicationAttempt() + 1);
- setCachedSuperstep(checkpoint);
- setRestartedSuperstep(checkpoint);
- setJobState(ApplicationState.START_SUPERSTEP,
- getApplicationAttempt(),
- checkpoint);
- }
-
- /**
- * Only get the finalized checkpoint files
- */
- public static class FinalizedCheckpointPathFilter implements PathFilter {
- @Override
- public boolean accept(Path path) {
- // A checkpoint is only usable once its finalized marker file
- // exists, so match on that suffix.
- return path.getName().endsWith(BspService.CHECKPOINT_FINALIZED_POSTFIX);
- }
- }
-
- @Override
- public long getLastGoodCheckpoint() throws IOException {
- // Find the last good checkpoint if none have been written to the
- // knowledge of this master
- if (lastCheckpointedSuperstep == -1) {
- try {
- FileStatus[] fileStatusArray =
- getFs().listStatus(new Path(checkpointBasePath),
- new FinalizedCheckpointPathFilter());
- if (fileStatusArray == null) {
- return -1;
- }
- // NOTE(review): FileStatus sorts lexicographically by path, so
- // the "latest" element is only correct if superstep numbers in
- // checkpoint names compare correctly as strings (e.g. padded or
- // single-digit) — confirm the checkpoint naming scheme.
- Arrays.sort(fileStatusArray);
- lastCheckpointedSuperstep = getCheckpoint(
- fileStatusArray[fileStatusArray.length - 1].getPath());
- if (LOG.isInfoEnabled()) {
- LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
- lastCheckpointedSuperstep + " from " +
- fileStatusArray[fileStatusArray.length - 1].
- getPath().toString());
- }
- } catch (IOException e) {
- // No usable checkpoint at all: the job cannot be restarted.
- LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " +
- "found, killing the job.", e);
- failJob();
- }
- }
-
- return lastCheckpointedSuperstep;
- }
-
- /**
- * Wait for a set of workers to signal that they are done with the
- * barrier.
- *
- * @param finishedWorkerPath Path to where the workers will register their
- * hostname and id
- * @param workerInfoList List of the workers to wait for
- * @param event Event to wait on for a chance to be done.
- * @return True if barrier was successful, false if there was a worker
- * failure
- */
- private boolean barrierOnWorkerList(String finishedWorkerPath,
- List<WorkerInfo> workerInfoList,
- BspEvent event) {
- // Make sure the barrier parent znode exists before watching it.
- try {
- getZkExt().createOnceExt(finishedWorkerPath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "barrierOnWorkerList: KeeperException - Couldn't create " +
- finishedWorkerPath, e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "barrierOnWorkerList: InterruptedException - Couldn't create " +
- finishedWorkerPath, e);
- }
- List<String> hostnameIdList =
- new ArrayList<String>(workerInfoList.size());
- for (WorkerInfo workerInfo : workerInfoList) {
- hostnameIdList.add(workerInfo.getHostnameId());
- }
- String workerInfoHealthyPath =
- getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep());
- List<String> finishedHostnameIdList;
- long nextInfoMillis = System.currentTimeMillis();
- final int defaultTaskTimeoutMsec = 10 * 60 * 1000; // from TaskTracker
- final int taskTimeoutMsec = getContext().getConfiguration().getInt(
- "mapred.task.timeout", defaultTaskTimeoutMsec);
- // Poll (with a watch-triggered event) until every expected worker
- // has registered, or a chosen worker dies.
- while (true) {
- try {
- finishedHostnameIdList =
- getZkExt().getChildrenExt(finishedWorkerPath,
- true,
- false,
- false);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "barrierOnWorkerList: KeeperException - Couldn't get " +
- "children of " + finishedWorkerPath, e);
- } catch (InterruptedException e) {
- // Fixed: message said "IllegalException" for an
- // InterruptedException.
- throw new IllegalStateException(
- "barrierOnWorkerList: InterruptedException - Couldn't get " +
- "children of " + finishedWorkerPath, e);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("barrierOnWorkerList: Got finished worker list = " +
- finishedHostnameIdList + ", size = " +
- finishedHostnameIdList.size() +
- ", worker list = " +
- workerInfoList + ", size = " +
- workerInfoList.size() +
- " from " + finishedWorkerPath);
- }
-
- // Periodic (>= 30s apart) progress log, listing the stragglers
- // when few enough to print.
- if (LOG.isInfoEnabled() &&
- (System.currentTimeMillis() > nextInfoMillis)) {
- nextInfoMillis = System.currentTimeMillis() + 30000;
- LOG.info("barrierOnWorkerList: " +
- finishedHostnameIdList.size() +
- " out of " + workerInfoList.size() +
- " workers finished on superstep " +
- getSuperstep() + " on path " + finishedWorkerPath);
- if (workerInfoList.size() - finishedHostnameIdList.size() <
- MAX_PRINTABLE_REMAINING_WORKERS) {
- Set<String> remainingWorkers = Sets.newHashSet(hostnameIdList);
- remainingWorkers.removeAll(finishedHostnameIdList);
- LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers);
- }
- }
- getContext().setStatus(getGraphMapper().getMapFunctions() + " - " +
- finishedHostnameIdList.size() +
- " finished out of " +
- workerInfoList.size() +
- " on superstep " + getSuperstep());
- if (finishedHostnameIdList.containsAll(hostnameIdList)) {
- break;
- }
-
- // Wait for a signal or timeout
- event.waitMsecs(taskTimeoutMsec / 2);
- event.reset();
- getContext().progress();
-
- // Did a worker die?
- try {
- if ((getSuperstep() > 0) &&
- !superstepChosenWorkerAlive(
- workerInfoHealthyPath,
- workerInfoList)) {
- return false;
- }
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "barrierOnWorkerList: KeeperException - " +
- "Couldn't get " + workerInfoHealthyPath, e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "barrierOnWorkerList: InterruptedException - " +
- "Couldn't get " + workerInfoHealthyPath, e);
- }
- }
-
- return true;
- }
-
- /**
- * Clean up old superstep data from Zookeeper
- *
- * @param removeableSuperstep Superstep to clean up
- * @throws InterruptedException
- */
- private void cleanUpOldSuperstep(long removeableSuperstep) throws
- InterruptedException {
- // Skipped entirely when the user asked to keep ZooKeeper data or
- // there is no prior superstep to remove.
- if (!(getConfiguration().getBoolean(
- GiraphConstants.KEEP_ZOOKEEPER_DATA,
- GiraphConstants.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
- (removeableSuperstep >= 0)) {
- String oldSuperstepPath =
- getSuperstepPath(getApplicationAttempt()) + "/" +
- removeableSuperstep;
- try {
- if (LOG.isInfoEnabled()) {
- // NOTE(review): log prefixes below say "coordinateSuperstep" /
- // "coordinateBarrier" — stale names from before this logic was
- // extracted into cleanUpOldSuperstep.
- LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
- oldSuperstepPath);
- }
- getZkExt().deleteExt(oldSuperstepPath,
- -1,
- true);
- } catch (KeeperException.NoNodeException e) {
- // Another attempt already removed it; nothing to do.
- LOG.warn("coordinateBarrier: Already cleaned up " +
- oldSuperstepPath);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "coordinateSuperstep: KeeperException on " +
- "finalizing checkpoint", e);
- }
- }
- }
-
- /**
- * Coordinate the exchange of vertex/edge input splits among workers.
- *
- * @param inputSplitPaths Input split paths
- * @param inputSplitEvents Input split events
- * @param inputSplitsType Type of input splits (for logging purposes)
- */
- private void coordinateInputSplits(InputSplitPaths inputSplitPaths,
- InputSplitEvents inputSplitEvents,
- String inputSplitsType) {
- // Coordinate the workers finishing sending their vertices/edges to the
- // correct workers and signal when everything is done.
- String logPrefix = "coordinate" + inputSplitsType + "InputSplits";
- if (!barrierOnWorkerList(inputSplitPaths.getDonePath(),
- chosenWorkerInfoList,
- inputSplitEvents.getDoneStateChanged())) {
- throw new IllegalStateException(logPrefix + ": Worker failed during " +
- "input split (currently not supported)");
- }
- // Publish the "all done" marker so workers can move past input loading.
- try {
- getZkExt().createExt(inputSplitPaths.getAllDonePath(),
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- false);
- } catch (KeeperException.NodeExistsException e) {
- LOG.info("coordinateInputSplits: Node " +
- inputSplitPaths.getAllDonePath() + " already exists.");
- } catch (KeeperException e) {
- throw new IllegalStateException(logPrefix + ": KeeperException", e);
- } catch (InterruptedException e) {
- // Fixed: message said "IllegalStateException" for an
- // InterruptedException.
- throw new IllegalStateException(logPrefix + ": InterruptedException", e);
- }
- }
-
- @Override
- public SuperstepState coordinateSuperstep() throws
- KeeperException, InterruptedException {
- // 1. Get chosen workers and set up watches on them.
- // 2. Assign partitions to the workers
- // (possibly reloading from a superstep)
- // 3. Wait for all workers to complete
- // 4. Collect and process aggregators
- // 5. Create superstep finished node
- // 6. If the checkpoint frequency is met, finalize the checkpoint
-
- for (MasterObserver observer : observers) {
- observer.preSuperstep();
- getContext().progress();
- }
-
- chosenWorkerInfoList = checkWorkers();
- if (chosenWorkerInfoList == null) {
- // NOTE(review): execution continues after setJobState(FAILED) and
- // chosenWorkerInfoList.size() below would NPE — presumably
- // setJobState terminates the job first; confirm.
- LOG.fatal("coordinateSuperstep: Not enough healthy workers for " +
- "superstep " + getSuperstep());
- setJobState(ApplicationState.FAILED, -1, -1);
- } else {
- // Register watches on each chosen worker's healthy znode so a
- // death triggers processEvent().
- for (WorkerInfo workerInfo : chosenWorkerInfoList) {
- String workerInfoHealthyPath =
- getWorkerInfoHealthyPath(getApplicationAttempt(),
- getSuperstep()) + "/" +
- workerInfo.getHostnameId();
- if (getZkExt().exists(workerInfoHealthyPath, true) == null) {
- // NOTE(review): message says "failing superstep" but this
- // branch only warns — the actual failure is detected later by
- // the barrier's liveness check.
- LOG.warn("coordinateSuperstep: Chosen worker " +
- workerInfoHealthyPath +
- " is no longer valid, failing superstep");
- }
- }
- }
-
- masterClient.openConnections();
-
- GiraphStats.getInstance().
- getCurrentWorkers().setValue(chosenWorkerInfoList.size());
- assignPartitionOwners(allPartitionStatsList,
- chosenWorkerInfoList,
- masterGraphPartitioner);
-
- // We need to finalize aggregators from previous superstep (send them to
- // worker owners) after new worker assignments
- if (getSuperstep() >= 0) {
- aggregatorHandler.finishSuperstep(masterClient);
- }
-
- // Finalize the valid checkpoint file prefixes and possibly
- // the aggregators.
- if (checkpointFrequencyMet(getSuperstep())) {
- String workerWroteCheckpointPath =
- getWorkerWroteCheckpointPath(getApplicationAttempt(),
- getSuperstep());
- // first wait for all the workers to write their checkpoint data
- if (!barrierOnWorkerList(workerWroteCheckpointPath,
- chosenWorkerInfoList,
- getWorkerWroteCheckpointEvent())) {
- return SuperstepState.WORKER_FAILURE;
- }
- try {
- finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
- } catch (IOException e) {
- throw new IllegalStateException(
- "coordinateSuperstep: IOException on finalizing checkpoint",
- e);
- }
- }
-
- // Input superstep only: coordinate the loading/exchange of input data.
- if (getSuperstep() == INPUT_SUPERSTEP) {
- if (getConfiguration().hasVertexInputFormat()) {
- coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
- "Vertex");
- }
- if (getConfiguration().hasEdgeInputFormat()) {
- coordinateInputSplits(edgeInputSplitsPaths, edgeInputSplitsEvents,
- "Edge");
- }
- }
-
- String finishedWorkerPath =
- getWorkerFinishedPath(getApplicationAttempt(), getSuperstep());
- if (!barrierOnWorkerList(finishedWorkerPath,
- chosenWorkerInfoList,
- getSuperstepStateChangedEvent())) {
- return SuperstepState.WORKER_FAILURE;
- }
-
- // Collect aggregator values, then run the master.compute() and
- // finally save the aggregator values
- aggregatorHandler.prepareSuperstep(masterClient);
- runMasterCompute(getSuperstep());
-
- // If the master is halted or all the vertices voted to halt and there
- // are no more messages in the system, stop the computation
- GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
- if (masterCompute.isHalted() ||
- (globalStats.getFinishedVertexCount() ==
- globalStats.getVertexCount() &&
- globalStats.getMessageCount() == 0)) {
- globalStats.setHaltComputation(true);
- }
-
- // Let everyone know the aggregated application state through the
- // superstep finishing znode.
- String superstepFinishedNode =
- getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
- WritableUtils.writeToZnode(
- getZkExt(), superstepFinishedNode, -1, globalStats);
- updateCounters(globalStats);
-
- cleanUpOldSuperstep(getSuperstep() - 1);
- incrCachedSuperstep();
- // Counter starts at zero, so no need to increment
- if (getSuperstep() > 0) {
- GiraphStats.getInstance().getSuperstepCounter().increment();
- }
- SuperstepState superstepState;
- if (globalStats.getHaltComputation()) {
- superstepState = SuperstepState.ALL_SUPERSTEPS_DONE;
- } else {
- superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
- }
- aggregatorHandler.writeAggregators(getSuperstep(), superstepState);
-
- return superstepState;
- }
-
- /**
- * Run the master.compute() class
- *
- * @param superstep superstep for which to run the master.compute()
- */
- private void runMasterCompute(long superstep) {
- // The master.compute() should run logically before the workers, so
- // increase the superstep counter it uses by one
- GraphState<I, V, E, M> graphState =
- new GraphState<I, V, E, M>(superstep + 1,
- GiraphStats.getInstance().getVertices().getValue(),
- GiraphStats.getInstance().getEdges().getValue(),
- getContext(), getGraphMapper(), null, null);
- masterCompute.setGraphState(graphState);
- // First invocation (input superstep): give user code a chance to
- // register aggregators etc. via initialize().
- if (superstep == INPUT_SUPERSTEP) {
- try {
- masterCompute.initialize();
- } catch (InstantiationException e) {
- LOG.fatal("runMasterCompute: Failed in instantiation", e);
- throw new RuntimeException(
- "runMasterCompute: Failed in instantiation", e);
- } catch (IllegalAccessException e) {
- LOG.fatal("runMasterCompute: Failed in access", e);
- throw new RuntimeException(
- "runMasterCompute: Failed in access", e);
- }
- }
- // Time only the user compute() call itself.
- GiraphTimerContext timerContext = masterComputeTimer.time();
- masterCompute.compute();
- timerContext.stop();
- }
-
- /**
- * Need to clean up ZooKeeper nicely. Make sure all the masters and workers
- * have reported ending their ZooKeeper connections.
- */
- private void cleanUpZooKeeper() {
- // Create the cleaned-up parent znode that all tasks report into.
- try {
- getZkExt().createExt(cleanedUpPath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException.NodeExistsException e) {
- if (LOG.isInfoEnabled()) {
- LOG.info("cleanUpZooKeeper: Node " + cleanedUpPath +
- " already exists, no need to create.");
- }
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "cleanupZooKeeper: Got KeeperException", e);
- } catch (InterruptedException e) {
- // Fixed: message said "Got IllegalStateException" for an
- // InterruptedException.
- throw new IllegalStateException(
- "cleanupZooKeeper: Got InterruptedException", e);
- }
- // Need to wait for the number of workers and masters to complete
- int maxTasks = BspInputFormat.getMaxTasks(getConfiguration());
- if ((getGraphMapper().getMapFunctions() == MapFunctions.ALL) ||
- (getGraphMapper().getMapFunctions() ==
- MapFunctions.ALL_EXCEPT_ZOOKEEPER)) {
- // Each task runs both a worker and a master, so both will report.
- maxTasks *= 2;
- }
- List<String> cleanedUpChildrenList = null;
- while (true) {
- try {
- cleanedUpChildrenList =
- getZkExt().getChildrenExt(
- cleanedUpPath, true, false, true);
- if (LOG.isInfoEnabled()) {
- LOG.info("cleanUpZooKeeper: Got " +
- cleanedUpChildrenList.size() + " of " +
- maxTasks + " desired children from " +
- cleanedUpPath);
- }
- if (cleanedUpChildrenList.size() == maxTasks) {
- break;
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("cleanedUpZooKeeper: Waiting for the " +
- "children of " + cleanedUpPath +
- " to change since only got " +
- cleanedUpChildrenList.size() + " nodes.");
- }
- } catch (KeeperException e) {
- // We are in the cleanup phase -- just log the error
- LOG.error("cleanUpZooKeeper: Got KeeperException, " +
- "but will continue", e);
- return;
- } catch (InterruptedException e) {
- // We are in the cleanup phase -- just log the error
- LOG.error("cleanUpZooKeeper: Got InterruptedException, " +
- "but will continue", e);
- return;
- }
-
- getCleanedUpChildrenChangedEvent().waitForever();
- getCleanedUpChildrenChangedEvent().reset();
- }
-
- // At this point, all processes have acknowledged the cleanup,
- // and the master can do any final cleanup if the ZooKeeper service was
- // provided (not dynamically started) and we don't want to keep the data
- try {
- if (getConfiguration().getZookeeperList() != null &&
- !getConfiguration().getBoolean(
- GiraphConstants.KEEP_ZOOKEEPER_DATA,
- GiraphConstants.KEEP_ZOOKEEPER_DATA_DEFAULT)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("cleanupZooKeeper: Removing the following path " +
- "and all children - " + basePath + " from ZooKeeper list " +
- getConfiguration().getZookeeperList());
- }
- getZkExt().deleteExt(basePath, -1, true);
- }
- } catch (KeeperException e) {
- LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
- basePath + " due to KeeperException", e);
- } catch (InterruptedException e) {
- LOG.error("cleanupZooKeeper: Failed to do cleanup of " +
- basePath + " due to InterruptedException", e);
- }
- }
-
- @Override
- public void postApplication() {
- // Notify every registered observer that the application finished;
- // report progress so the task is not flagged as hung meanwhile.
- for (MasterObserver obs : observers) {
- obs.postApplication();
- getContext().progress();
- }
- }
-
- @Override
- public void postSuperstep() {
- // Notify every registered observer that the superstep finished;
- // report progress so the task is not flagged as hung meanwhile.
- for (MasterObserver obs : observers) {
- obs.postSuperstep();
- getContext().progress();
- }
- }
-
- @Override
- public void failureCleanup(Exception e) {
- // Give every observer a chance to react to the failure; one
- // observer throwing must not prevent the others from running.
- for (MasterObserver observer : observers) {
- try {
- observer.applicationFailed(e);
- // CHECKSTYLE: stop IllegalCatchCheck
- } catch (RuntimeException re) {
- // CHECKSTYLE: resume IllegalCatchCheck
- LOG.error(re.getClass().getName() + " from observer " +
- observer.getClass().getName(), re);
- }
- getContext().progress();
- }
- }
-
- @Override
- public void cleanup() throws IOException {
- // All master processes should denote they are done by adding special
- // znode. Once the number of znodes equals the number of partitions
- // for workers and masters, the master will clean up the ZooKeeper
- // znodes associated with this job.
- String masterCleanedUpPath = cleanedUpPath + "/" +
- getTaskPartition() + MASTER_SUFFIX;
- try {
- String finalFinishedPath =
- getZkExt().createExt(masterCleanedUpPath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- if (LOG.isInfoEnabled()) {
- LOG.info("cleanup: Notifying master its okay to cleanup with " +
- finalFinishedPath);
- }
- } catch (KeeperException.NodeExistsException e) {
- if (LOG.isInfoEnabled()) {
- LOG.info("cleanup: Couldn't create finished node '" +
- masterCleanedUpPath);
- }
- } catch (KeeperException e) {
- // Cleanup phase: log and continue rather than failing the job.
- LOG.error("cleanup: Got KeeperException, continuing", e);
- } catch (InterruptedException e) {
- LOG.error("cleanup: Got InterruptedException, continuing", e);
- }
-
- // Only the elected master performs the global cleanup.
- if (isMaster) {
- cleanUpZooKeeper();
- // If desired, cleanup the checkpoint directory
- if (getConfiguration().getBoolean(
- GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS,
- GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT)) {
- boolean success =
- getFs().delete(new Path(checkpointBasePath), true);
- if (LOG.isInfoEnabled()) {
- LOG.info("cleanup: Removed HDFS checkpoint directory (" +
- checkpointBasePath + ") with return = " +
- success + " since the job " + getContext().getJobName() +
- " succeeded ");
- }
- }
- aggregatorHandler.close();
-
- masterClient.closeConnections();
- masterServer.close();
- }
-
- try {
- getZkExt().close();
- } catch (InterruptedException e) {
- // cleanup phase -- just log the error
- LOG.error("cleanup: Zookeeper failed to close", e);
- }
- }
-
- /**
- * Event that the master watches that denotes when a worker wrote checkpoint.
- * Signaled from {@link #processEvent(WatchedEvent)} when the checkpoint
- * znode's children change.
- *
- * @return Event that denotes when a worker wrote checkpoint
- */
- public final BspEvent getWorkerWroteCheckpointEvent() {
- return workerWroteCheckpoint;
- }
-
- /**
- * Event that the master watches that denotes if a worker has done something
- * that changes the state of a superstep (either a worker completed or died).
- * Signaled from {@link #processEvent(WatchedEvent)}.
- *
- * @return Event that denotes a superstep state change
- */
- public final BspEvent getSuperstepStateChangedEvent() {
- return superstepStateChanged;
- }
-
- /**
- * Should this worker failure cause the current superstep to fail?
- * Signals the superstep-state-changed event when a worker that owns
- * (or previously owned) a partition has gone down.
- *
- * @param failedWorkerPath Full path to the failed worker
- */
- private void checkHealthyWorkerFailure(String failedWorkerPath) {
- // Failures reported for earlier supersteps are stale; ignore them.
- if (getSuperstepFromPath(failedWorkerPath) < getSuperstep()) {
- return;
- }
-
- String hostnameId =
- getHealthyHostnameIdFromPath(failedWorkerPath);
- for (PartitionOwner owner :
- masterGraphPartitioner.getCurrentPartitionOwners()) {
- WorkerInfo current = owner.getWorkerInfo();
- WorkerInfo previous = owner.getPreviousWorkerInfo();
- boolean affectsPartition =
- current.getHostnameId().equals(hostnameId) ||
- ((previous != null) &&
- previous.getHostnameId().equals(hostnameId));
- if (affectsPartition) {
- LOG.warn("checkHealthyWorkerFailure: " +
- "at least one healthy worker went down " +
- "for superstep " + getSuperstep() + " - " +
- hostnameId + ", will try to restart from " +
- "checkpointed superstep " +
- lastCheckpointedSuperstep);
- superstepStateChanged.signal();
- }
- }
- }
-
- @Override
- public boolean processEvent(WatchedEvent event) {
- boolean foundEvent = false;
- // NOTE(review): assumes event.getPath() is non-null; ZooKeeper session
- // events carry a null path — presumably filtered by the caller; confirm.
- if (event.getPath().contains(WORKER_HEALTHY_DIR) &&
- (event.getType() == EventType.NodeDeleted)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("processEvent: Healthy worker died (node deleted) " +
- "in " + event.getPath());
- }
- checkHealthyWorkerFailure(event.getPath());
- superstepStateChanged.signal();
- foundEvent = true;
- } else if (event.getPath().contains(WORKER_FINISHED_DIR) &&
- event.getType() == EventType.NodeChildrenChanged) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("processEvent: Worker finished (node change) " +
- "event - superstepStateChanged signaled");
- }
- superstepStateChanged.signal();
- foundEvent = true;
- } else if (event.getPath().contains(WORKER_WROTE_CHECKPOINT_DIR) &&
- event.getType() == EventType.NodeChildrenChanged) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("processEvent: Worker wrote checkpoint (node change) " +
- "event - workerWroteCheckpoint signaled");
- }
- workerWroteCheckpoint.signal();
- foundEvent = true;
- }
-
- return foundEvent;
- }
-
- /**
- * Set values of counters to match the ones from {@link GlobalStats}.
- * Each Hadoop counter is overwritten (not incremented) with the
- * aggregated value for this superstep.
- *
- * @param globalStats Global statistics which holds new counter values
- */
- private void updateCounters(GlobalStats globalStats) {
- GiraphStats stats = GiraphStats.getInstance();
- stats.getVertices().setValue(globalStats.getVertexCount());
- stats.getFinishedVertexes().setValue(globalStats.getFinishedVertexCount());
- stats.getEdges().setValue(globalStats.getEdgeCount());
- stats.getSentMessages().setValue(globalStats.getMessageCount());
- }
-
- /**
- * Task that writes a given input split to zookeeper.
- * Upon failure call() throws an exception.
- */
- private class WriteInputSplit implements Callable<Void> {
- /** Input split which we are going to write */
- private final InputSplit inputSplit;
- /** Input splits path */
- private final String inputSplitsPath;
- /** Index of the input split */
- private final int index;
-
- /**
- * Constructor
- *
- * @param inputSplit Input split which we are going to write
- * @param inputSplitsPath Input splits path
- * @param index Index of the input split
- */
- public WriteInputSplit(InputSplit inputSplit,
- String inputSplitsPath,
- int index) {
- this.inputSplit = inputSplit;
- this.inputSplitsPath = inputSplitsPath;
- this.index = index;
- }
-
- @Override
- public Void call() {
- String inputSplitPath = null;
- try {
- ByteArrayOutputStream byteArrayOutputStream =
- new ByteArrayOutputStream();
- DataOutput outputStream =
- new DataOutputStream(byteArrayOutputStream);
-
- // Serialize up to localityLimit tab-separated locations.
- String[] splitLocations = inputSplit.getLocations();
- StringBuilder locations = null;
- if (splitLocations != null) {
- int splitListLength =
- Math.min(splitLocations.length, localityLimit);
- locations = new StringBuilder();
- // Fixed: previously EVERY location was appended while the
- // separator counter only covered the first splitListLength,
- // running the excess entries together with no separator.
- for (int i = 0; i < splitListLength; ++i) {
- locations.append(splitLocations[i])
- .append(i < splitListLength - 1 ? "\t" : "");
- }
- }
- Text.writeString(outputStream,
- locations == null ? "" : locations.toString());
- Text.writeString(outputStream,
- inputSplit.getClass().getName());
- ((Writable) inputSplit).write(outputStream);
- inputSplitPath = inputSplitsPath + "/" + index;
- getZkExt().createExt(inputSplitPath,
- byteArrayOutputStream.toByteArray(),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("call: Created input split " +
- "with index " + index + " serialized as " +
- byteArrayOutputStream.toString());
- }
- } catch (KeeperException.NodeExistsException e) {
- // Another master attempt already wrote this split; benign.
- if (LOG.isInfoEnabled()) {
- LOG.info("call: Node " +
- inputSplitPath + " already exists.");
- }
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "call: KeeperException", e);
- } catch (InterruptedException e) {
- // Fixed: message said "IllegalStateException" for an
- // InterruptedException.
- throw new IllegalStateException(
- "call: InterruptedException", e);
- } catch (IOException e) {
- throw new IllegalStateException(
- "call: IOException", e);
- }
- return null;
- }
- }
-}