You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC
[18/23] GIRAPH-409: Refactor / cleanups (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
deleted file mode 100644
index 6e97e6c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
+++ /dev/null
@@ -1,1392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-
-import org.apache.giraph.bsp.ApplicationState;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.WorkerClient;
-import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.comm.WorkerServer;
-import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
-import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
-import org.apache.giraph.comm.netty.NettyWorkerClient;
-import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
-import org.apache.giraph.comm.netty.NettyWorkerServer;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.graph.partition.Partition;
-import org.apache.giraph.graph.partition.PartitionExchange;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.giraph.graph.partition.PartitionStats;
-import org.apache.giraph.graph.partition.PartitionStore;
-import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
-import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.giraph.metrics.GiraphTimer;
-import org.apache.giraph.metrics.GiraphTimerContext;
-import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
-import org.apache.giraph.metrics.SuperstepMetricsRegistry;
-import org.apache.giraph.metrics.WorkerSuperstepMetrics;
-import org.apache.giraph.utils.LoggerUtils;
-import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.utils.ProgressableUtils;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import net.iharder.Base64;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class BspServiceWorker<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends BspService<I, V, E, M>
- implements CentralizedServiceWorker<I, V, E, M>,
- ResetSuperstepMetricsObserver {
- /** Name of gauge for time spent waiting on other workers */
- public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
- /** My process health znode */
- private String myHealthZnode;
- /** Worker info */
- private final WorkerInfo workerInfo;
- /** Worker graph partitioner */
- private final WorkerGraphPartitioner<I, V, E, M> workerGraphPartitioner;
-
- /** IPC Client */
- private final WorkerClient<I, V, E, M> workerClient;
- /** IPC Server */
- private final WorkerServer<I, V, E, M> workerServer;
- /** Request processor for aggregator requests */
- private final WorkerAggregatorRequestProcessor
- workerAggregatorRequestProcessor;
- /** Master info */
- private MasterInfo masterInfo = new MasterInfo();
- /** List of workers */
- private List<WorkerInfo> workerInfoList = Lists.newArrayList();
- /** Have the partition exchange children (workers) changed? */
- private final BspEvent partitionExchangeChildrenChanged;
-
- /** Worker Context */
- private final WorkerContext workerContext;
-
- /** Handler for aggregators */
- private final WorkerAggregatorHandler aggregatorHandler;
-
- // Per-Superstep Metrics
- /** Timer for WorkerContext#postSuperstep */
- private GiraphTimer wcPostSuperstepTimer;
- /** Time spent waiting on requests to finish */
- private GiraphTimer waitRequestsTimer;
-
  /**
   * Constructor for setting up the worker.
   *
   * @param serverPortList ZooKeeper server port list
   * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
   * @param context Mapper context
   * @param graphMapper Graph mapper
   * @throws IOException
   * @throws InterruptedException
   */
  public BspServiceWorker(
      String serverPortList,
      int sessionMsecTimeout,
      Mapper<?, ?, ?, ?>.Context context,
      GraphMapper<I, V, E, M> graphMapper)
    throws IOException, InterruptedException {
    super(serverPortList, sessionMsecTimeout, context, graphMapper);
    // Event fired when other workers' partition-exchange znodes change
    partitionExchangeChildrenChanged = new PredicateLock(context);
    registerBspEvent(partitionExchangeChildrenChanged);
    workerGraphPartitioner =
        getGraphPartitionerFactory().createWorkerGraphPartitioner();
    // The server is created before the client so this worker's bound
    // address can be recorded in its WorkerInfo first.
    workerInfo = new WorkerInfo();
    workerServer =
        new NettyWorkerServer<I, V, E, M>(getConfiguration(), this, context);
    workerInfo.setInetSocketAddress(workerServer.getMyAddress());
    workerInfo.setTaskId(getTaskPartition());
    workerClient =
        new NettyWorkerClient<I, V, E, M>(context, getConfiguration(), this);

    workerAggregatorRequestProcessor =
        new NettyWorkerAggregatorRequestProcessor(getContext(),
            getConfiguration(), this);

    this.workerContext = getConfiguration().createWorkerContext(null);

    aggregatorHandler =
        new WorkerAggregatorHandler(this, getConfiguration(), context);

    // Have newSuperstep() called to re-create per-superstep timers
    GiraphMetrics.get().addSuperstepResetObserver(this);
  }
-
- @Override
- public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
- waitRequestsTimer = new GiraphTimer(superstepMetrics,
- TIMER_WAIT_REQUESTS, TimeUnit.MICROSECONDS);
- wcPostSuperstepTimer = new GiraphTimer(superstepMetrics,
- "worker-context-post-superstep", TimeUnit.MICROSECONDS);
- }
-
- @Override
- public WorkerContext getWorkerContext() {
- return workerContext;
- }
-
- @Override
- public WorkerClient<I, V, E, M> getWorkerClient() {
- return workerClient;
- }
-
- /**
- * Intended to check the health of the node. For instance, can it ssh,
- * dmesg, etc. For now, does nothing.
- * TODO: Make this check configurable by the user (i.e. search dmesg for
- * problems).
- *
- * @return True if healthy (always in this case).
- */
- public boolean isHealthy() {
- return true;
- }
-
- /**
- * Load the vertices/edges from input slits. Do this until all the
- * InputSplits have been processed.
- * All workers will try to do as many InputSplits as they can. The master
- * will monitor progress and stop this once all the InputSplits have been
- * loaded and check-pointed. Keep track of the last input split path to
- * ensure the input split cache is flushed prior to marking the last input
- * split complete.
- *
- * Use one or more threads to do the loading.
- *
- * @param inputSplitPathList List of input split paths
- * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
- * @return Statistics of the vertices and edges loaded
- * @throws InterruptedException
- * @throws KeeperException
- */
- private VertexEdgeCount loadInputSplits(
- List<String> inputSplitPathList,
- InputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory)
- throws KeeperException, InterruptedException {
- VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
- // Determine how many threads to use based on the number of input splits
- int maxInputSplitThreads =
- Math.max(
- inputSplitPathList.size() / getConfiguration().getMaxWorkers(), 1);
- int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
- maxInputSplitThreads);
- ExecutorService inputSplitsExecutor =
- Executors.newFixedThreadPool(numThreads,
- new ThreadFactoryBuilder().setNameFormat("load-%d").build());
- List<Future<VertexEdgeCount>> threadsFutures =
- Lists.newArrayListWithCapacity(numThreads);
- if (LOG.isInfoEnabled()) {
- LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
- "originally " + getConfiguration().getNumInputSplitsThreads() +
- " threads(s) for " + inputSplitPathList.size() + " total splits.");
- }
- for (int i = 0; i < numThreads; ++i) {
- Callable<VertexEdgeCount> inputSplitsCallable =
- inputSplitsCallableFactory.newCallable();
- threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable));
- }
-
- // Wait until all the threads are done to wait on all requests
- for (Future<VertexEdgeCount> threadFuture : threadsFutures) {
- VertexEdgeCount threadVertexEdgeCount =
- ProgressableUtils.getFutureResult(threadFuture, getContext());
- vertexEdgeCount =
- vertexEdgeCount.incrVertexEdgeCount(threadVertexEdgeCount);
- }
-
- workerClient.waitAllRequests();
- inputSplitsExecutor.shutdown();
- return vertexEdgeCount;
- }
-
-
- /**
- * Load the vertices from the user-defined {@link VertexReader}
- *
- * @return Count of vertices and edges loaded
- */
- private VertexEdgeCount loadVertices() throws KeeperException,
- InterruptedException {
- List<String> inputSplitPathList =
- getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(),
- false, false, true);
-
- GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
- INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
- null, null);
-
- VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
- new VertexInputSplitsCallableFactory<I, V, E, M>(
- getContext(),
- graphState,
- getConfiguration(),
- this,
- inputSplitPathList,
- getWorkerInfo(),
- getZkExt());
-
- return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory);
- }
-
- /**
- * Load the edges from the user-defined {@link EdgeReader}.
- *
- * @return Number of edges loaded
- */
- private long loadEdges() throws KeeperException, InterruptedException {
- List<String> inputSplitPathList =
- getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(),
- false, false, true);
-
- GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
- INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
- null, null);
-
- EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
- new EdgeInputSplitsCallableFactory<I, V, E, M>(
- getContext(),
- graphState,
- getConfiguration(),
- this,
- inputSplitPathList,
- getWorkerInfo(),
- getZkExt());
-
- return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory).
- getEdgeCount();
- }
-
- @Override
- public MasterInfo getMasterInfo() {
- return masterInfo;
- }
-
- @Override
- public List<WorkerInfo> getWorkerInfoList() {
- return workerInfoList;
- }
-
- /**
- * Ensure the input splits are ready for processing
- *
- * @param inputSplitPaths Input split paths
- * @param inputSplitEvents Input split events
- */
- private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths,
- InputSplitEvents inputSplitEvents) {
- while (true) {
- Stat inputSplitsReadyStat;
- try {
- inputSplitsReadyStat = getZkExt().exists(
- inputSplitPaths.getAllReadyPath(), true);
- } catch (KeeperException e) {
- throw new IllegalStateException("ensureInputSplitsReady: " +
- "KeeperException waiting on input splits", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException("ensureInputSplitsReady: " +
- "InterruptedException waiting on input splits", e);
- }
- if (inputSplitsReadyStat != null) {
- break;
- }
- inputSplitEvents.getAllReadyChanged().waitForever();
- inputSplitEvents.getAllReadyChanged().reset();
- }
- }
-
- /**
- * Wait for all workers to finish processing input splits.
- *
- * @param inputSplitPaths Input split paths
- * @param inputSplitEvents Input split events
- */
- private void waitForOtherWorkers(InputSplitPaths inputSplitPaths,
- InputSplitEvents inputSplitEvents) {
- String workerInputSplitsDonePath =
- inputSplitPaths.getDonePath() + "/" +
- getWorkerInfo().getHostnameId();
- try {
- getZkExt().createExt(workerInputSplitsDonePath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException e) {
- throw new IllegalStateException("waitForOtherWorkers: " +
- "KeeperException creating worker done splits", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException("waitForOtherWorkers: " +
- "InterruptedException creating worker done splits", e);
- }
- while (true) {
- Stat inputSplitsDoneStat;
- try {
- inputSplitsDoneStat =
- getZkExt().exists(inputSplitPaths.getAllDonePath(),
- true);
- } catch (KeeperException e) {
- throw new IllegalStateException("waitForOtherWorkers: " +
- "KeeperException waiting on worker done splits", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException("waitForOtherWorkers: " +
- "InterruptedException waiting on worker done splits", e);
- }
- if (inputSplitsDoneStat != null) {
- break;
- }
- inputSplitEvents.getAllDoneChanged().waitForever();
- inputSplitEvents.getAllDoneChanged().reset();
- }
- }
-
  @Override
  public FinishedSuperstepStats setup() {
    // Unless doing a restart, prepare for computation:
    // 1. Start superstep INPUT_SUPERSTEP (no computation)
    // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
    // 3. Process input splits until there are no more.
    // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
    // 5. Process any mutations deriving from add edge requests
    // 6. Wait for superstep INPUT_SUPERSTEP to complete.
    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
      setCachedSuperstep(getRestartedSuperstep());
      // Restart: input was loaded before the checkpoint, so report
      // "not halted" with unknown (-1) vertex/edge counts.
      return new FinishedSuperstepStats(false, -1, -1);
    }

    // A master-written job state of START_SUPERSTEP at our current
    // superstep also means "restart from checkpoint".
    JSONObject jobState = getJobState();
    if (jobState != null) {
      try {
        if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
            ApplicationState.START_SUPERSTEP) &&
            jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
            getSuperstep()) {
          if (LOG.isInfoEnabled()) {
            LOG.info("setup: Restarting from an automated " +
                "checkpointed superstep " +
                getSuperstep() + ", attempt " +
                getApplicationAttempt());
          }
          setRestartedSuperstep(getSuperstep());
          return new FinishedSuperstepStats(false, -1, -1);
        }
      } catch (JSONException e) {
        throw new RuntimeException(
            "setup: Failed to get key-values from " +
                jobState.toString(), e);
      }
    }

    // Add the partitions that this worker owns
    GraphState<I, V, E, M> graphState =
        new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0,
            getContext(), getGraphMapper(), null, null);
    Collection<? extends PartitionOwner> masterSetPartitionOwners =
        startSuperstep(graphState);
    workerGraphPartitioner.updatePartitionOwners(
        getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());

    // Munge-preprocessed block: client setup differs under secure Hadoop.
/*if[HADOOP_NON_SECURE]
    workerClient.setup();
else[HADOOP_NON_SECURE]*/
    workerClient.setup(getConfiguration().authenticate());
/*end[HADOOP_NON_SECURE]*/

    VertexEdgeCount vertexEdgeCount;

    if (getConfiguration().hasVertexInputFormat()) {
      // Ensure the vertex InputSplits are ready for processing
      ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents);
      getContext().progress();
      try {
        vertexEdgeCount = loadVertices();
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "setup: loadVertices failed with InterruptedException", e);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "setup: loadVertices failed with KeeperException", e);
      }
      getContext().progress();
    } else {
      vertexEdgeCount = new VertexEdgeCount();
    }

    if (getConfiguration().hasEdgeInputFormat()) {
      // Ensure the edge InputSplits are ready for processing
      ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents);
      getContext().progress();
      try {
        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
      } catch (InterruptedException e) {
        throw new IllegalStateException(
            "setup: loadEdges failed with InterruptedException", e);
      } catch (KeeperException e) {
        throw new IllegalStateException(
            "setup: loadEdges failed with KeeperException", e);
      }
      getContext().progress();
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
    }

    if (getConfiguration().hasVertexInputFormat()) {
      // Workers wait for each other to finish, coordinated by master
      waitForOtherWorkers(vertexInputSplitsPaths, vertexInputSplitsEvents);
    }

    if (getConfiguration().hasEdgeInputFormat()) {
      // Workers wait for each other to finish, coordinated by master
      waitForOtherWorkers(edgeInputSplitsPaths, edgeInputSplitsEvents);
    }

    // Create remaining partitions owned by this worker.
    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
          !getPartitionStore().hasPartition(
              partitionOwner.getPartitionId())) {
        Partition<I, V, E, M> partition =
            getConfiguration().createPartition(
                partitionOwner.getPartitionId(), getContext());
        getPartitionStore().addPartition(partition);
      }
    }

    if (getConfiguration().hasEdgeInputFormat()) {
      // Create vertices from added edges via vertex resolver.
      // Doing this at the beginning of superstep 0 is not enough,
      // because we want the vertex/edge stats to be accurate.
      workerServer.resolveMutations(graphState);
    }

    // Generate the partition stats for the input superstep and process
    // if necessary
    List<PartitionStats> partitionStatsList =
        new ArrayList<PartitionStats>();
    for (Partition<I, V, E, M> partition :
        getPartitionStore().getPartitions()) {
      PartitionStats partitionStats =
          new PartitionStats(partition.getId(),
              partition.getVertexCount(),
              0,
              partition.getEdgeCount(),
              0);
      partitionStatsList.add(partitionStats);
    }
    workerGraphPartitioner.finalizePartitionStats(
        partitionStatsList, getPartitionStore());

    // The input superstep goes through the normal end-of-superstep barrier
    return finishSuperstep(graphState, partitionStatsList);
  }
