Posted to commits@giraph.apache.org by cl...@apache.org on 2013/08/26 22:15:48 UTC
[1/2] GIRAPH-732
Updated Branches:
refs/heads/trunk fa6b75495 -> ae01f0399
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java.orig
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java.orig b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java.orig
new file mode 100644
index 0000000..9311fbd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java.orig
@@ -0,0 +1,1535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.bsp.ApplicationState;
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.WorkerServer;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerClient;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerServer;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.AddressesAndPartitionsWritable;
+import org.apache.giraph.graph.FinishedSuperstepStats;
+import org.apache.giraph.graph.GlobalStats;
+import org.apache.giraph.graph.GraphTaskManager;
+import org.apache.giraph.graph.InputSplitEvents;
+import org.apache.giraph.graph.InputSplitPaths;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.io.superstep_output.SuperstepOutput;
+import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.master.SuperstepClasses;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphTimer;
+import org.apache.giraph.metrics.GiraphTimerContext;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.metrics.WorkerSuperstepMetrics;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionExchange;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.PartitionStats;
+import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.partition.WorkerGraphPartitioner;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.JMapHistoDumper;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import net.iharder.Base64;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public class BspServiceWorker<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ extends BspService<I, V, E>
+ implements CentralizedServiceWorker<I, V, E>,
+ ResetSuperstepMetricsObserver {
+ /** Name of gauge for time spent waiting on other workers */
+ public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
+ /** My process health znode */
+ private String myHealthZnode;
+ /** Worker info */
+ private final WorkerInfo workerInfo;
+ /** Worker graph partitioner */
+ private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
+
+ /** IPC Client */
+ private final WorkerClient<I, V, E> workerClient;
+ /** IPC Server */
+ private final WorkerServer<I, V, E> workerServer;
+ /** Request processor for aggregator requests */
+ private final WorkerAggregatorRequestProcessor
+ workerAggregatorRequestProcessor;
+ /** Master info */
+ private MasterInfo masterInfo = new MasterInfo();
+ /** List of workers */
+ private List<WorkerInfo> workerInfoList = Lists.newArrayList();
+ /** Have the partition exchange children (workers) changed? */
+ private final BspEvent partitionExchangeChildrenChanged;
+
+ /** Worker Context */
+ private final WorkerContext workerContext;
+
+ /** Handler for aggregators */
+ private final WorkerAggregatorHandler aggregatorHandler;
+
+ /** Superstep output */
+ private SuperstepOutput<I, V, E> superstepOutput;
+
+ /** array of observers to call back to */
+ private final WorkerObserver[] observers;
+
+ // Per-Superstep Metrics
+ /** Timer for WorkerContext#postSuperstep */
+ private GiraphTimer wcPostSuperstepTimer;
+ /** Time spent waiting on requests to finish */
+ private GiraphTimer waitRequestsTimer;
+
+ /**
+ * Constructor for setting up the worker.
+ *
+ * @param serverPortList ZooKeeper server port list
+ * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
+ * @param context Mapper context
+ * @param graphTaskManager GraphTaskManager for this compute node
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public BspServiceWorker(
+ String serverPortList,
+ int sessionMsecTimeout,
+ Mapper<?, ?, ?, ?>.Context context,
+ GraphTaskManager<I, V, E> graphTaskManager)
+ throws IOException, InterruptedException {
+ super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
+ ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
+ partitionExchangeChildrenChanged = new PredicateLock(context);
+ registerBspEvent(partitionExchangeChildrenChanged);
+ workerGraphPartitioner =
+ getGraphPartitionerFactory().createWorkerGraphPartitioner();
+ workerInfo = new WorkerInfo();
+ workerServer = new NettyWorkerServer<I, V, E>(conf, this, context);
+ workerInfo.setInetSocketAddress(workerServer.getMyAddress());
+ workerInfo.setTaskId(getTaskPartition());
+ workerClient = new NettyWorkerClient<I, V, E>(context, conf, this);
+
+ workerAggregatorRequestProcessor =
+ new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
+
+ aggregatorHandler = new WorkerAggregatorHandler(this, conf, context);
+
+ workerContext = conf.createWorkerContext();
+ workerContext.setWorkerAggregatorUsage(aggregatorHandler);
+
+ superstepOutput = conf.createSuperstepOutput(context);
+
+ if (conf.isJMapHistogramDumpEnabled()) {
+ conf.addWorkerObserverClass(JMapHistoDumper.class);
+ }
+ observers = conf.createWorkerObservers();
+
+ GiraphMetrics.get().addSuperstepResetObserver(this);
+ }
+
+ @Override
+ public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+ waitRequestsTimer = new GiraphTimer(superstepMetrics,
+ TIMER_WAIT_REQUESTS, TimeUnit.MICROSECONDS);
+ wcPostSuperstepTimer = new GiraphTimer(superstepMetrics,
+ "worker-context-post-superstep", TimeUnit.MICROSECONDS);
+ }
+
+ @Override
+ public WorkerContext getWorkerContext() {
+ return workerContext;
+ }
+
+ @Override
+ public WorkerObserver[] getWorkerObservers() {
+ return observers;
+ }
+
+ @Override
+ public WorkerClient<I, V, E> getWorkerClient() {
+ return workerClient;
+ }
+
+ /**
+ * Intended to check the health of the node. For instance, can it ssh,
+ * is dmesg free of errors, etc. For now, it does nothing.
+ * TODO: Make this check configurable by the user (i.e. search dmesg for
+ * problems).
+ *
+ * @return True if healthy (always in this case).
+ */
+ public boolean isHealthy() {
+ return true;
+ }
+
+ /**
+ * Load the vertices/edges from input splits. Do this until all the
+ * InputSplits have been processed.
+ * All workers will try to do as many InputSplits as they can. The master
+ * will monitor progress and stop this once all the InputSplits have been
+ * loaded and check-pointed. Keep track of the last input split path to
+ * ensure the input split cache is flushed prior to marking the last input
+ * split complete.
+ *
+ * Use one or more threads to do the loading.
+ *
+ * @param inputSplitPathList List of input split paths
+ * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
+ * @return Statistics of the vertices and edges loaded
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ private VertexEdgeCount loadInputSplits(
+ List<String> inputSplitPathList,
+ CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
+ throws KeeperException, InterruptedException {
+ VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
+ // Determine how many threads to use based on the number of input splits
+ int maxInputSplitThreads = (inputSplitPathList.size() - 1) /
+ getConfiguration().getMaxWorkers() + 1;
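+    // This is a ceiling division: with e.g. 10 splits and 4 max workers it
+    // gives (10 - 1) / 4 + 1 = 3, i.e. at most ceil(splits / workers)
+    // input threads per worker.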
+ int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
+ maxInputSplitThreads);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
+ "originally " + getConfiguration().getNumInputSplitsThreads() +
+ " threads(s) for " + inputSplitPathList.size() + " total splits.");
+ }
+
+ List<VertexEdgeCount> results =
+ ProgressableUtils.getResultsWithNCallables(inputSplitsCallableFactory,
+ numThreads, "load-%d", getContext());
+ for (VertexEdgeCount result : results) {
+ vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(result);
+ }
+
+ workerClient.waitAllRequests();
+ return vertexEdgeCount;
+ }
+
+ /**
+ * Load the vertices from the user-defined
+ * {@link org.apache.giraph.io.VertexReader}
+ *
+ * @return Count of vertices and edges loaded
+ */
+ private VertexEdgeCount loadVertices() throws KeeperException,
+ InterruptedException {
+ List<String> inputSplitPathList =
+ getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(),
+ false, false, true);
+
+ InputSplitPathOrganizer splitOrganizer =
+ new InputSplitPathOrganizer(getZkExt(),
+ inputSplitPathList, getWorkerInfo().getHostname(),
+ getConfiguration().useInputSplitLocality());
+ InputSplitsHandler splitsHandler = new InputSplitsHandler(
+ splitOrganizer,
+ getZkExt(),
+ getContext(),
+ BspService.VERTEX_INPUT_SPLIT_RESERVED_NODE,
+ BspService.VERTEX_INPUT_SPLIT_FINISHED_NODE);
+
+ VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
+ new VertexInputSplitsCallableFactory<I, V, E>(
+ getConfiguration().createWrappedVertexInputFormat(),
+ getContext(),
+ getConfiguration(),
+ this,
+ splitsHandler,
+ getZkExt());
+
+ return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory);
+ }
+
+ /**
+ * Load the edges from the user-defined
+ * {@link org.apache.giraph.io.EdgeReader}.
+ *
+ * @return Number of edges loaded
+ */
+ private long loadEdges() throws KeeperException, InterruptedException {
+ List<String> inputSplitPathList =
+ getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(),
+ false, false, true);
+
+ InputSplitPathOrganizer splitOrganizer =
+ new InputSplitPathOrganizer(getZkExt(),
+ inputSplitPathList, getWorkerInfo().getHostname(),
+ getConfiguration().useInputSplitLocality());
+ InputSplitsHandler splitsHandler = new InputSplitsHandler(
+ splitOrganizer,
+ getZkExt(),
+ getContext(),
+ BspService.EDGE_INPUT_SPLIT_RESERVED_NODE,
+ BspService.EDGE_INPUT_SPLIT_FINISHED_NODE);
+
+ EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
+ new EdgeInputSplitsCallableFactory<I, V, E>(
+ getConfiguration().createWrappedEdgeInputFormat(),
+ getContext(),
+ getConfiguration(),
+ this,
+ splitsHandler,
+ getZkExt());
+
+ return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory).
+ getEdgeCount();
+ }
+
+ @Override
+ public MasterInfo getMasterInfo() {
+ return masterInfo;
+ }
+
+ @Override
+ public List<WorkerInfo> getWorkerInfoList() {
+ return workerInfoList;
+ }
+
+ /**
+ * Ensure the input splits are ready for processing
+ *
+ * @param inputSplitPaths Input split paths
+ * @param inputSplitEvents Input split events
+ */
+ private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths,
+ InputSplitEvents inputSplitEvents) {
+ while (true) {
+ Stat inputSplitsReadyStat;
+ try {
+ inputSplitsReadyStat = getZkExt().exists(
+ inputSplitPaths.getAllReadyPath(), true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException("ensureInputSplitsReady: " +
+ "KeeperException waiting on input splits", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("ensureInputSplitsReady: " +
+ "InterruptedException waiting on input splits", e);
+ }
+ if (inputSplitsReadyStat != null) {
+ break;
+ }
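+      // Not ready yet: block on the ZooKeeper watch registered by the
+      // exists() call above, then reset the event and re-check the znode.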
+ inputSplitEvents.getAllReadyChanged().waitForever();
+ inputSplitEvents.getAllReadyChanged().reset();
+ }
+ }
+
+ /**
+ * Wait for all workers to finish processing input splits.
+ *
+ * @param inputSplitPaths Input split paths
+ * @param inputSplitEvents Input split events
+ */
+ private void waitForOtherWorkers(InputSplitPaths inputSplitPaths,
+ InputSplitEvents inputSplitEvents) {
+ String workerInputSplitsDonePath =
+ inputSplitPaths.getDonePath() + "/" +
+ getWorkerInfo().getHostnameId();
+ try {
+ getZkExt().createExt(workerInputSplitsDonePath,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException("waitForOtherWorkers: " +
+ "KeeperException creating worker done splits", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("waitForOtherWorkers: " +
+ "InterruptedException creating worker done splits", e);
+ }
+ while (true) {
+ Stat inputSplitsDoneStat;
+ try {
+ inputSplitsDoneStat =
+ getZkExt().exists(inputSplitPaths.getAllDonePath(),
+ true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException("waitForOtherWorkers: " +
+ "KeeperException waiting on worker done splits", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("waitForOtherWorkers: " +
+ "InterruptedException waiting on worker done splits", e);
+ }
+ if (inputSplitsDoneStat != null) {
+ break;
+ }
+ inputSplitEvents.getAllDoneChanged().waitForever();
+ inputSplitEvents.getAllDoneChanged().reset();
+ }
+ }
+
+ @Override
+ public FinishedSuperstepStats setup() {
+ // Unless doing a restart, prepare for computation:
+ // 1. Start superstep INPUT_SUPERSTEP (no computation)
+ // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
+ // 3. Process input splits until there are no more.
+ // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
+ // 5. Process any mutations deriving from add edge requests
+ // 6. Wait for superstep INPUT_SUPERSTEP to complete.
+ if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
+ setCachedSuperstep(getRestartedSuperstep());
+ return new FinishedSuperstepStats(0, false, 0, 0, true);
+ }
+
+ JSONObject jobState = getJobState();
+ if (jobState != null) {
+ try {
+ if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
+ ApplicationState.START_SUPERSTEP) &&
+ jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
+ getSuperstep()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("setup: Restarting from an automated " +
+ "checkpointed superstep " +
+ getSuperstep() + ", attempt " +
+ getApplicationAttempt());
+ }
+ setRestartedSuperstep(getSuperstep());
+ return new FinishedSuperstepStats(0, false, 0, 0, true);
+ }
+ } catch (JSONException e) {
+ throw new RuntimeException(
+ "setup: Failed to get key-values from " +
+ jobState.toString(), e);
+ }
+ }
+
+ // Add the partitions that this worker owns
+ Collection<? extends PartitionOwner> masterSetPartitionOwners =
+ startSuperstep();
+ workerGraphPartitioner.updatePartitionOwners(
+ getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+
+ /*if[HADOOP_NON_SECURE]
+ workerClient.setup();
+ else[HADOOP_NON_SECURE]*/
+ workerClient.setup(getConfiguration().authenticate());
+ /*end[HADOOP_NON_SECURE]*/
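+    // The if/else[HADOOP_NON_SECURE] markers above are munge-style
+    // preprocessor directives: non-secure Hadoop builds compile the
+    // no-argument setup(), secure builds pass whether to authenticate.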
+
+ // Initialize aggregator at worker side during setup.
+ // Do this just before vertex and edge loading.
+ aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+
+ VertexEdgeCount vertexEdgeCount;
+
+ if (getConfiguration().hasVertexInputFormat()) {
+ // Ensure the vertex InputSplits are ready for processing
+ ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents);
+ getContext().progress();
+ try {
+ vertexEdgeCount = loadVertices();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "setup: loadVertices failed with InterruptedException", e);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "setup: loadVertices failed with KeeperException", e);
+ }
+ getContext().progress();
+ } else {
+ vertexEdgeCount = new VertexEdgeCount();
+ }
+
+ if (getConfiguration().hasEdgeInputFormat()) {
+ // Ensure the edge InputSplits are ready for processing
+ ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents);
+ getContext().progress();
+ try {
+ vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "setup: loadEdges failed with InterruptedException", e);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "setup: loadEdges failed with KeeperException", e);
+ }
+ getContext().progress();
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
+ }
+
+ if (getConfiguration().hasVertexInputFormat()) {
+ // Workers wait for each other to finish, coordinated by master
+ waitForOtherWorkers(vertexInputSplitsPaths, vertexInputSplitsEvents);
+ }
+
+ if (getConfiguration().hasEdgeInputFormat()) {
+ // Workers wait for each other to finish, coordinated by master
+ waitForOtherWorkers(edgeInputSplitsPaths, edgeInputSplitsEvents);
+ }
+
+ // Create remaining partitions owned by this worker.
+ for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+ if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
+ !getPartitionStore().hasPartition(
+ partitionOwner.getPartitionId())) {
+ Partition<I, V, E> partition =
+ getConfiguration().createPartition(
+ partitionOwner.getPartitionId(), getContext());
+ getPartitionStore().addPartition(partition);
+ }
+ }
+
+ if (getConfiguration().hasEdgeInputFormat()) {
+ // Move edges from temporary storage to their source vertices.
+ getServerData().getEdgeStore().moveEdgesToVertices();
+ }
+
+ // Generate the partition stats for the input superstep and process
+ // if necessary
+ List<PartitionStats> partitionStatsList =
+ new ArrayList<PartitionStats>();
+ for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+ Partition<I, V, E> partition =
+ getPartitionStore().getPartition(partitionId);
+ PartitionStats partitionStats =
+ new PartitionStats(partition.getId(),
+ partition.getVertexCount(),
+ 0,
+ partition.getEdgeCount(),
+ 0, 0);
+ partitionStatsList.add(partitionStats);
+ getPartitionStore().putPartition(partition);
+ }
+ workerGraphPartitioner.finalizePartitionStats(
+ partitionStatsList, getPartitionStore());
+
+ return finishSuperstep(partitionStatsList);
+ }
+
+ /**
+ * Register the health of this worker for a given superstep
+ *
+ * @param superstep Superstep to register health on
+ */
+ private void registerHealth(long superstep) {
+ JSONArray hostnamePort = new JSONArray();
+ hostnamePort.put(getHostname());
+
+ hostnamePort.put(workerInfo.getPort());
+
+ String myHealthPath = null;
+ if (isHealthy()) {
+ myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
+ getSuperstep());
+ } else {
+ myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
+ getSuperstep());
+ }
+ myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
+ try {
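+      // EPHEMERAL: the znode disappears if this worker's ZooKeeper session
+      // dies, which is how the master detects worker failure mid-superstep.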
+ myHealthZnode = getZkExt().createExt(
+ myHealthPath,
+ WritableUtils.writeToByteArray(workerInfo),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL,
+ true);
+ } catch (KeeperException.NodeExistsException e) {
+ LOG.warn("registerHealth: myHealthPath already exists (likely " +
+ "from previous failure): " + myHealthPath +
+ ". Waiting for change in attempts " +
+ "to re-join the application");
+ getApplicationAttemptChangedEvent().waitForever();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("registerHealth: Got application " +
+ "attempt changed event, killing self");
+ }
+ throw new IllegalStateException(
+ "registerHealth: Trying " +
+ "to get the new application attempt by killing self", e);
+ } catch (KeeperException e) {
+ throw new IllegalStateException("Creating " + myHealthPath +
+ " failed with KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Creating " + myHealthPath +
+ " failed with InterruptedException", e);
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("registerHealth: Created my health node for attempt=" +
+ getApplicationAttempt() + ", superstep=" +
+ getSuperstep() + " with " + myHealthZnode +
+ " and workerInfo= " + workerInfo);
+ }
+ }
+
+ /**
+ * Do this to help notify the master quicker that this worker has failed.
+ */
+ private void unregisterHealth() {
+ LOG.error("unregisterHealth: Got failure, unregistering health on " +
+ myHealthZnode + " on superstep " + getSuperstep());
+ try {
+ getZkExt().deleteExt(myHealthZnode, -1, false);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "unregisterHealth: InterruptedException - Couldn't delete " +
+ myHealthZnode, e);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "unregisterHealth: KeeperException - Couldn't delete " +
+ myHealthZnode, e);
+ }
+ }
+
+ @Override
+ public void failureCleanup() {
+ unregisterHealth();
+ }
+
+ @Override
+ public Collection<? extends PartitionOwner> startSuperstep() {
+ // Algorithm:
+ // 1. Communication service will combine message from previous
+ // superstep
+ // 2. Register my health for the next superstep.
+ // 3. Wait until the partition assignment is complete and get it
+ // 4. Get the aggregator values from the previous superstep
+ if (getSuperstep() != INPUT_SUPERSTEP) {
+ workerServer.prepareSuperstep();
+ }
+
+ registerHealth(getSuperstep());
+
+ String addressesAndPartitionsPath =
+ getAddressesAndPartitionsPath(getApplicationAttempt(),
+ getSuperstep());
+ AddressesAndPartitionsWritable addressesAndPartitions =
+ new AddressesAndPartitionsWritable(
+ workerGraphPartitioner.createPartitionOwner().getClass());
+ try {
+ while (getZkExt().exists(addressesAndPartitionsPath, true) ==
+ null) {
+ getAddressesAndPartitionsReadyChangedEvent().waitForever();
+ getAddressesAndPartitionsReadyChangedEvent().reset();
+ }
+ WritableUtils.readFieldsFromZnode(
+ getZkExt(),
+ addressesAndPartitionsPath,
+ false,
+ null,
+ addressesAndPartitions);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "startSuperstep: KeeperException getting assignments", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "startSuperstep: InterruptedException getting assignments", e);
+ }
+
+ workerInfoList.clear();
+ workerInfoList = addressesAndPartitions.getWorkerInfos();
+ masterInfo = addressesAndPartitions.getMasterInfo();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("startSuperstep: " + masterInfo);
+ LOG.info("startSuperstep: Ready for computation on superstep " +
+ getSuperstep() + " since worker " +
+ "selection and vertex range assignments are done in " +
+ addressesAndPartitionsPath);
+ }
+
+ getContext().setStatus("startSuperstep: " +
+ getGraphTaskManager().getGraphFunctions().toString() +
+ " - Attempt=" + getApplicationAttempt() +
+ ", Superstep=" + getSuperstep());
+ return addressesAndPartitions.getPartitionOwners();
+ }
+
+ @Override
+ public FinishedSuperstepStats finishSuperstep(
+ List<PartitionStats> partitionStatsList) {
+ // This barrier blocks until success (or the master signals it to
+ // restart).
+ //
+ // Master will coordinate the barriers and aggregate "doneness" of all
+ // the vertices. Each worker will:
+ // 1. Ensure that the requests are complete
+ // 2. Execute user postSuperstep() if necessary.
+ // 3. Save aggregator values that are in use.
+ // 4. Report the statistics (vertices, edges, messages, etc.)
+ // of this worker
+ // 5. Let the master know it is finished.
+ // 6. Wait for the master's superstep info, and check if done
+ waitForRequestsToFinish();
+
+ getGraphTaskManager().notifyFinishedCommunication();
+
+ long workerSentMessages = 0;
+ long workerSentMessageBytes = 0;
+ long localVertices = 0;
+ for (PartitionStats partitionStats : partitionStatsList) {
+ workerSentMessages += partitionStats.getMessagesSentCount();
+ workerSentMessageBytes += partitionStats.getMessageBytesSentCount();
+ localVertices += partitionStats.getVertexCount();
+ }
+
+ if (getSuperstep() != INPUT_SUPERSTEP) {
+ postSuperstepCallbacks();
+ }
+
+ aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("finishSuperstep: Superstep " + getSuperstep() +
+ ", messages = " + workerSentMessages + " " +
+ ", message bytes = " + workerSentMessageBytes + " , " +
+ MemoryUtils.getRuntimeMemoryStats());
+ }
+
+    writeFinishedSuperstepInfoToZK(partitionStatsList,
+ workerSentMessages, workerSentMessageBytes);
+
+ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+ "finishSuperstep: (waiting for rest " +
+ "of workers) " +
+ getGraphTaskManager().getGraphFunctions().toString() +
+ " - Attempt=" + getApplicationAttempt() +
+ ", Superstep=" + getSuperstep());
+
+ String superstepFinishedNode =
+ getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
+
+ waitForOtherWorkers(superstepFinishedNode);
+
+ GlobalStats globalStats = new GlobalStats();
+ SuperstepClasses superstepClasses = new SuperstepClasses();
+ WritableUtils.readFieldsFromZnode(
+ getZkExt(), superstepFinishedNode, false, null, globalStats,
+ superstepClasses);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
+ " with global stats " + globalStats + " and classes " +
+ superstepClasses);
+ }
+ incrCachedSuperstep();
+ getContext().setStatus("finishSuperstep: (all workers done) " +
+ getGraphTaskManager().getGraphFunctions().toString() +
+ " - Attempt=" + getApplicationAttempt() +
+ ", Superstep=" + getSuperstep());
+ getConfiguration().updateSuperstepClasses(superstepClasses);
+
+ return new FinishedSuperstepStats(
+ localVertices,
+ globalStats.getHaltComputation(),
+ globalStats.getVertexCount(),
+ globalStats.getEdgeCount(),
+ false);
+ }
+
+ /**
+ * Handle post-superstep callbacks
+ */
+ private void postSuperstepCallbacks() {
+ GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
+ getWorkerContext().postSuperstep();
+ timerContext.stop();
+ getContext().progress();
+
+ for (WorkerObserver obs : getWorkerObservers()) {
+ obs.postSuperstep(getSuperstep());
+ getContext().progress();
+ }
+ }
+
+ /**
+ * Wait for all the requests to finish.
+ */
+ private void waitForRequestsToFinish() {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("finishSuperstep: Waiting on all requests, superstep " +
+ getSuperstep() + " " +
+ MemoryUtils.getRuntimeMemoryStats());
+ }
+ GiraphTimerContext timerContext = waitRequestsTimer.time();
+ workerClient.waitAllRequests();
+ timerContext.stop();
+ }
+
+ /**
+ * Wait for all the other Workers to finish the superstep.
+ *
+ * @param superstepFinishedNode ZooKeeper path to wait on.
+ */
+ private void waitForOtherWorkers(String superstepFinishedNode) {
+ try {
+ while (getZkExt().exists(superstepFinishedNode, true) == null) {
+ getSuperstepFinishedEvent().waitForever();
+ getSuperstepFinishedEvent().reset();
+ }
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "finishSuperstep: Failed while waiting for master to " +
+ "signal completion of superstep " + getSuperstep(), e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "finishSuperstep: Failed while waiting for master to " +
+ "signal completion of superstep " + getSuperstep(), e);
+ }
+ }
+
+ /**
+ * Write finished superstep info to ZooKeeper.
+ *
+ * @param partitionStatsList List of partition stats from superstep.
+ * @param workerSentMessages Number of messages sent in superstep.
+ * @param workerSentMessageBytes Number of message bytes sent
+ * in superstep.
+ */
+  private void writeFinishedSuperstepInfoToZK(
+ List<PartitionStats> partitionStatsList, long workerSentMessages,
+ long workerSentMessageBytes) {
+ Collection<PartitionStats> finalizedPartitionStats =
+ workerGraphPartitioner.finalizePartitionStats(
+ partitionStatsList, getPartitionStore());
+ List<PartitionStats> finalizedPartitionStatsList =
+ new ArrayList<PartitionStats>(finalizedPartitionStats);
+ byte[] partitionStatsBytes =
+ WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
+ WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
+ metrics.readFromRegistry();
+ byte[] metricsBytes = WritableUtils.writeToByteArray(metrics);
+
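+    // The per-superstep results travel to the master as JSON on a znode;
+    // the binary Writable blobs (partition stats, metrics) are
+    // Base64-encoded so they survive the text encoding.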
+ JSONObject workerFinishedInfoObj = new JSONObject();
+ try {
+ workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
+ Base64.encodeBytes(partitionStatsBytes));
+ workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
+ workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGE_BYTES_KEY,
+ workerSentMessageBytes);
+ workerFinishedInfoObj.put(JSONOBJ_METRICS_KEY,
+ Base64.encodeBytes(metricsBytes));
+ } catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+
+ String finishedWorkerPath =
+ getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
+ "/" + getHostnamePartitionId();
+ try {
+ getZkExt().createExt(finishedWorkerPath,
+ workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ } catch (KeeperException.NodeExistsException e) {
+ LOG.warn("finishSuperstep: finished worker path " +
+ finishedWorkerPath + " already exists!");
+ } catch (KeeperException e) {
+ throw new IllegalStateException("Creating " + finishedWorkerPath +
+ " failed with KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Creating " + finishedWorkerPath +
+ " failed with InterruptedException", e);
+ }
+ }
+
+ /**
+   * Save the vertices using the user-defined VertexOutputFormat,
+   * reading them partition by partition from the partition store.
+ *
+ * @param numLocalVertices Number of local vertices
+ * @throws InterruptedException
+ */
+ private void saveVertices(long numLocalVertices) throws IOException,
+ InterruptedException {
+ if (getConfiguration().getVertexOutputFormatClass() == null) {
+ LOG.warn("saveVertices: " +
+ GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
+ " not specified -- there will be no saved output");
+ return;
+ }
+ if (getConfiguration().doOutputDuringComputation()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("saveVertices: The option for doing output during " +
+ "computation is selected, so there will be no saving of the " +
+ "output in the end of application");
+ }
+ return;
+ }
+
+ final int numPartitions = getPartitionStore().getNumPartitions();
+ int numThreads = Math.min(getConfiguration().getNumOutputThreads(),
+ numPartitions);
+ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+ "saveVertices: Starting to save " + numLocalVertices + " vertices " +
+ "using " + numThreads + " threads");
+ final VertexOutputFormat<I, V, E> vertexOutputFormat =
+ getConfiguration().createWrappedVertexOutputFormat();
+
+ final Queue<Integer> partitionIdQueue =
+ (numPartitions == 0) ? new LinkedList<Integer>() :
+ new ArrayBlockingQueue<Integer>(numPartitions);
+ Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
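+    // Output threads poll partition ids off this shared queue until it is
+    // drained, so each partition is written exactly once across threads.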
+
+ CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+ @Override
+ public Callable<Void> newCallable(int callableId) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ VertexWriter<I, V, E> vertexWriter =
+ vertexOutputFormat.createVertexWriter(getContext());
+ vertexWriter.setConf(getConfiguration());
+ vertexWriter.initialize(getContext());
+ long nextPrintVertices = 0;
+ long nextPrintMsecs = System.currentTimeMillis() + 15000;
+ int partitionIndex = 0;
+ int numPartitions = getPartitionStore().getNumPartitions();
+ while (!partitionIdQueue.isEmpty()) {
+ Integer partitionId = partitionIdQueue.poll();
+ if (partitionId == null) {
+ break;
+ }
+
+ Partition<I, V, E> partition =
+ getPartitionStore().getPartition(partitionId);
+ long verticesWritten = 0;
+ for (Vertex<I, V, E> vertex : partition) {
+ vertexWriter.writeVertex(vertex);
+ ++verticesWritten;
+
+ // Update status at most every 250k vertices or 15 seconds
+ if (verticesWritten > nextPrintVertices &&
+ System.currentTimeMillis() > nextPrintMsecs) {
+ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+ "saveVertices: Saved " + verticesWritten + " out of " +
+ partition.getVertexCount() + " partition vertices, " +
+ "on partition " + partitionIndex +
+ " out of " + numPartitions);
+ nextPrintMsecs = System.currentTimeMillis() + 15000;
+ nextPrintVertices = verticesWritten + 250000;
+ }
+ }
+ getPartitionStore().putPartition(partition);
+ ++partitionIndex;
+ }
+ vertexWriter.close(getContext()); // the temp results are saved now
+ return null;
+ }
+ };
+ }
+ };
+ ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+ "save-vertices-%d", getContext());
+
+ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+ "saveVertices: Done saving vertices.");
+    // YARN: we must commit the "task" output ourselves; Hadoop's own
+    // task-commit machinery isn't there.
+ if (getConfiguration().isPureYarnJob() &&
+ getConfiguration().getVertexOutputFormatClass() != null) {
+ try {
+ OutputCommitter outputCommitter =
+ vertexOutputFormat.getOutputCommitter(getContext());
+ if (outputCommitter.needsTaskCommit(getContext())) {
+ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+ "OutputCommitter: committing task output.");
+ // transfer from temp dirs to "task commit" dirs to prep for
+ // the master's OutputCommitter#commitJob(context) call to finish.
+ outputCommitter.commitTask(getContext());
+ }
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted while attempting to obtain " +
+ "OutputCommitter.", ie);
+ } catch (IOException ioe) {
+ LOG.error("Master task's attempt to commit output has " +
+ "FAILED.", ioe);
+ }
+ }
+ }
+
+ @Override
+ public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
+ throws IOException, InterruptedException {
+ workerClient.closeConnections();
+ setCachedSuperstep(getSuperstep() - 1);
+ saveVertices(finishedSuperstepStats.getLocalVertexCount());
+ getPartitionStore().shutdown();
+ // All worker processes should denote they are done by adding special
+ // znode. Once the number of znodes equals the number of partitions
+ // for workers and masters, the master will clean up the ZooKeeper
+ // znodes associated with this job.
+ String workerCleanedUpPath = cleanedUpPath + "/" +
+ getTaskPartition() + WORKER_SUFFIX;
+ try {
+ String finalFinishedPath =
+ getZkExt().createExt(workerCleanedUpPath,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("cleanup: Notifying master its okay to cleanup with " +
+ finalFinishedPath);
+ }
+ } catch (KeeperException.NodeExistsException e) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("cleanup: Couldn't create finished node '" +
+ workerCleanedUpPath);
+ }
+ } catch (KeeperException e) {
+ // Cleaning up, it's okay to fail after cleanup is successful
+ LOG.error("cleanup: Got KeeperException on notification " +
+ "to master about cleanup", e);
+ } catch (InterruptedException e) {
+ // Cleaning up, it's okay to fail after cleanup is successful
+ LOG.error("cleanup: Got InterruptedException on notification " +
+ "to master about cleanup", e);
+ }
+ try {
+ getZkExt().close();
+ } catch (InterruptedException e) {
+ // cleanup phase -- just log the error
+ LOG.error("cleanup: Zookeeper failed to close with " + e);
+ }
+
+ if (getConfiguration().metricsEnabled()) {
+ GiraphMetrics.get().dumpToStream(System.err);
+ }
+
+ // Preferably would shut down the service only after
+ // all clients have disconnected (or the exceptions on the
+ // client side ignored).
+ workerServer.close();
+ }
+
+ @Override
+ public void storeCheckpoint() throws IOException {
+ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+ "storeCheckpoint: Starting checkpoint " +
+ getGraphTaskManager().getGraphFunctions().toString() +
+ " - Attempt=" + getApplicationAttempt() +
+ ", Superstep=" + getSuperstep());
+
+ // Algorithm:
+ // For each partition, dump vertices and messages
+ Path metadataFilePath =
+ new Path(getCheckpointBasePath(getSuperstep()) + "." +
+ getHostnamePartitionId() +
+ CHECKPOINT_METADATA_POSTFIX);
+ Path verticesFilePath =
+ new Path(getCheckpointBasePath(getSuperstep()) + "." +
+ getHostnamePartitionId() +
+ CHECKPOINT_VERTICES_POSTFIX);
+ Path validFilePath =
+ new Path(getCheckpointBasePath(getSuperstep()) + "." +
+ getHostnamePartitionId() +
+ CHECKPOINT_VALID_POSTFIX);
+
+ // Remove these files if they already exist (shouldn't though, unless
+ // of previous failure of this worker)
+ if (getFs().delete(validFilePath, false)) {
+ LOG.warn("storeCheckpoint: Removed valid file " +
+ validFilePath);
+ }
+ if (getFs().delete(metadataFilePath, false)) {
+ LOG.warn("storeCheckpoint: Removed metadata file " +
+ metadataFilePath);
+ }
+ if (getFs().delete(verticesFilePath, false)) {
+ LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
+ }
+
+ FSDataOutputStream verticesOutputStream =
+ getFs().create(verticesFilePath);
+ ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
+ DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
+ for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+ Partition<I, V, E> partition =
+ getPartitionStore().getPartition(partitionId);
+ long startPos = verticesOutputStream.getPos();
+ partition.write(verticesOutputStream);
+ // write messages
+ getServerData().getCurrentMessageStore().writePartition(
+ verticesOutputStream, partition.getId());
+ // Write the metadata for this partition
+ // Format:
+ // <index count>
+ // <index 0 start pos><partition id>
+ // <index 1 start pos><partition id>
+ metadataOutput.writeLong(startPos);
+ metadataOutput.writeInt(partition.getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("storeCheckpoint: Vertex file starting " +
+ "offset = " + startPos + ", length = " +
+ (verticesOutputStream.getPos() - startPos) +
+ ", partition = " + partition.toString());
+ }
+ getPartitionStore().putPartition(partition);
+ getContext().progress();
+ }
+ // Metadata is buffered and written at the end since it's small and
+ // needs to know how many partitions this worker owns
+ FSDataOutputStream metadataOutputStream =
+ getFs().create(metadataFilePath);
+ metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
+ metadataOutputStream.write(metadataByteStream.toByteArray());
+ metadataOutputStream.close();
+ verticesOutputStream.close();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("storeCheckpoint: Finished metadata (" +
+ metadataFilePath + ") and vertices (" + verticesFilePath + ").");
+ }
+
+ getFs().createNewFile(validFilePath);
+
+ // Notify master that checkpoint is stored
+ String workerWroteCheckpoint =
+ getWorkerWroteCheckpointPath(getApplicationAttempt(),
+ getSuperstep()) + "/" + getHostnamePartitionId();
+ try {
+ getZkExt().createExt(workerWroteCheckpoint,
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ } catch (KeeperException.NodeExistsException e) {
+ LOG.warn("storeCheckpoint: wrote checkpoint worker path " +
+ workerWroteCheckpoint + " already exists!");
+ } catch (KeeperException e) {
+ throw new IllegalStateException("Creating " + workerWroteCheckpoint +
+ " failed with KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Creating " +
+ workerWroteCheckpoint +
+ " failed with InterruptedException", e);
+ }
+ }
+
+ @Override
+ public VertexEdgeCount loadCheckpoint(long superstep) {
+ try {
+ // clear old message stores
+ getServerData().getIncomingMessageStore().clearAll();
+ getServerData().getCurrentMessageStore().clearAll();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "loadCheckpoint: Failed to clear message stores ", e);
+ }
+
+ // Algorithm:
+ // Examine all the partition owners and load the ones
+ // that match my hostname and id from the master designated checkpoint
+ // prefixes.
+ long startPos = 0;
+ int loadedPartitions = 0;
+ for (PartitionOwner partitionOwner :
+ workerGraphPartitioner.getPartitionOwners()) {
+ if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
+ String metadataFile =
+ partitionOwner.getCheckpointFilesPrefix() +
+ CHECKPOINT_METADATA_POSTFIX;
+ String partitionsFile =
+ partitionOwner.getCheckpointFilesPrefix() +
+ CHECKPOINT_VERTICES_POSTFIX;
+ try {
+ int partitionId = -1;
+ DataInputStream metadataStream =
+ getFs().open(new Path(metadataFile));
+ int partitions = metadataStream.readInt();
+ for (int i = 0; i < partitions; ++i) {
+ startPos = metadataStream.readLong();
+ partitionId = metadataStream.readInt();
+ if (partitionId == partitionOwner.getPartitionId()) {
+ break;
+ }
+ }
+ if (partitionId != partitionOwner.getPartitionId()) {
+ throw new IllegalStateException(
+ "loadCheckpoint: " + partitionOwner +
+ " not found!");
+ }
+ metadataStream.close();
+ Partition<I, V, E> partition =
+ getConfiguration().createPartition(partitionId, getContext());
+ DataInputStream partitionsStream =
+ getFs().open(new Path(partitionsFile));
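+          // Seek to the byte offset recorded for this partition in the
+          // metadata file; multiple partitions share one vertices file.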
+ if (partitionsStream.skip(startPos) != startPos) {
+ throw new IllegalStateException(
+ "loadCheckpoint: Failed to skip " + startPos +
+ " on " + partitionsFile);
+ }
+ partition.readFields(partitionsStream);
+ getServerData().getIncomingMessageStore().readFieldsForPartition(
+ partitionsStream, partitionId);
+ partitionsStream.close();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadCheckpoint: Loaded partition " +
+ partition);
+ }
+ if (getPartitionStore().hasPartition(partitionId)) {
+ throw new IllegalStateException(
+ "loadCheckpoint: Already has partition owner " +
+ partitionOwner);
+ }
+ getPartitionStore().addPartition(partition);
+ getContext().progress();
+ ++loadedPartitions;
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "loadCheckpoint: Failed to get partition owner " +
+ partitionOwner, e);
+ }
+ }
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadCheckpoint: Loaded " + loadedPartitions +
+ " partitions of out " +
+ workerGraphPartitioner.getPartitionOwners().size() +
+ " total.");
+ }
+
+ // Load global stats and superstep classes
+ GlobalStats globalStats = new GlobalStats();
+ SuperstepClasses superstepClasses = new SuperstepClasses();
+ String finalizedCheckpointPath =
+ getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+ try {
+ DataInputStream finalizedStream =
+ getFs().open(new Path(finalizedCheckpointPath));
+ globalStats.readFields(finalizedStream);
+ superstepClasses.readFields(finalizedStream);
+ getConfiguration().updateSuperstepClasses(superstepClasses);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "loadCheckpoint: Failed to load global stats and superstep classes",
+ e);
+ }
+
+ getServerData().prepareSuperstep();
+ // Communication service needs to setup the connections prior to
+ // processing vertices
+/*if[HADOOP_NON_SECURE]
+ workerClient.setup();
+else[HADOOP_NON_SECURE]*/
+ workerClient.setup(getConfiguration().authenticate());
+/*end[HADOOP_NON_SECURE]*/
+ return new VertexEdgeCount(globalStats.getVertexCount(),
+ globalStats.getEdgeCount());
+ }
+
+ /**
+ * Send the worker partitions to their destination workers
+ *
+ * @param workerPartitionMap Map of worker info to the partitions stored
+ * on this worker to be sent
+ */
+ private void sendWorkerPartitions(
+ Map<WorkerInfo, List<Integer>> workerPartitionMap) {
+ List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
+ new ArrayList<Entry<WorkerInfo, List<Integer>>>(
+ workerPartitionMap.entrySet());
+ Collections.shuffle(randomEntryList);
+ WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
+ new NettyWorkerClientRequestProcessor<I, V, E>(getContext(),
+ getConfiguration(), this);
+ for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
+ randomEntryList) {
+ for (Integer partitionId : workerPartitionList.getValue()) {
+ Partition<I, V, E> partition =
+ getPartitionStore().removePartition(partitionId);
+ if (partition == null) {
+ throw new IllegalStateException(
+ "sendWorkerPartitions: Couldn't find partition " +
+ partitionId + " to send to " +
+ workerPartitionList.getKey());
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("sendWorkerPartitions: Sending worker " +
+ workerPartitionList.getKey() + " partition " +
+ partitionId);
+ }
+ workerClientRequestProcessor.sendPartitionRequest(
+ workerPartitionList.getKey(),
+ partition);
+ }
+ }
+
+ try {
+ workerClientRequestProcessor.flush();
+ workerClient.waitAllRequests();
+ } catch (IOException e) {
+ throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
+ }
+ String myPartitionExchangeDonePath =
+ getPartitionExchangeWorkerPath(
+ getApplicationAttempt(), getSuperstep(), getWorkerInfo());
+ try {
+ getZkExt().createExt(myPartitionExchangeDonePath,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "sendWorkerPartitions: KeeperException to create " +
+ myPartitionExchangeDonePath, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "sendWorkerPartitions: InterruptedException to create " +
+ myPartitionExchangeDonePath, e);
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("sendWorkerPartitions: Done sending all my partitions.");
+ }
+ }
+
+ @Override
+ public final void exchangeVertexPartitions(
+ Collection<? extends PartitionOwner> masterSetPartitionOwners) {
+ // 1. Fix the addresses of the partition ids if they have changed.
+ // 2. Send all the partitions to their destination workers in a random
+ // fashion.
+ // 3. Notify completion with a ZooKeeper stamp
+ // 4. Wait for all my dependencies to be done (if any)
+ // 5. Add the partitions to myself.
+ PartitionExchange partitionExchange =
+ workerGraphPartitioner.updatePartitionOwners(
+ getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+ workerClient.openConnections();
+
+ Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
+ partitionExchange.getSendWorkerPartitionMap();
+ if (!getPartitionStore().isEmpty()) {
+ sendWorkerPartitions(sendWorkerPartitionMap);
+ }
+
+ Set<WorkerInfo> myDependencyWorkerSet =
+ partitionExchange.getMyDependencyWorkerSet();
+ Set<String> workerIdSet = new HashSet<String>();
+ for (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
+ if (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
+ throw new IllegalStateException(
+ "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
+ }
+ }
+ if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
+ "exiting early");
+ }
+ return;
+ }
+
+ String vertexExchangePath =
+ getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
+ List<String> workerDoneList;
+ try {
+ while (true) {
+ workerDoneList = getZkExt().getChildrenExt(
+ vertexExchangePath, true, false, false);
+ workerIdSet.removeAll(workerDoneList);
+ if (workerIdSet.isEmpty()) {
+ break;
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("exchangeVertexPartitions: Waiting for workers " +
+ workerIdSet);
+ }
+ getPartitionExchangeChildrenChangedEvent().waitForever();
+ getPartitionExchangeChildrenChangedEvent().reset();
+ }
+ } catch (KeeperException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("exchangeVertexPartitions: Done with exchange.");
+ }
+ }
+
+ /**
+ * Get event when the state of a partition exchange has changed.
+ *
+ * @return Event to check.
+ */
+ public final BspEvent getPartitionExchangeChildrenChangedEvent() {
+ return partitionExchangeChildrenChanged;
+ }
+
+ @Override
+ protected boolean processEvent(WatchedEvent event) {
+ boolean foundEvent = false;
+ if (event.getPath().startsWith(masterJobStatePath) &&
+ (event.getType() == EventType.NodeChildrenChanged)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("processEvent: Job state changed, checking " +
+ "to see if it needs to restart");
+ }
+ JSONObject jsonObj = getJobState();
+      // in YARN we must manually commit our own output in 2 stages, which
+      // Hadoop-based Giraph does not require, so jsonObj can be null.
+ if (getConfiguration().isPureYarnJob() && null == jsonObj) {
+ LOG.error("BspServiceWorker#getJobState() came back NULL.");
+ return false; // the event has been processed.
+ }
+ try {
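+      // If the master commanded a restart under a new application attempt,
+      // this worker is stale and kills itself so the fresh attempt's task
+      // can register anew.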
+ if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
+ ApplicationState.START_SUPERSTEP) &&
+ jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
+ getApplicationAttempt()) {
+ LOG.fatal("processEvent: Worker will restart " +
+ "from command - " + jsonObj.toString());
+ System.exit(-1);
+ }
+ } catch (JSONException e) {
+        throw new RuntimeException(
+            "processEvent: Couldn't properly get job state from " +
+            jsonObj.toString(), e);
+ }
+ foundEvent = true;
+ } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
+ event.getType() == EventType.NodeChildrenChanged) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("processEvent : partitionExchangeChildrenChanged " +
+ "(at least one worker is done sending partitions)");
+ }
+ partitionExchangeChildrenChanged.signal();
+ foundEvent = true;
+ }
+
+ return foundEvent;
+ }
+
+ @Override
+ public WorkerInfo getWorkerInfo() {
+ return workerInfo;
+ }
+
+ @Override
+ public PartitionStore<I, V, E> getPartitionStore() {
+ return getServerData().getPartitionStore();
+ }
+
+ @Override
+ public PartitionOwner getVertexPartitionOwner(I vertexId) {
+ return workerGraphPartitioner.getPartitionOwner(vertexId);
+ }
+
+ @Override
+ public Iterable<? extends PartitionOwner> getPartitionOwners() {
+ return workerGraphPartitioner.getPartitionOwners();
+ }
+
+ @Override
+ public int getPartitionId(I vertexId) {
+ PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
+ return partitionOwner.getPartitionId();
+ }
+
+ @Override
+ public boolean hasPartition(Integer partitionId) {
+ return getPartitionStore().hasPartition(partitionId);
+ }
+
+ @Override
+ public ServerData<I, V, E> getServerData() {
+ return workerServer.getServerData();
+ }
+
+ @Override
+ public WorkerAggregatorHandler getAggregatorHandler() {
+ return aggregatorHandler;
+ }
+
+ @Override
+ public void prepareSuperstep() {
+ if (getSuperstep() != INPUT_SUPERSTEP) {
+ aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+ }
+ }
+
+ @Override
+ public SuperstepOutput<I, V, E> getSuperstepOutput() {
+ return superstepOutput;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/test/java/org/apache/giraph/io/TestSrcIdDstIdEdgeValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestSrcIdDstIdEdgeValueTextOutputFormat.java b/giraph-core/src/test/java/org/apache/giraph/io/TestSrcIdDstIdEdgeValueTextOutputFormat.java
new file mode 100644
index 0000000..9909871
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestSrcIdDstIdEdgeValueTextOutputFormat.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import static org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE;
+import static org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.formats.SrcIdDstIdEdgeValueTextOutputFormat;
+import org.apache.giraph.utils.NoOpComputation;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSrcIdDstIdEdgeValueTextOutputFormat
+ extends SrcIdDstIdEdgeValueTextOutputFormat<LongWritable,
+ LongWritable, LongWritable> {
+ /** Test configuration */
+ private ImmutableClassesGiraphConfiguration<
+ LongWritable, LongWritable, LongWritable> conf;
+ /**
+ * Dummy class to allow ImmutableClassesGiraphConfiguration to be created.
+ */
+ public static class DummyComputation extends NoOpComputation<Text,
+ DoubleWritable, DoubleWritable, DoubleWritable> { }
+
+ @Before
+ public void setUp() {
+ GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
+ giraphConfiguration.setComputationClass(DummyComputation.class);
+ conf =
+ new ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
+ LongWritable>(giraphConfiguration);
+ }
+
+ @Test
+ public void testHappyPath() throws IOException, InterruptedException {
+ Text expected = new Text("0\t1\t5");
+
+ checkSrcIdDstIdEdgeValueWorker(expected);
+ }
+
+ @Test
+ public void testReverseIdAndValue() throws IOException, InterruptedException {
+ GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE.set(this.conf, true);
+ Text expected = new Text("5\t1\t0");
+
+ checkSrcIdDstIdEdgeValueWorker(expected);
+ }
+
+ @Test
+ public void testWithDifferentDelimiter() throws IOException,
+ InterruptedException {
+ GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR.set(this.conf, "->");
+ Text expected = new Text("0->1->5");
+
+ checkSrcIdDstIdEdgeValueWorker(expected);
+ }
+
+ private void checkSrcIdDstIdEdgeValueWorker(Text expected)
+ throws IOException, InterruptedException {
+
+ TaskAttemptContext tac = mock(TaskAttemptContext.class);
+ when(tac.getConfiguration()).thenReturn(conf);
+
+ Edge edge = mock(Edge.class);
+
+ when(edge.getTargetVertexId()).thenReturn(new LongWritable(1));
+ when(edge.getValue()).thenReturn(new LongWritable(5));
+
+ final RecordWriter<Text, Text> tw = mock(RecordWriter.class);
+ SrcIdDstIdEdgeValueEdgeWriter writer = new SrcIdDstIdEdgeValueEdgeWriter() {
+ @Override
+ protected RecordWriter<Text, Text> createLineRecordWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+
+ return tw;
+ }
+ };
+
+ writer.setConf(conf);
+ writer.initialize(tac);
+ writer.writeEdge(new LongWritable(0), new LongWritable(0), edge);
+
+ verify(tw).write(expected, null);
+ }
+}
[2/2] git commit: updated refs/heads/trunk to ae01f03
Posted by cl...@apache.org.
GIRAPH-732
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/ae01f039
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/ae01f039
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/ae01f039
Branch: refs/heads/trunk
Commit: ae01f0399cae6baab045b2ece0d71096aebe8ca3
Parents: fa6b754
Author: Claudio Martella <cl...@apache.org>
Authored: Mon Aug 26 22:15:18 2013 +0200
Committer: Claudio Martella <cl...@apache.org>
Committed: Mon Aug 26 22:15:18 2013 +0200
----------------------------------------------------------------------
CHANGELOG | 2 +
.../java/org/apache/giraph/GiraphRunner.java | 2 +-
.../org/apache/giraph/conf/GiraphClasses.java | 25 +
.../apache/giraph/conf/GiraphConfiguration.java | 57 +
.../org/apache/giraph/conf/GiraphConstants.java | 23 +
.../ImmutableClassesGiraphConfiguration.java | 45 +
.../org/apache/giraph/io/EdgeOutputFormat.java | 82 +
.../java/org/apache/giraph/io/EdgeWriter.java | 74 +
.../io/formats/GiraphTextOutputFormat.java | 90 +
.../io/formats/IdWithValueTextOutputFormat.java | 19 +-
.../SrcIdDstIdEdgeValueTextOutputFormat.java | 91 ++
.../giraph/io/formats/TextEdgeOutputFormat.java | 165 ++
.../io/formats/TextVertexOutputFormat.java | 15 +-
.../io/internal/WrappedEdgeOutputFormat.java | 169 ++
.../apache/giraph/utils/ConfigurationUtils.java | 53 +-
.../apache/giraph/worker/BspServiceWorker.java | 124 +-
.../giraph/worker/BspServiceWorker.java.orig | 1535 ++++++++++++++++++
...TestSrcIdDstIdEdgeValueTextOutputFormat.java | 114 ++
18 files changed, 2659 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index af43ef8..deca52b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-732: EdgeOutputFormat (aarmax00 via claudio)
+
GIRAPH-512: JavaDoc warnings (tdn120 via nitay)
GIRAPH-736: Bring back FindBugs (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
index 1bd79b5..9af50e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java
@@ -102,7 +102,7 @@ public class GiraphRunner implements Tool {
*/
private void prepareHadoopMRJob(final GiraphJob job, final CommandLine cmd)
throws Exception {
- if (cmd.hasOption("of")) {
+ if (cmd.hasOption("vof") || cmd.hasOption("eof")) {
if (cmd.hasOption("op")) {
FileOutputFormat.setOutputPath(job.getInternalJob(),
new Path(cmd.getOptionValue("op")));
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 71fe885..f97446f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -28,6 +28,7 @@ import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -85,6 +86,9 @@ public class GiraphClasses<I extends WritableComparable,
/** Edge input format class - cached for fast access */
protected Class<? extends EdgeInputFormat<I, E>>
edgeInputFormatClass;
+ /** Edge output format class - cached for fast access */
+ protected Class<? extends EdgeOutputFormat<I, V, E>>
+ edgeOutputFormatClass;
/** Aggregator writer class - cached for fast access */
protected Class<? extends AggregatorWriter> aggregatorWriterClass;
@@ -168,6 +172,8 @@ public class GiraphClasses<I extends WritableComparable,
VERTEX_OUTPUT_FORMAT_CLASS.get(conf);
edgeInputFormatClass = (Class<? extends EdgeInputFormat<I, E>>)
EDGE_INPUT_FORMAT_CLASS.get(conf);
+ edgeOutputFormatClass = (Class<? extends EdgeOutputFormat<I, V, E>>)
+ EDGE_OUTPUT_FORMAT_CLASS.get(conf);
aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
combinerClass = (Class<? extends Combiner<I, ? extends Writable>>)
@@ -347,6 +353,25 @@ public class GiraphClasses<I extends WritableComparable,
}
/**
+ * Check if EdgeOutputFormat is set
+ *
+ * @return true if EdgeOutputFormat is set
+ */
+ public boolean hasEdgeOutputFormat() {
+ return edgeOutputFormatClass != null;
+ }
+
+ /**
+ * Get EdgeOutputFormat set
+ *
+ * @return EdgeOutputFormat
+ */
+ public Class<? extends EdgeOutputFormat<I, V, E>>
+ getEdgeOutputFormatClass() {
+ return edgeOutputFormatClass;
+ }
+
+ /**
* Check if AggregatorWriter is set
*
* @return true if AggregatorWriter is set
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 23bcd32..15ff861 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.giraph.factories.VertexValueFactory;
import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.filters.EdgeInputFilter;
@@ -345,6 +346,25 @@ public class GiraphConfiguration extends Configuration
VERTEX_OUTPUT_FORMAT_CLASS.set(this, vertexOutputFormatClass);
}
+
+ /**
+ * Does the job have a {@link VertexOutputFormat} subdir?
+ *
+ * @return True iff a {@link VertexOutputFormat} subdir has been specified.
+ */
+ public boolean hasVertexOutputFormatSubdir() {
+ return !VERTEX_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
+ }
+
+ /**
+ * Set the vertex output format path
+ *
+ * @param path path where the vertices will be written
+ */
+ public final void setVertexOutputFormatSubdir(String path) {
+ VERTEX_OUTPUT_FORMAT_SUBDIR.set(this, path);
+ }
+
/**
* Check if output should be done during computation
*
@@ -386,6 +406,43 @@ public class GiraphConfiguration extends Configuration
}
/**
+ * Does the job have a {@link EdgeOutputFormat}?
+ *
+ * @return True iff a {@link EdgeOutputFormat} has been specified.
+ */
+ public boolean hasEdgeOutputFormat() {
+ return EDGE_OUTPUT_FORMAT_CLASS.get(this) != null;
+ }
+
+ /**
+ * Set the edge output format class (optional)
+ *
+ * @param edgeOutputFormatClass Determines how graph is output
+ */
+ public final void setEdgeOutputFormatClass(
+ Class<? extends EdgeOutputFormat> edgeOutputFormatClass) {
+ EDGE_OUTPUT_FORMAT_CLASS.set(this, edgeOutputFormatClass);
+ }
+
+ /**
+ * Does the job have a {@link EdgeOutputFormat} subdir?
+ *
+ * @return True iff a {@link EdgeOutputFormat} subdir has been specified.
+ */
+ public boolean hasEdgeOutputFormatSubdir() {
+ return !EDGE_OUTPUT_FORMAT_SUBDIR.get(this).isEmpty();
+ }
+
+ /**
+ * Set the edge output format path
+ *
+ * @param path path where the edges will be written
+ */
+ public final void setEdgeOutputFormatSubdir(String path) {
+ EDGE_OUTPUT_FORMAT_SUBDIR.set(this, path);
+ }
+
+ /**
* Get the number of threads to use for writing output in the end of the
* application. If output format is not thread safe, returns 1.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index c276c2a..604729a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -38,6 +38,7 @@ import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.Language;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -210,6 +211,28 @@ public interface GiraphConstants {
ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS =
ClassConfOption.create("giraph.vertexOutputFormatClass", null,
VertexOutputFormat.class, "VertexOutputFormat class");
+ /** VertexOutputFormat sub-directory */
+ StrConfOption VERTEX_OUTPUT_FORMAT_SUBDIR =
+ new StrConfOption("giraph.vertex.output.subdir", "",
+ "VertexOutputFormat sub-directory");
+ /** EdgeOutputFormat class */
+ ClassConfOption<EdgeOutputFormat> EDGE_OUTPUT_FORMAT_CLASS =
+ ClassConfOption.create("giraph.edgeOutputFormatClass", null,
+ EdgeOutputFormat.class, "EdgeOutputFormat class");
+ /** EdgeOutputFormat sub-directory */
+ StrConfOption EDGE_OUTPUT_FORMAT_SUBDIR =
+ new StrConfOption("giraph.edge.output.subdir", "edges",
+ "EdgeOutputFormat sub-directory");
+
+ /** GiraphTextOutputFormat Separator */
+ StrConfOption GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR =
+ new StrConfOption("giraph.textoutputformat.separator", "\t",
+ "GiraphTextOutputFormat Separator");
+ /** Reverse values in the output */
+ BooleanConfOption GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE =
+ new BooleanConfOption("giraph.textoutputformat.reverse", false,
+ "Reverse values in the output");
+
/**
* If you use this option, instead of saving vertices at the end of the
* application, saveVertex will be called right after each vertex.compute()
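The two text-format options are ordinary ConfOptions and can also be set
directly, as the new test in this commit does; for instance:

  // Use "->" instead of the default tab and reverse the field order.
  GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR.set(conf, "->");
  GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE.set(conf, true);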
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 49a2ebc..2506c21 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -36,11 +36,13 @@ import org.apache.giraph.graph.Language;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.filters.EdgeInputFilter;
import org.apache.giraph.io.filters.VertexInputFilter;
import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
+import org.apache.giraph.io.internal.WrappedEdgeOutputFormat;
import org.apache.giraph.io.internal.WrappedVertexInputFormat;
import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
@@ -289,6 +291,49 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
return wrappedVertexOutputFormat;
}
+ @Override
+ public boolean hasEdgeOutputFormat() {
+ return classes.hasEdgeOutputFormat();
+ }
+
+ /**
+ * Get the user's subclassed
+ * {@link org.apache.giraph.io.EdgeOutputFormat}.
+ *
+ * @return User's edge output format class
+ */
+ public Class<? extends EdgeOutputFormat<I, V, E>>
+ getEdgeOutputFormatClass() {
+ return classes.getEdgeOutputFormatClass();
+ }
+
+ /**
+ * Create a user edge output format class.
+ * Note: Giraph should only use WrappedEdgeOutputFormat,
+ * which makes sure that Configuration parameters are set properly.
+ *
+ * @return Instantiated user edge output format class
+ */
+ private EdgeOutputFormat<I, V, E> createEdgeOutputFormat() {
+ Class<? extends EdgeOutputFormat<I, V, E>> klass =
+ getEdgeOutputFormatClass();
+ return ReflectionUtils.newInstance(klass, this);
+ }
+
+ /**
+ * Create a wrapper for user edge output format,
+ * which makes sure that Configuration parameters are set properly in all
+ * methods related to this format.
+ *
+ * @return Wrapper around user edge output format
+ */
+ public WrappedEdgeOutputFormat<I, V, E> createWrappedEdgeOutputFormat() {
+ WrappedEdgeOutputFormat<I, V, E> wrappedEdgeOutputFormat =
+ new WrappedEdgeOutputFormat<I, V, E>(createEdgeOutputFormat());
+ configureIfPossible(wrappedEdgeOutputFormat);
+ return wrappedEdgeOutputFormat;
+ }
+
/**
* Create the proper superstep output, based on the configuration settings.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
new file mode 100644
index 0000000..ac4c6ce
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import java.io.IOException;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Abstract class which can only write edges.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class EdgeOutputFormat<
+ I extends WritableComparable, V extends Writable,
+ E extends Writable> extends
+ DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+ /**
+ * Create an edge writer for a given split. The framework will call
+ * {@link EdgeWriter#initialize(TaskAttemptContext)} before
+ * the split is used.
+ *
+ * @param context the information about the task
+ * @return a new edge writer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract EdgeWriter<I, V, E> createEdgeWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException;
+
+ /**
+ * Check for validity of the output-specification for the job.
+ * (Copied from Hadoop OutputFormat)
+ *
+ * <p>This is to validate the output specification for the job when it
+ * is submitted. Typically this checks that the output does not already
+ * exist, throwing an exception when it does, so that existing output is
+ * not overwritten.</p>
+ *
+ * @param context information about the job
+ * @throws IOException when output should not be attempted
+ */
+ public abstract void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException;
+
+ /**
+ * Get the output committer for this output format. This is responsible
+ * for ensuring the output is committed correctly.
+ * (Copied from Hadoop OutputFormat)
+ *
+ * @param context the task context
+ * @return an output committer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract OutputCommitter getOutputCommitter(
+ TaskAttemptContext context) throws IOException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java
new file mode 100644
index 0000000..e5a78c2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import java.io.IOException;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Abstract base class for writing edges, one edge at a time.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class EdgeWriter<
+ I extends WritableComparable, V extends Writable,
+ E extends Writable> extends
+ DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+
+ /**
+ * Writes the next edge and associated source vertex data
+ *
+ * @param sourceId the vertex ID from which the edge originates
+ * @param sourceValue the vertex value; the vertex is the one from which
+ * the edge originates
+ * @param edge edge to be written
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
+ throws IOException, InterruptedException;
+
+ /**
+ * Use the context to setup writing the edges.
+ * Guaranteed to be called prior to any other function.
+ *
+ * @param context Context used to write the edges.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract void initialize(TaskAttemptContext context)
+ throws IOException, InterruptedException;
+
+ /**
+ * Close this {@link EdgeWriter} to future operations.
+ *
+ * @param context the context of the task
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract void close(TaskAttemptContext context)
+ throws IOException, InterruptedException;
+}
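The intended life cycle of an EdgeWriter mirrors what saveEdges() in
BspServiceWorker does later in this commit: create, configure,
initialize, write each edge, close. A condensed sketch (conf, context
and partition come from the surrounding worker code):

  EdgeOutputFormat<I, V, E> format = conf.createWrappedEdgeOutputFormat();
  EdgeWriter<I, V, E> writer = format.createEdgeWriter(context);
  writer.setConf(conf);
  writer.initialize(context);
  for (Vertex<I, V, E> vertex : partition) {
    for (Edge<I, E> edge : vertex.getEdges()) {
      writer.writeEdge(vertex.getId(), vertex.getValue(), edge);
    }
  }
  writer.close(context);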
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java
new file mode 100644
index 0000000..582dea2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextOutputFormat.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * The text output format used for Giraph text writing.
+ */
+public abstract class GiraphTextOutputFormat
+ extends TextOutputFormat<Text, Text> {
+
+ @Override
+ public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
+ throws IOException, InterruptedException {
+ String extension = "";
+ CompressionCodec codec = null;
+ Configuration conf = job.getConfiguration();
+ boolean isCompressed = getCompressOutput(job);
+
+ if (isCompressed) {
+ Class<? extends CompressionCodec> codecClass =
+ getOutputCompressorClass(job, GzipCodec.class);
+ codec =
+ (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
+ extension = codec.getDefaultExtension();
+ }
+ Path file = getDefaultWorkFile(job, extension);
+
+ /* adjust the path */
+ FSDataOutputStream fileOut;
+ FileSystem fs = file.getFileSystem(conf);
+ String subdir = getSubdir();
+ if (!subdir.isEmpty()) {
+ Path subdirPath = new Path(subdir);
+ Path subdirAbsPath = new Path(file.getParent(), subdirPath);
+ Path vertexFile = new Path(subdirAbsPath, file.getName());
+ fileOut = fs.create(vertexFile, false);
+ } else {
+ fileOut = fs.create(file, false);
+ }
+
+ String separator = "\t";
+
+ if (!isCompressed) {
+ return new LineRecordWriter<Text, Text>(fileOut, separator);
+ } else {
+ DataOutputStream out =
+ new DataOutputStream(codec.createOutputStream(fileOut));
+ return new LineRecordWriter<Text, Text>(out, separator);
+ }
+ }
+
+ /**
+ * This method provides an additional path level so that different text
+ * outputs can be kept in separate directories.
+ *
+ * @return the subdirectory to be created under the output path
+ */
+ protected abstract String getSubdir();
+}
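Concrete formats only need to supply the subdirectory. A format that
always writes under a fixed directory could be sketched as follows (the
directory name is arbitrary):

  GiraphTextOutputFormat dumpFormat = new GiraphTextOutputFormat() {
    @Override
    protected String getSubdir() {
      // Everything this format writes lands under <output path>/debug-dump
      return "debug-dump";
    }
  };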
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
index bd69586..e886059 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
@@ -77,19 +77,18 @@ public class IdWithValueTextOutputFormat<I extends WritableComparable,
@Override
protected Text convertVertexToLine(Vertex<I, V, E> vertex)
throws IOException {
- String first;
- String second;
+
+ StringBuilder str = new StringBuilder();
if (reverseOutput) {
- first = vertex.getValue().toString();
- second = vertex.getId().toString();
+ str.append(vertex.getValue().toString());
+ str.append(delimiter);
+ str.append(vertex.getId().toString());
} else {
- first = vertex.getId().toString();
- second = vertex.getValue().toString();
+ str.append(vertex.getId().toString());
+ str.append(delimiter);
+ str.append(vertex.getValue().toString());
}
- Text line = new Text(first + delimiter + second);
- return line;
+ return new Text(str.toString());
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java
new file mode 100644
index 0000000..1d7478f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SrcIdDstIdEdgeValueTextOutputFormat.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import static org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR;
+import static org.apache.giraph.conf.GiraphConstants.GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE;
+
+/**
+ * Write out the edge value together with the source and destination IDs,
+ * but not the vertex value.
+ * This is a demonstration output format that shows how edges can be
+ * output separately from vertices.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class SrcIdDstIdEdgeValueTextOutputFormat<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ extends TextEdgeOutputFormat<I, V, E> {
+
+ @Override
+ public TextEdgeWriter createEdgeWriter(TaskAttemptContext context) {
+ return new SrcIdDstIdEdgeValueEdgeWriter();
+ }
+
+ /**
+ * Edge writer used with {@link SrcIdDstIdEdgeValueTextOutputFormat}.
+ */
+ protected class SrcIdDstIdEdgeValueEdgeWriter
+ extends TextEdgeWriterToEachLine {
+
+ /** Saved delimiter */
+ private String delimiter;
+ /** Cached reverse option */
+ private boolean reverseOutput;
+
+ @Override
+ public void initialize(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ super.initialize(context);
+ delimiter = GIRAPH_TEXT_OUTPUT_FORMAT_SEPARATOR.get(getConf());
+ reverseOutput = GIRAPH_TEXT_OUTPUT_FORMAT_REVERSE.get(getConf());
+ }
+
+ @Override
+ protected Text convertEdgeToLine(I sourceId, V sourceValue, Edge<I, E> edge)
+ throws IOException {
+ StringBuilder msg = new StringBuilder();
+ if (reverseOutput) {
+ msg.append(edge.getValue().toString());
+ msg.append(delimiter);
+ msg.append(edge.getTargetVertexId().toString());
+ msg.append(delimiter);
+ msg.append(sourceId.toString());
+ } else {
+ msg.append(sourceId.toString());
+ msg.append(delimiter);
+ msg.append(edge.getTargetVertexId().toString());
+ msg.append(delimiter);
+ msg.append(edge.getValue().toString());
+ }
+ return new Text(msg.toString());
+ }
+ }
+}
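For a single edge from vertex 0 to vertex 1 with value 5, this writer
emits the following lines under the three configurations exercised by
the test in this commit:

  0\t1\t5    (defaults: tab separator)
  5\t1\t0    (giraph.textoutputformat.reverse = true)
  0->1->5    (giraph.textoutputformat.separator = "->")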
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java
new file mode 100644
index 0000000..1b20c57
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeOutputFormat.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import static org.apache.giraph.conf.GiraphConstants.EDGE_OUTPUT_FORMAT_SUBDIR;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.EdgeWriter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Abstract class that users should subclass to use their own text based
+ * edge output format.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextEdgeOutputFormat<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ extends EdgeOutputFormat<I, V, E> {
+ /** Uses the TextOutputFormat to do everything */
+ protected GiraphTextOutputFormat textOutputFormat =
+ new GiraphTextOutputFormat() {
+ @Override
+ protected String getSubdir() {
+ return EDGE_OUTPUT_FORMAT_SUBDIR.get(getConf());
+ }
+ };
+
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException {
+ textOutputFormat.checkOutputSpecs(context);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return textOutputFormat.getOutputCommitter(context);
+ }
+
+ /**
+ * The factory method which produces the {@link TextEdgeWriter} used by this
+ * output format.
+ *
+ * @param context the information about the task
+ * @return the text edge writer to be used
+ */
+ @Override
+ public abstract TextEdgeWriter createEdgeWriter(TaskAttemptContext
+ context) throws IOException, InterruptedException;
+
+ /**
+ * Abstract class to be implemented by the user based on their specific
+ * edge output. It is easiest to ignore the key/value separator and
+ * write the whole line as the key.
+ */
+ protected abstract class TextEdgeWriter
+ extends EdgeWriter<I, V, E> {
+ /** Internal line record writer */
+ private RecordWriter<Text, Text> lineRecordWriter;
+ /** Context passed to initialize */
+ private TaskAttemptContext context;
+
+ @Override
+ public void initialize(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ lineRecordWriter = createLineRecordWriter(context);
+ this.context = context;
+ }
+
+ /**
+ * Create the line record writer. Override this to use a different
+ * underlying record writer (useful for testing).
+ *
+ * @param context the context passed to initialize
+ * @return the record writer to be used
+ * @throws IOException exception that can be thrown during creation
+ * @throws InterruptedException exception that can be thrown during creation
+ */
+ protected RecordWriter<Text, Text> createLineRecordWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ return textOutputFormat.getRecordWriter(context);
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ lineRecordWriter.close(context);
+ }
+
+ /**
+ * Get the line record writer.
+ *
+ * @return Record writer to be used for writing.
+ */
+ public RecordWriter<Text, Text> getRecordWriter() {
+ return lineRecordWriter;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize.
+ */
+ public TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ /**
+ * Abstract class to be implemented by the user to write a line for each
+ * edge.
+ */
+ protected abstract class TextEdgeWriterToEachLine extends TextEdgeWriter {
+
+ @Override
+ public final void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
+ throws IOException, InterruptedException {
+
+ // Note we are writing line as key with null value
+ getRecordWriter().write(
+ convertEdgeToLine(sourceId, sourceValue, edge), null);
+ }
+
+ /**
+ * Writes a line for the given edge.
+ *
+ * @param sourceId the current id of the source vertex
+ * @param sourceValue the current value of the source vertex
+ * @param edge the current edge for writing
+ * @return the text line to be written
+ * @throws IOException exception that can be thrown while writing
+ */
+ protected abstract Text convertEdgeToLine(I sourceId,
+ V sourceValue, Edge<I, E> edge) throws IOException;
+ }
+}
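With this base class, a user format reduces to one conversion method. A
minimal sketch of a format that emits only the two endpoint ids (the
class name is made up; imports shown for completeness):

  import java.io.IOException;
  import org.apache.giraph.edge.Edge;
  import org.apache.giraph.io.formats.TextEdgeOutputFormat;
  import org.apache.hadoop.io.DoubleWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.TaskAttemptContext;

  public class SrcIdDstIdTextOutputFormat extends
      TextEdgeOutputFormat<LongWritable, DoubleWritable, DoubleWritable> {
    @Override
    public TextEdgeWriter createEdgeWriter(TaskAttemptContext context) {
      return new TextEdgeWriterToEachLine() {
        @Override
        protected Text convertEdgeToLine(LongWritable sourceId,
            DoubleWritable sourceValue,
            Edge<LongWritable, DoubleWritable> edge) throws IOException {
          // Drop the edge value; write "srcId<TAB>dstId" per line.
          return new Text(sourceId + "\t" + edge.getTargetVertexId());
        }
      };
    }
  }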
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
index c91d543..c57ecd7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
@@ -19,6 +19,7 @@
package org.apache.giraph.io.formats;
import java.io.IOException;
+
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
@@ -29,7 +30,8 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_SUBDIR;
/**
* Abstract class that users should subclass to use their own text based
@@ -43,10 +45,14 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public abstract class TextVertexOutputFormat<I extends WritableComparable,
V extends Writable, E extends Writable>
extends VertexOutputFormat<I, V, E> {
-
/** Uses the TextOutputFormat to do everything */
- protected TextOutputFormat<Text, Text> textOutputFormat =
- new TextOutputFormat<Text, Text>();
+ protected GiraphTextOutputFormat textOutputFormat =
+ new GiraphTextOutputFormat() {
+ @Override
+ protected String getSubdir() {
+ return VERTEX_OUTPUT_FORMAT_SUBDIR.get(getConf());
+ }
+ };
@Override
public void checkOutputSpecs(JobContext context)
@@ -161,5 +167,4 @@ public abstract class TextVertexOutputFormat<I extends WritableComparable,
protected abstract Text convertVertexToLine(Vertex<I, V, E> vertex)
throws IOException;
}
-
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
new file mode 100644
index 0000000..2222255
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import java.io.IOException;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.EdgeWriter;
+import org.apache.giraph.job.HadoopUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * For internal use only.
+ *
+ * Wraps user set {@link EdgeOutputFormat} to make sure proper configuration
+ * parameters are passed around, that user can set parameters in
+ * configuration and they will be available in other methods related to this
+ * format.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public class WrappedEdgeOutputFormat<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ extends EdgeOutputFormat<I, V, E> {
+
+ /** {@link EdgeOutputFormat} which is wrapped */
+ private final EdgeOutputFormat<I, V, E> originalOutputFormat;
+
+ /**
+ * Constructor
+ *
+ * @param edgeOutputFormat Edge output format to wrap
+ */
+ public WrappedEdgeOutputFormat(
+ EdgeOutputFormat<I, V, E> edgeOutputFormat) {
+ originalOutputFormat = edgeOutputFormat;
+ }
+
+ @Override
+ public EdgeWriter<I, V, E> createEdgeWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ final EdgeWriter<I, V, E> edgeWriter =
+ originalOutputFormat.createEdgeWriter(
+ HadoopUtils.makeTaskAttemptContext(getConf(), context));
+ return new EdgeWriter<I, V, E>() {
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+ super.setConf(conf);
+ edgeWriter.setConf(conf);
+ }
+
+ @Override
+ public void initialize(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ edgeWriter.initialize(
+ HadoopUtils.makeTaskAttemptContext(getConf(), context));
+ }
+
+ @Override
+ public void close(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ edgeWriter.close(
+ HadoopUtils.makeTaskAttemptContext(getConf(), context));
+ }
+
+ @Override
+ public void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
+ throws IOException, InterruptedException {
+ edgeWriter.writeEdge(sourceId, sourceValue, edge);
+ }
+ };
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException {
+ originalOutputFormat.checkOutputSpecs(
+ HadoopUtils.makeJobContext(getConf(), context));
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+
+ final OutputCommitter outputCommitter =
+ originalOutputFormat.getOutputCommitter(
+ HadoopUtils.makeTaskAttemptContext(getConf(), context));
+
+ return new OutputCommitter() {
+ @Override
+ public void setupJob(JobContext context) throws IOException {
+ outputCommitter.setupJob(
+ HadoopUtils.makeJobContext(getConf(), context));
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext context) throws IOException {
+ outputCommitter.setupTask(
+ HadoopUtils.makeTaskAttemptContext(getConf(), context));
+ }
+
+ @Override
+ public boolean needsTaskCommit(
+ TaskAttemptContext context) throws IOException {
+ return outputCommitter.needsTaskCommit(
+ HadoopUtils.makeTaskAttemptContext(getConf(), context));
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ outputCommitter.commitTask(
+ HadoopUtils.makeTaskAttemptContext(getConf(), context));
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ outputCommitter.abortTask(
+ HadoopUtils.makeTaskAttemptContext(getConf(), context));
+ }
+
+ @Override
+ public void cleanupJob(JobContext context) throws IOException {
+ outputCommitter.cleanupJob(
+ HadoopUtils.makeJobContext(getConf(), context));
+ }
+
+ /*if_not[HADOOP_NON_COMMIT_JOB]*/
+ @Override
+ public void commitJob(JobContext context) throws IOException {
+ outputCommitter.commitJob(
+ HadoopUtils.makeJobContext(getConf(), context));
+ }
+
+ @Override
+ public void abortJob(JobContext context,
+ JobStatus.State state) throws IOException {
+ outputCommitter.abortJob(
+ HadoopUtils.makeJobContext(getConf(), context), state);
+ }
+ /*end[HADOOP_NON_COMMIT_JOB]*/
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index 745764b..4bc4f4d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -36,6 +36,7 @@ import org.apache.giraph.factories.VertexValueFactory;
import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.Language;
import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
@@ -97,10 +98,16 @@ public final class ConfigurationUtils {
OPTIONS.addOption("w", "workers", true, "Number of workers");
OPTIONS.addOption("vif", "vertexInputFormat", true, "Vertex input format");
OPTIONS.addOption("eif", "edgeInputFormat", true, "Edge input format");
- OPTIONS.addOption("of", "outputFormat", true, "Vertex output format");
+ OPTIONS.addOption("vof", "vertexOutputFormat", true,
+ "Vertex output format");
+ OPTIONS.addOption("eof", "edgeOutputFormat", true, "Edge output format");
OPTIONS.addOption("vip", "vertexInputPath", true, "Vertex input path");
OPTIONS.addOption("eip", "edgeInputPath", true, "Edge input path");
- OPTIONS.addOption("op", "outputPath", true, "Vertex output path");
+ OPTIONS.addOption("op", "outputPath", true, "Output path");
+ OPTIONS.addOption("vsd", "vertexSubDir", true, "subdirectory to be used " +
+ "for the vertex output");
+ OPTIONS.addOption("esd", "edgeSubDir", true, "subdirectory to be used " +
+ "for the edge output");
OPTIONS.addOption("c", "combiner", true, "Combiner class");
OPTIONS.addOption("ve", "outEdges", true, "Vertex edges class");
OPTIONS.addOption("wc", "workerContext", true, "WorkerContext class");
@@ -316,14 +323,46 @@ public final class ConfigurationUtils {
"InputFormat does not require one.");
}
}
- if (cmd.hasOption("of")) {
+ if (cmd.hasOption("vof")) {
conf.setVertexOutputFormatClass(
(Class<? extends VertexOutputFormat>) Class
- .forName(cmd.getOptionValue("of")));
+ .forName(cmd.getOptionValue("vof")));
} else {
if (LOG.isInfoEnabled()) {
- LOG.info("No output format specified. Ensure your OutputFormat " +
- "does not require one.");
+ LOG.info("No vertex output format specified. Ensure your " +
+ "OutputFormat does not require one.");
+ }
+ }
+ if (cmd.hasOption("vof")) {
+ if (cmd.hasOption("vsd")) {
+ conf.setVertexOutputFormatSubdir(cmd.getOptionValue("vsd"));
+ }
+ }
+ if (cmd.hasOption("eof")) {
+ conf.setEdgeOutputFormatClass(
+ (Class<? extends EdgeOutputFormat>) Class
+ .forName(cmd.getOptionValue("eof")));
+ } else {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("No edge output format specified. Ensure your " +
+ "OutputFormat does not require one.");
+ }
+ }
+ if (cmd.hasOption("eof")) {
+ if (cmd.hasOption("esd")) {
+ conf.setEdgeOutputFormatSubdir(cmd.getOptionValue("esd"));
+ }
+ }
+ /* check for path clashes */
+ if (cmd.hasOption("vof") && cmd.hasOption("eof") && cmd.hasOption("op")) {
+ if (!cmd.hasOption("vsd") || cmd.hasOption("esd")) {
+ if (!conf.hasEdgeOutputFormatSubdir() ||
+ !conf.hasVertexOutputFormatSubdir()) {
+
+ throw new IllegalArgumentException("If VertexOutputFormat and " +
+ "EdgeOutputFormat are both set, it is mandatory to provide " +
+ "both a vertex subdirectory and an edge subdirectory");
+ }
}
}
if (cmd.hasOption("pc")) {
@@ -385,7 +424,7 @@ public final class ConfigurationUtils {
Integer.parseInt(cmd.getOptionValue("yh")));
}
/*if[PURE_YARN]
- if (cmd.hasOption("of")) {
+ if (cmd.hasOption("vof") || cmd.hasOption("eof")) {
if (cmd.hasOption("op")) {
// For YARN conf to get the out dir we need w/o a Job obj
Path outputDir =
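Putting the new options together: a job that writes both vertices and
edges under the same output path must name both subdirectories. A sketch
using ToolRunner (computation class and paths are placeholders; the
flags are the ones added above):

  import org.apache.giraph.GiraphRunner;
  import org.apache.hadoop.util.ToolRunner;

  String[] args = new String[] {
    "org.example.MyComputation",                 // placeholder
    "-vif",
    "org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat",
    "-vip", "/in/graph",                         // placeholder
    "-vof", "org.apache.giraph.io.formats.IdWithValueTextOutputFormat",
    "-eof",
    "org.apache.giraph.io.formats.SrcIdDstIdEdgeValueTextOutputFormat",
    "-op", "/out/graph",                         // placeholder
    "-vsd", "vertices",
    "-esd", "edges",
    "-w", "4"
  };
  ToolRunner.run(new GiraphRunner(), args);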
http://git-wip-us.apache.org/repos/asf/giraph/blob/ae01f039/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 9311fbd..112b76d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -32,6 +32,7 @@ import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerServer;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.GlobalStats;
@@ -40,6 +41,8 @@ import org.apache.giraph.graph.InputSplitEvents;
import org.apache.giraph.graph.InputSplitPaths;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.EdgeWriter;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
@@ -919,13 +922,15 @@ public class BspServiceWorker<I extends WritableComparable,
*/
private void saveVertices(long numLocalVertices) throws IOException,
InterruptedException {
- if (getConfiguration().getVertexOutputFormatClass() == null) {
+ ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
+
+ if (conf.getVertexOutputFormatClass() == null) {
LOG.warn("saveVertices: " +
GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
" not specified -- there will be no saved output");
return;
}
- if (getConfiguration().doOutputDuringComputation()) {
+ if (conf.doOutputDuringComputation()) {
if (LOG.isInfoEnabled()) {
LOG.info("saveVertices: The option for doing output during " +
"computation is selected, so there will be no saving of the " +
@@ -1024,12 +1029,126 @@ public class BspServiceWorker<I extends WritableComparable,
}
}
+ /**
+ * Save the edges using the user-defined EdgeOutputFormat, iterating
+ * over the edges of every vertex held in the local partitions.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private void saveEdges() throws IOException, InterruptedException {
+ final ImmutableClassesGiraphConfiguration<I, V, E> conf =
+ getConfiguration();
+
+ if (conf.getEdgeOutputFormatClass() == null) {
+ LOG.warn("saveEdges: " +
+ GiraphConstants.EDGE_OUTPUT_FORMAT_CLASS +
+ "Make sure that the EdgeOutputFormat is not required.");
+ return;
+ }
+
+ final int numPartitions = getPartitionStore().getNumPartitions();
+ int numThreads = Math.min(conf.getNumOutputThreads(),
+ numPartitions);
+ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+ "saveEdges: Starting to save the edges using " +
+ numThreads + " threads");
+ final EdgeOutputFormat<I, V, E> edgeOutputFormat =
+ conf.createWrappedEdgeOutputFormat();
+
+ final Queue<Integer> partitionIdQueue =
+ (numPartitions == 0) ? new LinkedList<Integer>() :
+ new ArrayBlockingQueue<Integer>(numPartitions);
+ Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
+
+ CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+ @Override
+ public Callable<Void> newCallable(int callableId) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ EdgeWriter<I, V, E> edgeWriter =
+ edgeOutputFormat.createEdgeWriter(getContext());
+ edgeWriter.setConf(conf);
+ edgeWriter.initialize(getContext());
+
+ long nextPrintVertices = 0;
+ long nextPrintMsecs = System.currentTimeMillis() + 15000;
+ int partitionIndex = 0;
+ int numPartitions = getPartitionStore().getNumPartitions();
+ while (!partitionIdQueue.isEmpty()) {
+ Integer partitionId = partitionIdQueue.poll();
+ if (partitionId == null) {
+ break;
+ }
+
+ Partition<I, V, E> partition =
+ getPartitionStore().getPartition(partitionId);
+ long vertices = 0;
+ long edges = 0;
+ long partitionEdgeCount = partition.getEdgeCount();
+ for (Vertex<I, V, E> vertex : partition) {
+ for (Edge<I, E> edge : vertex.getEdges()) {
+ edgeWriter.writeEdge(vertex.getId(), vertex.getValue(), edge);
+ ++edges;
+ }
+ ++vertices;
+
+ // Update status at most every 250k vertices or 15 seconds
+ if (vertices > nextPrintVertices &&
+ System.currentTimeMillis() > nextPrintMsecs) {
+ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+ "saveEdges: Saved " + edges +
+ " edges out of " + partitionEdgeCount +
+ " partition edges, on partition " + partitionIndex +
+ " out of " + numPartitions);
+ nextPrintMsecs = System.currentTimeMillis() + 15000;
+ nextPrintVertices = vertices + 250000;
+ }
+ }
+ getPartitionStore().putPartition(partition);
+ ++partitionIndex;
+ }
+ edgeWriter.close(getContext()); // the temp results are saved now
+ return null;
+ }
+ };
+ }
+ };
+ ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+ "save-vertices-%d", getContext());
+
+ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+ "saveEdges: Done saving edges.");
+ // YARN: we must commit the "task" output ourselves, since Hadoop isn't there.
+ if (conf.isPureYarnJob() &&
+ conf.getEdgeOutputFormatClass() != null) {
+ try {
+ OutputCommitter outputCommitter =
+ edgeOutputFormat.getOutputCommitter(getContext());
+ if (outputCommitter.needsTaskCommit(getContext())) {
+ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+ "OutputCommitter: committing task output.");
+ // transfer from temp dirs to "task commit" dirs to prep for
+ // the master's OutputCommitter#commitJob(context) call to finish.
+ outputCommitter.commitTask(getContext());
+ }
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted while attempting to obtain " +
+ "OutputCommitter.", ie);
+ } catch (IOException ioe) {
+ LOG.error("Master task's attempt to commit output has " +
+ "FAILED.", ioe);
+ }
+ }
+ }
+
@Override
public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
throws IOException, InterruptedException {
workerClient.closeConnections();
setCachedSuperstep(getSuperstep() - 1);
saveVertices(finishedSuperstepStats.getLocalVertexCount());
+ saveEdges();
getPartitionStore().shutdown();
// All worker processes should denote they are done by adding special
// znode. Once the number of znodes equals the number of partitions
@@ -1331,7 +1450,6 @@ else[HADOOP_NON_SECURE]*/
}
}
-
try {
workerClientRequestProcessor.flush();
workerClient.waitAllRequests();