-
- /**
- * Register the health of this worker for a given superstep
- *
- * @param superstep Superstep to register health on
- */
- private void registerHealth(long superstep) {
- JSONArray hostnamePort = new JSONArray();
- hostnamePort.put(getHostname());
-
- hostnamePort.put(workerInfo.getPort());
-
- String myHealthPath = null;
- if (isHealthy()) {
- myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
- getSuperstep());
- } else {
- myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
- getSuperstep());
- }
- myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
- try {
- myHealthZnode = getZkExt().createExt(
- myHealthPath,
- WritableUtils.writeToByteArray(workerInfo),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL,
- true);
- } catch (KeeperException.NodeExistsException e) {
- LOG.warn("registerHealth: myHealthPath already exists (likely " +
- "from previous failure): " + myHealthPath +
- ". Waiting for change in attempts " +
- "to re-join the application");
- getApplicationAttemptChangedEvent().waitForever();
- if (LOG.isInfoEnabled()) {
- LOG.info("registerHealth: Got application " +
- "attempt changed event, killing self");
- }
- throw new IllegalStateException(
- "registerHealth: Trying " +
- "to get the new application attempt by killing self", e);
- } catch (KeeperException e) {
- throw new IllegalStateException("Creating " + myHealthPath +
- " failed with KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException("Creating " + myHealthPath +
- " failed with InterruptedException", e);
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("registerHealth: Created my health node for attempt=" +
- getApplicationAttempt() + ", superstep=" +
- getSuperstep() + " with " + myHealthZnode +
- " and workerInfo= " + workerInfo);
- }
- }
-
- /**
- * Do this to help notify the master quicker that this worker has failed.
- */
- private void unregisterHealth() {
- LOG.error("unregisterHealth: Got failure, unregistering health on " +
- myHealthZnode + " on superstep " + getSuperstep());
- try {
- getZkExt().deleteExt(myHealthZnode, -1, false);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "unregisterHealth: InterruptedException - Couldn't delete " +
- myHealthZnode, e);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "unregisterHealth: KeeperException - Couldn't delete " +
- myHealthZnode, e);
- }
- }
-
- @Override
- public void failureCleanup() {
- unregisterHealth();
- }
-
  @Override
  public Collection<? extends PartitionOwner> startSuperstep(
      GraphState<I, V, E, M> graphState) {
    // Algorithm:
    // 1. Communication service will combine message from previous
    //    superstep
    // 2. Register my health for the next superstep.
    // 3. Wait until the partition assignment is complete and get it
    // 4. Get the aggregator values from the previous superstep
    if (getSuperstep() != INPUT_SUPERSTEP) {
      workerServer.prepareSuperstep(graphState);
    }

    registerHealth(getSuperstep());

    // Block until the master publishes worker addresses and partition
    // assignments for this attempt/superstep, then deserialize them.
    String addressesAndPartitionsPath =
        getAddressesAndPartitionsPath(getApplicationAttempt(),
            getSuperstep());
    AddressesAndPartitionsWritable addressesAndPartitions =
        new AddressesAndPartitionsWritable(
            workerGraphPartitioner.createPartitionOwner().getClass());
    try {
      while (getZkExt().exists(addressesAndPartitionsPath, true) ==
          null) {
        getAddressesAndPartitionsReadyChangedEvent().waitForever();
        getAddressesAndPartitionsReadyChangedEvent().reset();
      }
      WritableUtils.readFieldsFromZnode(
          getZkExt(),
          addressesAndPartitionsPath,
          false,
          null,
          addressesAndPartitions);
    } catch (KeeperException e) {
      throw new IllegalStateException(
          "startSuperstep: KeeperException getting assignments", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "startSuperstep: InterruptedException getting assignments", e);
    }

    // Refresh our view of the cluster membership.
    // NOTE(review): clear() is redundant since the reference is
    // immediately replaced on the next line.
    workerInfoList.clear();
    workerInfoList = addressesAndPartitions.getWorkerInfos();
    masterInfo = addressesAndPartitions.getMasterInfo();

    if (LOG.isInfoEnabled()) {
      LOG.info("startSuperstep: " + masterInfo);
      LOG.info("startSuperstep: Ready for computation on superstep " +
          getSuperstep() + " since worker " +
          "selection and vertex range assignments are done in " +
          addressesAndPartitionsPath);
    }

    getContext().setStatus("startSuperstep: " +
        getGraphMapper().getMapFunctions().toString() +
        " - Attempt=" + getApplicationAttempt() +
        ", Superstep=" + getSuperstep());
    return addressesAndPartitions.getPartitionOwners();
  }
-
  @Override
  public FinishedSuperstepStats finishSuperstep(
      GraphState<I, V, E, M> graphState,
      List<PartitionStats> partitionStatsList) {
    // This barrier blocks until success (or the master signals it to
    // restart).
    //
    // Master will coordinate the barriers and aggregate "doneness" of all
    // the vertices.  Each worker will:
    // 1. Ensure that the requests are complete
    // 2. Execute user postSuperstep() if necessary.
    // 3. Save aggregator values that are in use.
    // 4. Report the statistics (vertices, edges, messages, etc.)
    //    of this worker
    // 5. Let the master know it is finished.
    // 6. Wait for the master's global stats, and check if done
    waitForRequestsToFinish();

    graphState.getGraphMapper().notifyFinishedCommunication();

    // Sum message counts across this worker's partitions
    long workerSentMessages = 0;
    for (PartitionStats partitionStats : partitionStatsList) {
      workerSentMessages += partitionStats.getMessagesSentCount();
    }

    // User postSuperstep() hook (skipped during the input superstep)
    if (getSuperstep() != INPUT_SUPERSTEP) {
      getWorkerContext().setGraphState(graphState);
      GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
      getWorkerContext().postSuperstep();
      timerContext.stop();
      getContext().progress();
    }

    aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);

    if (LOG.isInfoEnabled()) {
      LOG.info("finishSuperstep: Superstep " + getSuperstep() +
          ", messages = " + workerSentMessages + " " +
          MemoryUtils.getRuntimeMemoryStats());
    }

    // Publish this worker's partition stats / message count for the master
    writeFinshedSuperstepInfoToZK(partitionStatsList, workerSentMessages);

    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
        "finishSuperstep: (waiting for rest " +
            "of workers) " +
            getGraphMapper().getMapFunctions().toString() +
            " - Attempt=" + getApplicationAttempt() +
            ", Superstep=" + getSuperstep());

    String superstepFinishedNode =
        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());

    // Barrier: block until the master creates the superstep-finished znode
    waitForOtherWorkers(superstepFinishedNode);

    // The barrier znode carries the master's aggregated global stats
    GlobalStats globalStats = new GlobalStats();
    WritableUtils.readFieldsFromZnode(
        getZkExt(), superstepFinishedNode, false, null, globalStats);
    if (LOG.isInfoEnabled()) {
      LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
          " with global stats " + globalStats);
    }
    incrCachedSuperstep();
    getContext().setStatus("finishSuperstep: (all workers done) " +
        getGraphMapper().getMapFunctions().toString() +
        " - Attempt=" + getApplicationAttempt() +
        ", Superstep=" + getSuperstep());

    return new FinishedSuperstepStats(
        globalStats.getHaltComputation(),
        globalStats.getVertexCount(),
        globalStats.getEdgeCount());
  }
-
- /**
- * Wait for all the requests to finish.
- */
- private void waitForRequestsToFinish() {
- if (LOG.isInfoEnabled()) {
- LOG.info("finishSuperstep: Waiting on all requests, superstep " +
- getSuperstep() + " " +
- MemoryUtils.getRuntimeMemoryStats());
- }
- GiraphTimerContext timerContext = waitRequestsTimer.time();
- workerClient.waitAllRequests();
- timerContext.stop();
- }
-
- /**
- * Wait for all the other Workers to finish the superstep.
- *
- * @param superstepFinishedNode ZooKeeper path to wait on.
- */
- private void waitForOtherWorkers(String superstepFinishedNode) {
- try {
- while (getZkExt().exists(superstepFinishedNode, true) == null) {
- getSuperstepFinishedEvent().waitForever();
- getSuperstepFinishedEvent().reset();
- }
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "finishSuperstep: Failed while waiting for master to " +
- "signal completion of superstep " + getSuperstep(), e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "finishSuperstep: Failed while waiting for master to " +
- "signal completion of superstep " + getSuperstep(), e);
- }
- }
-
  /**
   * Write finished superstep info to ZooKeeper.
   *
   * NOTE(review): the method name carries a typo ("Finshed"); renaming it
   * would touch its caller, so it is left as-is here.
   *
   * @param partitionStatsList List of partition stats from superstep.
   * @param workerSentMessages Number of messages sent in superstep.
   */
  private void writeFinshedSuperstepInfoToZK(
      List<PartitionStats> partitionStatsList, long workerSentMessages) {
    // Let the partitioner finalize/adjust the raw per-partition stats
    Collection<PartitionStats> finalizedPartitionStats =
        workerGraphPartitioner.finalizePartitionStats(
            partitionStatsList, getPartitionStore());
    List<PartitionStats> finalizedPartitionStatsList =
        new ArrayList<PartitionStats>(finalizedPartitionStats);
    byte[] partitionStatsBytes =
        WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
    // Snapshot this superstep's metrics for reporting to the master
    WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
    metrics.readFromRegistry();
    byte[] metricsBytes = WritableUtils.writeToByteArray(metrics);

    // Serialized payloads are Base64-encoded into a JSON envelope
    JSONObject workerFinishedInfoObj = new JSONObject();
    try {
      workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
          Base64.encodeBytes(partitionStatsBytes));
      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
      workerFinishedInfoObj.put(JSONOBJ_METRICS_KEY,
          Base64.encodeBytes(metricsBytes));
    } catch (JSONException e) {
      throw new RuntimeException(e);
    }

    String finishedWorkerPath =
        getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
            "/" + getHostnamePartitionId();
    try {
      getZkExt().createExt(finishedWorkerPath,
          workerFinishedInfoObj.toString().getBytes(),
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT,
          true);
    } catch (KeeperException.NodeExistsException e) {
      // Benign: the node was already written (e.g. on retry)
      LOG.warn("finishSuperstep: finished worker path " +
          finishedWorkerPath + " already exists!");
    } catch (KeeperException e) {
      throw new IllegalStateException("Creating " + finishedWorkerPath +
          " failed with KeeperException", e);
    } catch (InterruptedException e) {
      throw new IllegalStateException("Creating " + finishedWorkerPath +
          " failed with InterruptedException", e);
    }
  }
-
- /**
- * Save the vertices using the user-defined VertexOutputFormat from our
- * vertexArray based on the split.
- * @throws InterruptedException
- */
- private void saveVertices() throws IOException, InterruptedException {
- if (getConfiguration().getVertexOutputFormatClass() == null) {
- LOG.warn("saveVertices: " +
- GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
- " not specified -- there will be no saved output");
- return;
- }
-
- LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
- "saveVertices: Starting to save vertices");
- VertexOutputFormat<I, V, E> vertexOutputFormat =
- getConfiguration().createVertexOutputFormat();
- VertexWriter<I, V, E> vertexWriter =
- vertexOutputFormat.createVertexWriter(getContext());
- vertexWriter.initialize(getContext());
- for (Partition<I, V, E, M> partition :
- getPartitionStore().getPartitions()) {
- for (Vertex<I, V, E, M> vertex : partition) {
- getContext().progress();
- vertexWriter.writeVertex(vertex);
- }
- getContext().progress();
- }
- vertexWriter.close(getContext());
- LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
- "saveVertices: Done saving vertices");
- }
-
  @Override
  public void cleanup() throws IOException, InterruptedException {
    workerClient.closeConnections();
    // Step the cached superstep back by one (finishSuperstep() incremented
    // past the last completed superstep) before saving output.
    setCachedSuperstep(getSuperstep() - 1);
    saveVertices();
    // All worker processes should denote they are done by adding special
    // znode.  Once the number of znodes equals the number of partitions
    // for workers and masters, the master will clean up the ZooKeeper
    // znodes associated with this job.
    String workerCleanedUpPath = cleanedUpPath + "/" +
        getTaskPartition() + WORKER_SUFFIX;
    try {
      String finalFinishedPath =
          getZkExt().createExt(workerCleanedUpPath,
              null,
              Ids.OPEN_ACL_UNSAFE,
              CreateMode.PERSISTENT,
              true);
      if (LOG.isInfoEnabled()) {
        LOG.info("cleanup: Notifying master its okay to cleanup with " +
            finalFinishedPath);
      }
    } catch (KeeperException.NodeExistsException e) {
      // Benign: the done-znode already exists from a previous attempt
      if (LOG.isInfoEnabled()) {
        LOG.info("cleanup: Couldn't create finished node '" +
            workerCleanedUpPath);
      }
    } catch (KeeperException e) {
      // Cleaning up, it's okay to fail after cleanup is successful
      LOG.error("cleanup: Got KeeperException on notification " +
          "to master about cleanup", e);
    } catch (InterruptedException e) {
      // Cleaning up, it's okay to fail after cleanup is successful
      LOG.error("cleanup: Got InterruptedException on notification " +
          "to master about cleanup", e);
    }
    try {
      getZkExt().close();
    } catch (InterruptedException e) {
      // cleanup phase -- just log the error
      LOG.error("cleanup: Zookeeper failed to close with " + e);
    }

    if (getConfiguration().metricsEnabled()) {
      GiraphMetrics.get().dumpToStdout();
    }

    // Preferably would shut down the service only after
    // all clients have disconnected (or the exceptions on the
    // client side ignored).
    workerServer.close();
  }
-
- @Override
- public void storeCheckpoint() throws IOException {
- LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
- "storeCheckpoint: Starting checkpoint " +
- getGraphMapper().getMapFunctions().toString() +
- " - Attempt=" + getApplicationAttempt() +
- ", Superstep=" + getSuperstep());
-
- // Algorithm:
- // For each partition, dump vertices and messages
- Path metadataFilePath =
- new Path(getCheckpointBasePath(getSuperstep()) + "." +
- getHostnamePartitionId() +
- CHECKPOINT_METADATA_POSTFIX);
- Path verticesFilePath =
- new Path(getCheckpointBasePath(getSuperstep()) + "." +
- getHostnamePartitionId() +
- CHECKPOINT_VERTICES_POSTFIX);
- Path validFilePath =
- new Path(getCheckpointBasePath(getSuperstep()) + "." +
- getHostnamePartitionId() +
- CHECKPOINT_VALID_POSTFIX);
-
- // Remove these files if they already exist (shouldn't though, unless
- // of previous failure of this worker)
- if (getFs().delete(validFilePath, false)) {
- LOG.warn("storeCheckpoint: Removed valid file " +
- validFilePath);
- }
- if (getFs().delete(metadataFilePath, false)) {
- LOG.warn("storeCheckpoint: Removed metadata file " +
- metadataFilePath);
- }
- if (getFs().delete(verticesFilePath, false)) {
- LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
- }
-
- FSDataOutputStream verticesOutputStream =
- getFs().create(verticesFilePath);
- ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
- DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
- for (Partition<I, V, E, M> partition :
- getPartitionStore().getPartitions()) {
- long startPos = verticesOutputStream.getPos();
- partition.write(verticesOutputStream);
- // write messages
- getServerData().getCurrentMessageStore().writePartition(
- verticesOutputStream, partition.getId());
- // Write the metadata for this partition
- // Format:
- // <index count>
- // <index 0 start pos><partition id>
- // <index 1 start pos><partition id>
- metadataOutput.writeLong(startPos);
- metadataOutput.writeInt(partition.getId());
- if (LOG.isDebugEnabled()) {
- LOG.debug("storeCheckpoint: Vertex file starting " +
- "offset = " + startPos + ", length = " +
- (verticesOutputStream.getPos() - startPos) +
- ", partition = " + partition.toString());
- }
- getContext().progress();
- }
- // Metadata is buffered and written at the end since it's small and
- // needs to know how many partitions this worker owns
- FSDataOutputStream metadataOutputStream =
- getFs().create(metadataFilePath);
- metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
- metadataOutputStream.write(metadataByteStream.toByteArray());
- metadataOutputStream.close();
- verticesOutputStream.close();
- if (LOG.isInfoEnabled()) {
- LOG.info("storeCheckpoint: Finished metadata (" +
- metadataFilePath + ") and vertices (" + verticesFilePath + ").");
- }
-
- getFs().createNewFile(validFilePath);
-
- // Notify master that checkpoint is stored
- String workerWroteCheckpoint =
- getWorkerWroteCheckpointPath(getApplicationAttempt(),
- getSuperstep()) + "/" + getHostnamePartitionId();
- try {
- getZkExt().createExt(workerWroteCheckpoint,
- new byte[0],
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException.NodeExistsException e) {
- LOG.warn("finishSuperstep: wrote checkpoint worker path " +
- workerWroteCheckpoint + " already exists!");
- } catch (KeeperException e) {
- throw new IllegalStateException("Creating " + workerWroteCheckpoint +
- " failed with KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException("Creating " +
- workerWroteCheckpoint +
- " failed with InterruptedException", e);
- }
- }
-
- @Override
- public VertexEdgeCount loadCheckpoint(long superstep) {
- try {
- // clear old message stores
- getServerData().getIncomingMessageStore().clearAll();
- getServerData().getCurrentMessageStore().clearAll();
- } catch (IOException e) {
- throw new RuntimeException(
- "loadCheckpoint: Failed to clear message stores ", e);
- }
-
- // Algorithm:
- // Examine all the partition owners and load the ones
- // that match my hostname and id from the master designated checkpoint
- // prefixes.
- long startPos = 0;
- int loadedPartitions = 0;
- for (PartitionOwner partitionOwner :
- workerGraphPartitioner.getPartitionOwners()) {
- if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
- String metadataFile =
- partitionOwner.getCheckpointFilesPrefix() +
- CHECKPOINT_METADATA_POSTFIX;
- String partitionsFile =
- partitionOwner.getCheckpointFilesPrefix() +
- CHECKPOINT_VERTICES_POSTFIX;
- try {
- int partitionId = -1;
- DataInputStream metadataStream =
- getFs().open(new Path(metadataFile));
- int partitions = metadataStream.readInt();
- for (int i = 0; i < partitions; ++i) {
- startPos = metadataStream.readLong();
- partitionId = metadataStream.readInt();
- if (partitionId == partitionOwner.getPartitionId()) {
- break;
- }
- }
- if (partitionId != partitionOwner.getPartitionId()) {
- throw new IllegalStateException(
- "loadCheckpoint: " + partitionOwner +
- " not found!");
- }
- metadataStream.close();
- Partition<I, V, E, M> partition =
- getConfiguration().createPartition(partitionId, getContext());
- DataInputStream partitionsStream =
- getFs().open(new Path(partitionsFile));
- if (partitionsStream.skip(startPos) != startPos) {
- throw new IllegalStateException(
- "loadCheckpoint: Failed to skip " + startPos +
- " on " + partitionsFile);
- }
- partition.readFields(partitionsStream);
- if (partitionsStream.readBoolean()) {
- getServerData().getCurrentMessageStore().readFieldsForPartition(
- partitionsStream, partitionId);
- }
- partitionsStream.close();
- if (LOG.isInfoEnabled()) {
- LOG.info("loadCheckpoint: Loaded partition " +
- partition);
- }
- if (getPartitionStore().hasPartition(partitionId)) {
- throw new IllegalStateException(
- "loadCheckpoint: Already has partition owner " +
- partitionOwner);
- }
- getPartitionStore().addPartition(partition);
- getContext().progress();
- ++loadedPartitions;
- } catch (IOException e) {
- throw new RuntimeException(
- "loadCheckpoint: Failed to get partition owner " +
- partitionOwner, e);
- }
- }
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("loadCheckpoint: Loaded " + loadedPartitions +
- " partitions of out " +
- workerGraphPartitioner.getPartitionOwners().size() +
- " total.");
- }
-
- // Load global statistics
- GlobalStats globalStats = null;
- String finalizedCheckpointPath =
- getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
- try {
- DataInputStream finalizedStream =
- getFs().open(new Path(finalizedCheckpointPath));
- globalStats = new GlobalStats();
- globalStats.readFields(finalizedStream);
- } catch (IOException e) {
- throw new IllegalStateException(
- "loadCheckpoint: Failed to load global statistics", e);
- }
-
- // Communication service needs to setup the connections prior to
- // processing vertices
-/*if[HADOOP_NON_SECURE]
- workerClient.setup();
-else[HADOOP_NON_SECURE]*/
- workerClient.setup(getConfiguration().authenticate());
-/*end[HADOOP_NON_SECURE]*/
- return new VertexEdgeCount(globalStats.getVertexCount(),
- globalStats.getEdgeCount());
- }
-
- /**
- * Send the worker partitions to their destination workers
- *
- * @param workerPartitionMap Map of worker info to the partitions stored
- * on this worker to be sent
- */
- private void sendWorkerPartitions(
- Map<WorkerInfo, List<Integer>> workerPartitionMap) {
- List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
- new ArrayList<Entry<WorkerInfo, List<Integer>>>(
- workerPartitionMap.entrySet());
- Collections.shuffle(randomEntryList);
- WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor =
- new NettyWorkerClientRequestProcessor<I, V, E, M>(getContext(),
- getConfiguration(), this);
- for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
- randomEntryList) {
- for (Integer partitionId : workerPartitionList.getValue()) {
- Partition<I, V, E, M> partition =
- getPartitionStore().removePartition(partitionId);
- if (partition == null) {
- throw new IllegalStateException(
- "sendWorkerPartitions: Couldn't find partition " +
- partitionId + " to send to " +
- workerPartitionList.getKey());
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("sendWorkerPartitions: Sending worker " +
- workerPartitionList.getKey() + " partition " +
- partitionId);
- }
- workerClientRequestProcessor.sendPartitionRequest(
- workerPartitionList.getKey(),
- partition);
- }
- }
-
-
- try {
- workerClientRequestProcessor.flush();
- workerClient.waitAllRequests();
- } catch (IOException e) {
- throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
- }
- String myPartitionExchangeDonePath =
- getPartitionExchangeWorkerPath(
- getApplicationAttempt(), getSuperstep(), getWorkerInfo());
- try {
- getZkExt().createExt(myPartitionExchangeDonePath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "sendWorkerPartitions: KeeperException to create " +
- myPartitionExchangeDonePath, e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "sendWorkerPartitions: InterruptedException to create " +
- myPartitionExchangeDonePath, e);
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("sendWorkerPartitions: Done sending all my partitions.");
- }
- }
-
- @Override
- public final void exchangeVertexPartitions(
- Collection<? extends PartitionOwner> masterSetPartitionOwners) {
- // 1. Fix the addresses of the partition ids if they have changed.
- // 2. Send all the partitions to their destination workers in a random
- // fashion.
- // 3. Notify completion with a ZooKeeper stamp
- // 4. Wait for all my dependencies to be done (if any)
- // 5. Add the partitions to myself.
- PartitionExchange partitionExchange =
- workerGraphPartitioner.updatePartitionOwners(
- getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
- workerClient.openConnections();
-
- Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
- partitionExchange.getSendWorkerPartitionMap();
- if (!getPartitionStore().isEmpty()) {
- sendWorkerPartitions(sendWorkerPartitionMap);
- }
-
- Set<WorkerInfo> myDependencyWorkerSet =
- partitionExchange.getMyDependencyWorkerSet();
- Set<String> workerIdSet = new HashSet<String>();
- for (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
- if (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
- throw new IllegalStateException(
- "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
- }
- }
- if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
- if (LOG.isInfoEnabled()) {
- LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
- "exiting early");
- }
- return;
- }
-
- String vertexExchangePath =
- getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
- List<String> workerDoneList;
- try {
- while (true) {
- workerDoneList = getZkExt().getChildrenExt(
- vertexExchangePath, true, false, false);
- workerIdSet.removeAll(workerDoneList);
- if (workerIdSet.isEmpty()) {
- break;
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("exchangeVertexPartitions: Waiting for workers " +
- workerIdSet);
- }
- getPartitionExchangeChildrenChangedEvent().waitForever();
- getPartitionExchangeChildrenChangedEvent().reset();
- }
- } catch (KeeperException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- if (LOG.isInfoEnabled()) {
- LOG.info("exchangeVertexPartitions: Done with exchange.");
- }
- }
-
- /**
- * Get event when the state of a partition exchange has changed.
- *
- * @return Event to check.
- */
- public final BspEvent getPartitionExchangeChildrenChangedEvent() {
- return partitionExchangeChildrenChanged;
- }
-
- @Override
- protected boolean processEvent(WatchedEvent event) {
- boolean foundEvent = false;
- if (event.getPath().startsWith(masterJobStatePath) &&
- (event.getType() == EventType.NodeChildrenChanged)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("processEvent: Job state changed, checking " +
- "to see if it needs to restart");
- }
- JSONObject jsonObj = getJobState();
- try {
- if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
- ApplicationState.START_SUPERSTEP) &&
- jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
- getApplicationAttempt()) {
- LOG.fatal("processEvent: Worker will restart " +
- "from command - " + jsonObj.toString());
- System.exit(-1);
- }
- } catch (JSONException e) {
- throw new RuntimeException(
- "processEvent: Couldn't properly get job state from " +
- jsonObj.toString());
- }
- foundEvent = true;
- } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
- event.getType() == EventType.NodeChildrenChanged) {
- if (LOG.isInfoEnabled()) {
- LOG.info("processEvent : partitionExchangeChildrenChanged " +
- "(at least one worker is done sending partitions)");
- }
- partitionExchangeChildrenChanged.signal();
- foundEvent = true;
- }
-
- return foundEvent;
- }
-
- @Override
- public WorkerInfo getWorkerInfo() {
- return workerInfo;
- }
-
- @Override
- public PartitionStore<I, V, E, M> getPartitionStore() {
- return getServerData().getPartitionStore();
- }
-
- @Override
- public PartitionOwner getVertexPartitionOwner(I vertexId) {
- return workerGraphPartitioner.getPartitionOwner(vertexId);
- }
-
- @Override
- public Iterable<? extends PartitionOwner> getPartitionOwners() {
- return workerGraphPartitioner.getPartitionOwners();
- }
-
- @Override
- public Partition<I, V, E, M> getPartition(I vertexId) {
- return getPartitionStore().getPartition(getPartitionId(vertexId));
- }
-
- @Override
- public Integer getPartitionId(I vertexId) {
- PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
- return partitionOwner.getPartitionId();
- }
-
- @Override
- public boolean hasPartition(Integer partitionId) {
- return getPartitionStore().hasPartition(partitionId);
- }
-
- @Override
- public Vertex<I, V, E, M> getVertex(I vertexId) {
- PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
- if (getPartitionStore().hasPartition(partitionOwner.getPartitionId())) {
- return getPartitionStore().getPartition(
- partitionOwner.getPartitionId()).getVertex(vertexId);
- } else {
- return null;
- }
- }
-
- @Override
- public ServerData<I, V, E, M> getServerData() {
- return workerServer.getServerData();
- }
-
- @Override
- public WorkerAggregatorHandler getAggregatorHandler() {
- return aggregatorHandler;
- }
-
- @Override
- public void prepareSuperstep() {
- if (getSuperstep() != INPUT_SUPERSTEP) {
- aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/BspUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BspUtils.java b/giraph-core/src/main/java/org/apache/giraph/graph/BspUtils.java
deleted file mode 100644
index 651290d..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/BspUtils.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.graph.partition.GraphPartitionerFactory;
-import org.apache.giraph.graph.partition.HashPartitionerFactory;
-import org.apache.giraph.graph.partition.PartitionStats;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * Help to use the configuration to get the appropriate classes or
- * instantiate them.
- */
-public class BspUtils {
- /**
- * Do not construct.
- */
- private BspUtils() { }
-
- /**
- * Get the user's subclassed {@link GraphPartitionerFactory}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- * @param conf Configuration to check
- * @return User's graph partitioner
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- Class<? extends GraphPartitionerFactory<I, V, E, M>>
- getGraphPartitionerClass(Configuration conf) {
- return (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
- conf.getClass(GiraphConstants.GRAPH_PARTITIONER_FACTORY_CLASS,
- HashPartitionerFactory.class,
- GraphPartitionerFactory.class);
- }
-
- /**
- * Create a user graph partitioner class
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- * @param conf Configuration to check
- * @return Instantiated user graph partitioner class
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- GraphPartitionerFactory<I, V, E, M>
- createGraphPartitioner(Configuration conf) {
- Class<? extends GraphPartitionerFactory<I, V, E, M>>
- graphPartitionerFactoryClass = getGraphPartitionerClass(conf);
- return ReflectionUtils.newInstance(graphPartitionerFactoryClass, conf);
- }
-
- /**
- * Create a user graph partitioner partition stats class
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- * @param conf Configuration to check
- * @return Instantiated user graph partition stats class
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- PartitionStats createGraphPartitionStats(Configuration conf) {
- GraphPartitionerFactory<I, V, E, M> graphPartitioner =
- createGraphPartitioner(conf);
- return graphPartitioner.createMasterGraphPartitioner().
- createPartitionStats();
- }
-
- /**
- * Get the user's subclassed {@link VertexInputFormat}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- * @param conf Configuration to check
- * @return User's vertex input format class
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- Class<? extends VertexInputFormat<I, V, E, M>>
- getVertexInputFormatClass(Configuration conf) {
- return (Class<? extends VertexInputFormat<I, V, E, M>>)
- conf.getClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
- null,
- VertexInputFormat.class);
- }
-
- /**
- * Create a user vertex input format class
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- * @param conf Configuration to check
- * @return Instantiated user vertex input format class
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable> VertexInputFormat<I, V, E, M>
- createVertexInputFormat(Configuration conf) {
- Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
- getVertexInputFormatClass(conf);
- VertexInputFormat<I, V, E, M> inputFormat =
- ReflectionUtils.newInstance(vertexInputFormatClass, conf);
- return inputFormat;
- }
-
- /**
- * Get the user's subclassed {@link VertexOutputFormat}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param conf Configuration to check
- * @return User's vertex output format class
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable,
- V extends Writable,
- E extends Writable>
- Class<? extends VertexOutputFormat<I, V, E>>
- getVertexOutputFormatClass(Configuration conf) {
- return (Class<? extends VertexOutputFormat<I, V, E>>)
- conf.getClass(GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS,
- null,
- VertexOutputFormat.class);
- }
-
- /**
- * Create a user vertex output format class
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param conf Configuration to check
- * @return Instantiated user vertex output format class
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable> VertexOutputFormat<I, V, E>
- createVertexOutputFormat(Configuration conf) {
- Class<? extends VertexOutputFormat<I, V, E>> vertexOutputFormatClass =
- getVertexOutputFormatClass(conf);
- return ReflectionUtils.newInstance(vertexOutputFormatClass, conf);
- }
-
- /**
- * Get the user's subclassed {@link EdgeInputFormat}.
- *
- * @param <I> Vertex id
- * @param <E> Edge data
- * @param conf Configuration to check
- * @return User's edge input format class
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, E extends Writable>
- Class<? extends EdgeInputFormat<I, E>>
- getEdgeInputFormatClass(Configuration conf) {
- return (Class<? extends EdgeInputFormat<I, E>>)
- conf.getClass(GiraphConstants.EDGE_INPUT_FORMAT_CLASS,
- null,
- EdgeInputFormat.class);
- }
-
- /**
- * Create a user edge input format class
- *
- * @param <I> Vertex id
- * @param <E> Edge data
- * @param conf Configuration to check
- * @return Instantiated user edge input format class
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, E extends Writable>
- EdgeInputFormat<I, E> createEdgeInputFormat(Configuration conf) {
- Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
- getEdgeInputFormatClass(conf);
- EdgeInputFormat<I, E> inputFormat =
- ReflectionUtils.newInstance(edgeInputFormatClass, conf);
- return inputFormat;
- }
-
- /**
- * Get the user's subclassed {@link AggregatorWriter}.
- *
- * @param conf Configuration to check
- * @return User's aggregator writer class
- */
- public static Class<? extends AggregatorWriter>
- getAggregatorWriterClass(Configuration conf) {
- return conf.getClass(GiraphConstants.AGGREGATOR_WRITER_CLASS,
- TextAggregatorWriter.class,
- AggregatorWriter.class);
- }
-
- /**
- * Create a user aggregator output format class
- *
- * @param conf Configuration to check
- * @return Instantiated user aggregator writer class
- */
- public static AggregatorWriter createAggregatorWriter(Configuration conf) {
- Class<? extends AggregatorWriter> aggregatorWriterClass =
- getAggregatorWriterClass(conf);
- return ReflectionUtils.newInstance(aggregatorWriterClass, conf);
- }
-
- /**
- * Get the user's subclassed {@link Combiner}.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- * @param conf Configuration to check
- * @return User's vertex combiner class
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, M extends Writable>
- Class<? extends Combiner<I, M>> getCombinerClass(Configuration conf) {
- return (Class<? extends Combiner<I, M>>)
- conf.getClass(GiraphConstants.VERTEX_COMBINER_CLASS,
- null,
- Combiner.class);
- }
-
- /**
- * Get the user's subclassed VertexResolver.
- *
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- * @param conf Configuration to check
- * @return User's vertex resolver class
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- Class<? extends VertexResolver<I, V, E, M>>
- getVertexResolverClass(Configuration conf) {
- return (Class<? extends VertexResolver<I, V, E, M>>)
- conf.getClass(GiraphConstants.VERTEX_RESOLVER_CLASS,
- DefaultVertexResolver.class,
- VertexResolver.class);
- }
-
- /**
- * Get the user's subclassed WorkerContext.
- *
- * @param conf Configuration to check
- * @return User's worker context class
- */
- public static Class<? extends WorkerContext>
- getWorkerContextClass(Configuration conf) {
- return (Class<? extends WorkerContext>)
- conf.getClass(GiraphConstants.WORKER_CONTEXT_CLASS,
- DefaultWorkerContext.class,
- WorkerContext.class);
- }
-
- /**
- * Create a user worker context
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- * @param conf Configuration to check
- * @param graphState State of the graph from the worker
- * @return Instantiated user worker context
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- WorkerContext createWorkerContext(Configuration conf,
- GraphState<I, V, E, M> graphState) {
- Class<? extends WorkerContext> workerContextClass =
- getWorkerContextClass(conf);
- WorkerContext workerContext =
- ReflectionUtils.newInstance(workerContextClass, conf);
- workerContext.setGraphState(graphState);
- return workerContext;
- }
-
- /**
- * Get the user's subclassed {@link MasterCompute}
- *
- * @param conf Configuration to check
- * @return User's master class
- */
- public static Class<? extends MasterCompute>
- getMasterComputeClass(Configuration conf) {
- return (Class<? extends MasterCompute>)
- conf.getClass(GiraphConstants.MASTER_COMPUTE_CLASS,
- DefaultMasterCompute.class,
- MasterCompute.class);
- }
-
- /**
- * Create a user master
- *
- * @param conf Configuration to check
- * @return Instantiated user master
- */
- public static MasterCompute
- createMasterCompute(Configuration conf) {
- Class<? extends MasterCompute> masterComputeClass =
- getMasterComputeClass(conf);
- MasterCompute masterCompute =
- ReflectionUtils.newInstance(masterComputeClass, conf);
- return masterCompute;
- }
-
- /**
- * Get the user's subclassed {@link Vertex}
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- * @param conf Configuration to check
- * @return User's vertex class
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- Class<? extends Vertex<I, V, E, M>> getVertexClass(Configuration conf) {
- return (Class<? extends Vertex<I, V, E, M>>)
- conf.getClass(GiraphConstants.VERTEX_CLASS,
- null,
- Vertex.class);
- }
-
- /**
- * Create a user vertex
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- * @param conf Configuration to check
- * @return Instantiated user vertex
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> Vertex<I, V, E, M>
- createVertex(Configuration conf) {
- Class<? extends Vertex<I, V, E, M>> vertexClass = getVertexClass(conf);
- Vertex<I, V, E, M> vertex =
- ReflectionUtils.newInstance(vertexClass, conf);
- return vertex;
- }
-
- /**
- * Get the user's subclassed vertex index class.
- *
- * @param <I> Vertex id
- * @param conf Configuration to check
- * @return User's vertex index class
- */
- @SuppressWarnings("unchecked")
- public static <I extends Writable> Class<I>
- getVertexIdClass(Configuration conf) {
- return (Class<I>) conf.getClass(GiraphConstants.VERTEX_ID_CLASS,
- WritableComparable.class);
- }
-
- /**
- * Create a user vertex index
- *
- * @param <I> Vertex id
- * @param conf Configuration to check
- * @return Instantiated user vertex index
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable>
- I createVertexId(Configuration conf) {
- Class<I> vertexIdClass = getVertexIdClass(conf);
- try {
- return vertexIdClass.newInstance();
- } catch (InstantiationException e) {
- throw new IllegalArgumentException(
- "createVertexId: Failed to instantiate", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException(
- "createVertexId: Illegally accessed", e);
- }
- }
-
- /**
- * Get the user's subclassed vertex value class.
- *
- * @param <V> Vertex data
- * @param conf Configuration to check
- * @return User's vertex value class
- */
- @SuppressWarnings("unchecked")
- public static <V extends Writable> Class<V>
- getVertexValueClass(Configuration conf) {
- return (Class<V>) conf.getClass(GiraphConstants.VERTEX_VALUE_CLASS,
- Writable.class);
- }
-
- /**
- * Create a user vertex value
- *
- * @param <V> Vertex data
- * @param conf Configuration to check
- * @return Instantiated user vertex value
- */
- @SuppressWarnings("unchecked")
- public static <V extends Writable> V
- createVertexValue(Configuration conf) {
- Class<V> vertexValueClass = getVertexValueClass(conf);
- if (vertexValueClass == NullWritable.class) {
- return (V) NullWritable.get();
- } else {
- try {
- return vertexValueClass.newInstance();
- } catch (InstantiationException e) {
- throw new IllegalArgumentException(
- "createVertexValue: Failed to instantiate", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException(
- "createVertexValue: Illegally accessed", e);
- }
- }
- }
-
- /**
- * Get the user's subclassed edge value class.
- *
- * @param <E> Edge data
- * @param conf Configuration to check
- * @return User's vertex edge value class
- */
- @SuppressWarnings("unchecked")
- public static <E extends Writable> Class<E>
- getEdgeValueClass(Configuration conf) {
- return (Class<E>) conf.getClass(GiraphConstants.EDGE_VALUE_CLASS,
- Writable.class);
- }
-
- /**
- * Create a user edge value
- *
- * @param <E> Edge data
- * @param conf Configuration to check
- * @return Instantiated user edge value
- */
- @SuppressWarnings("unchecked")
- public static <E extends Writable> E
- createEdgeValue(Configuration conf) {
- Class<E> edgeValueClass = getEdgeValueClass(conf);
- if (edgeValueClass == NullWritable.class) {
- return (E) NullWritable.get();
- } else {
- try {
- return edgeValueClass.newInstance();
- } catch (InstantiationException e) {
- throw new IllegalArgumentException(
- "createEdgeValue: Failed to instantiate", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException(
- "createEdgeValue: Illegally accessed", e);
- }
- }
- }
-
- /**
- * Get the user's subclassed vertex message value class.
- *
- * @param <M> Message data
- * @param conf Configuration to check
- * @return User's vertex message value class
- */
- @SuppressWarnings("unchecked")
- public static <M extends Writable> Class<M>
- getMessageValueClass(Configuration conf) {
- return (Class<M>) conf.getClass(GiraphConstants.MESSAGE_VALUE_CLASS,
- Writable.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/Combiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Combiner.java b/giraph-core/src/main/java/org/apache/giraph/graph/Combiner.java
deleted file mode 100644
index 20f0a6a..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Combiner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Abstract class to extend for combining messages sent to the same vertex.
- * Combiner for applications where each two messages for one vertex can be
- * combined into one.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public abstract class Combiner<I extends WritableComparable,
- M extends Writable> {
- /**
- * Combine messageToCombine with originalMassage,
- * by modifying originalMessage.
- *
- * @param vertexIndex Index of the vertex getting these messages
- * @param originalMessage The first message which we want to combine;
- * put the result of combining in this message
- * @param messageToCombine The second message which we want to combine
- */
- public abstract void combine(I vertexIndex, M originalMessage,
- M messageToCombine);
-
- /**
- * Get the initial message. When combined with any other message M,
- * the result should be M.
- *
- * @return Initial message
- */
- public abstract M createInitialMessage();
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index fa1bda3..3292517 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -31,15 +31,17 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
-import org.apache.giraph.graph.partition.Partition;
-import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.utils.SystemTime;
-import org.apache.giraph.utils.Time;
+import org.apache.giraph.time.SystemTime;
+import org.apache.giraph.time.Time;
import org.apache.giraph.utils.TimedLogger;
-import org.apache.giraph.utils.Times;
+import org.apache.giraph.time.Times;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.worker.WorkerThreadAggregatorUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java
deleted file mode 100644
index f0f2d0f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * A dumb implementation of {@link MasterCompute}. This is the default
- * implementation when no MasterCompute is defined by the user. It does
- * nothing.
- */
-
-public class DefaultMasterCompute extends MasterCompute {
-
- @Override
- public void readFields(DataInput in) throws IOException {
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- }
-
- @Override
- public void compute() {
- }
-
- @Override
- public void initialize() throws InstantiationException,
- IllegalAccessException {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
index 0b5272d..c88b2b9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
@@ -20,6 +20,8 @@ package org.apache.giraph.graph;
import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.vertex.MutableVertex;
+import org.apache.giraph.vertex.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java
deleted file mode 100644
index b1042f7..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-/**
- * A dumb implementation of {@link WorkerContext}. This is the default
- * implementation when no WorkerContext is defined by the user. It does
- * nothing.
- */
-public class DefaultWorkerContext extends WorkerContext {
-
- @Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException {
- }
-
- @Override
- public void postApplication() { }
-
- @Override
- public void preSuperstep() { }
-
- @Override
- public void postSuperstep() { }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
deleted file mode 100644
index cdc1891..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Input format for reading single edges.
- *
- * @param <I> Vertex id
- * @param <E> Edge data
- */
-public abstract class EdgeInputFormat<I extends WritableComparable,
- E extends Writable> implements GiraphInputFormat {
- /**
- * Logically split the vertices for a graph processing application.
- *
- * Each {@link InputSplit} is then assigned to a worker for processing.
- *
- * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
- * input files are not physically split into chunks. For e.g. a split could
- * be <i><input-file-path, start, offset></i> tuple. The InputFormat
- * also creates the {@link VertexReader} to read the {@link InputSplit}.
- *
- * Also, the number of workers is a hint given to the developer to try to
- * intelligently determine how many splits to create (if this is
- * adjustable) at runtime.
- *
- * @param context Context of the job
- * @param numWorkers Number of workers used for this job
- * @return an array of {@link InputSplit}s for the job.
- */
- @Override
- public abstract List<InputSplit> getSplits(
- JobContext context, int numWorkers) throws IOException,
- InterruptedException;
-
- /**
- * Create an edge reader for a given split. The framework will call
- * {@link EdgeReader#initialize(InputSplit, TaskAttemptContext)} before
- * the split is used.
- *
- * @param split the split to be read
- * @param context the information about the task
- * @return a new record reader
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract EdgeReader<I, E> createEdgeReader(
- InputSplit split,
- TaskAttemptContext context) throws IOException;
-}