You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC
[5/23] GIRAPH-409: Refactor / cleanups (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
new file mode 100644
index 0000000..f33fe58
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -0,0 +1,1405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+
+import org.apache.giraph.bsp.ApplicationState;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.WorkerServer;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerClient;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerServer;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.AddressesAndPartitionsWritable;
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.graph.FinishedSuperstepStats;
+import org.apache.giraph.graph.GlobalStats;
+import org.apache.giraph.graph.GraphMapper;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.InputSplitEvents;
+import org.apache.giraph.graph.InputSplitPaths;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionExchange;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.PartitionStats;
+import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.partition.WorkerGraphPartitioner;
+import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphTimer;
+import org.apache.giraph.metrics.GiraphTimerContext;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.metrics.WorkerSuperstepMetrics;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import net.iharder.Base64;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class BspServiceWorker<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends BspService<I, V, E, M>
+ implements CentralizedServiceWorker<I, V, E, M>,
+ ResetSuperstepMetricsObserver {
+  /** Name of gauge for time spent waiting on other workers */
+  public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
+  /** My process health znode (ephemeral; created in registerHealth(),
+      deleted in unregisterHealth() so the master notices failure fast) */
+  private String myHealthZnode;
+  /** Worker info (task id and the address of the worker server) */
+  private final WorkerInfo workerInfo;
+  /** Worker graph partitioner */
+  private final WorkerGraphPartitioner<I, V, E, M> workerGraphPartitioner;
+
+  /** IPC Client */
+  private final WorkerClient<I, V, E, M> workerClient;
+  /** IPC Server */
+  private final WorkerServer<I, V, E, M> workerServer;
+  /** Request processor for aggregator requests */
+  private final WorkerAggregatorRequestProcessor
+  workerAggregatorRequestProcessor;
+  /** Master info (refreshed every superstep in startSuperstep()) */
+  private MasterInfo masterInfo = new MasterInfo();
+  /** List of workers (refreshed every superstep in startSuperstep()) */
+  private List<WorkerInfo> workerInfoList = Lists.newArrayList();
+  /** Have the partition exchange children (workers) changed? */
+  private final BspEvent partitionExchangeChildrenChanged;
+
+  /** Worker Context (user-supplied per-worker hooks) */
+  private final WorkerContext workerContext;
+
+  /** Handler for aggregators */
+  private final WorkerAggregatorHandler aggregatorHandler;
+
+  // Per-Superstep Metrics (re-created each superstep in newSuperstep())
+  /** Timer for WorkerContext#postSuperstep */
+  private GiraphTimer wcPostSuperstepTimer;
+  /** Time spent waiting on requests to finish */
+  private GiraphTimer waitRequestsTimer;
+
+  /**
+   * Constructor for setting up the worker.
+   *
+   * Creates the IPC server before the IPC client because the server's
+   * bound address is recorded into this worker's WorkerInfo, which other
+   * workers use to reach us.
+   *
+   * @param serverPortList ZooKeeper server port list
+   * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
+   * @param context Mapper context
+   * @param graphMapper Graph mapper
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public BspServiceWorker(
+      String serverPortList,
+      int sessionMsecTimeout,
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphMapper<I, V, E, M> graphMapper)
+    throws IOException, InterruptedException {
+    super(serverPortList, sessionMsecTimeout, context, graphMapper);
+    partitionExchangeChildrenChanged = new PredicateLock(context);
+    registerBspEvent(partitionExchangeChildrenChanged);
+    workerGraphPartitioner =
+        getGraphPartitionerFactory().createWorkerGraphPartitioner();
+    workerInfo = new WorkerInfo();
+    // Start the server first so its listening address can be published
+    // in workerInfo before any client traffic is possible.
+    workerServer =
+        new NettyWorkerServer<I, V, E, M>(getConfiguration(), this, context);
+    workerInfo.setInetSocketAddress(workerServer.getMyAddress());
+    workerInfo.setTaskId(getTaskPartition());
+    workerClient =
+        new NettyWorkerClient<I, V, E, M>(context, getConfiguration(), this);
+
+    workerAggregatorRequestProcessor =
+        new NettyWorkerAggregatorRequestProcessor(getContext(),
+            getConfiguration(), this);
+
+    // NOTE(review): worker context is created with a null argument here;
+    // presumably its graph state is injected later -- confirm with callers.
+    this.workerContext = getConfiguration().createWorkerContext(null);
+
+    aggregatorHandler =
+        new WorkerAggregatorHandler(this, getConfiguration(), context);
+
+    // Re-register per-superstep timers whenever a new superstep starts.
+    GiraphMetrics.get().addSuperstepResetObserver(this);
+  }
+
+  /**
+   * Re-create the per-superstep timers against the registry for the
+   * upcoming superstep, so measurements are reported per superstep.
+   *
+   * @param superstepMetrics Metrics registry scoped to the new superstep
+   */
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+    wcPostSuperstepTimer = new GiraphTimer(
+        superstepMetrics, "worker-context-post-superstep",
+        TimeUnit.MICROSECONDS);
+    waitRequestsTimer = new GiraphTimer(
+        superstepMetrics, TIMER_WAIT_REQUESTS, TimeUnit.MICROSECONDS);
+  }
+
+  /**
+   * Get the user-supplied worker context created in the constructor.
+   *
+   * @return Worker context
+   */
+  @Override
+  public WorkerContext getWorkerContext() {
+    return workerContext;
+  }
+
+  /**
+   * Get the IPC client used to send requests to other workers.
+   *
+   * @return Worker client
+   */
+  @Override
+  public WorkerClient<I, V, E, M> getWorkerClient() {
+    return workerClient;
+  }
+
+  /**
+   * Intended to check the health of the node.  For instance, can it ssh,
+   * dmesg, etc.  For now, does nothing -- every worker always reports
+   * healthy, so registerHealth() always uses the healthy path.
+   * TODO: Make this check configurable by the user (i.e. search dmesg for
+   * problems).
+   *
+   * @return True if healthy (always in this case).
+   */
+  public boolean isHealthy() {
+    return true;
+  }
+
+  /**
+   * Load the vertices/edges from input splits.  Do this until all the
+   * InputSplits have been processed.
+   * All workers will try to do as many InputSplits as they can.  The master
+   * will monitor progress and stop this once all the InputSplits have been
+   * loaded and check-pointed.  Keep track of the last input split path to
+   * ensure the input split cache is flushed prior to marking the last input
+   * split complete.
+   *
+   * Use one or more threads to do the loading.
+   *
+   * @param inputSplitPathList List of input split paths
+   * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
+   * @return Statistics of the vertices and edges loaded
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  private VertexEdgeCount loadInputSplits(
+      List<String> inputSplitPathList,
+      InputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory)
+    throws KeeperException, InterruptedException {
+    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
+    // Determine how many threads to use based on the number of input splits
+    // per worker, bounded above by the configured thread count.
+    int maxInputSplitThreads =
+        Math.max(
+            inputSplitPathList.size() / getConfiguration().getMaxWorkers(), 1);
+    int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
+        maxInputSplitThreads);
+    ExecutorService inputSplitsExecutor =
+        Executors.newFixedThreadPool(numThreads,
+            new ThreadFactoryBuilder().setNameFormat("load-%d").build());
+    List<Future<VertexEdgeCount>> threadsFutures =
+        Lists.newArrayListWithCapacity(numThreads);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
+          "originally " + getConfiguration().getNumInputSplitsThreads() +
+          " threads(s) for " + inputSplitPathList.size() + " total splits.");
+    }
+    try {
+      for (int i = 0; i < numThreads; ++i) {
+        Callable<VertexEdgeCount> inputSplitsCallable =
+            inputSplitsCallableFactory.newCallable();
+        threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable));
+      }
+
+      // Wait until all the threads are done to wait on all requests
+      for (Future<VertexEdgeCount> threadFuture : threadsFutures) {
+        VertexEdgeCount threadVertexEdgeCount =
+            ProgressableUtils.getFutureResult(threadFuture, getContext());
+        vertexEdgeCount =
+            vertexEdgeCount.incrVertexEdgeCount(threadVertexEdgeCount);
+      }
+
+      workerClient.waitAllRequests();
+    } finally {
+      // Shut the pool down even when a split loader threw, so worker
+      // threads do not leak on failure.
+      inputSplitsExecutor.shutdown();
+    }
+    return vertexEdgeCount;
+  }
+
+
+  /**
+   * Load vertices by processing the vertex input splits advertised in
+   * ZooKeeper, using a {@link VertexInputSplitsCallableFactory}.
+   *
+   * @return Count of vertices and edges loaded
+   */
+  private VertexEdgeCount loadVertices() throws KeeperException,
+      InterruptedException {
+    List<String> splitPaths = getZkExt().getChildrenExt(
+        vertexInputSplitsPaths.getPath(), false, false, true);
+
+    GraphState<I, V, E, M> inputGraphState = new GraphState<I, V, E, M>(
+        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(), null, null);
+
+    VertexInputSplitsCallableFactory<I, V, E, M> callableFactory =
+        new VertexInputSplitsCallableFactory<I, V, E, M>(
+            getContext(), inputGraphState, getConfiguration(), this,
+            splitPaths, getWorkerInfo(), getZkExt());
+
+    return loadInputSplits(splitPaths, callableFactory);
+  }
+
+  /**
+   * Load edges by processing the edge input splits advertised in
+   * ZooKeeper, using an {@link EdgeInputSplitsCallableFactory}.
+   *
+   * @return Number of edges loaded
+   */
+  private long loadEdges() throws KeeperException, InterruptedException {
+    List<String> splitPaths = getZkExt().getChildrenExt(
+        edgeInputSplitsPaths.getPath(), false, false, true);
+
+    GraphState<I, V, E, M> inputGraphState = new GraphState<I, V, E, M>(
+        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(), null, null);
+
+    EdgeInputSplitsCallableFactory<I, V, E, M> callableFactory =
+        new EdgeInputSplitsCallableFactory<I, V, E, M>(
+            getContext(), inputGraphState, getConfiguration(), this,
+            splitPaths, getWorkerInfo(), getZkExt());
+
+    return loadInputSplits(splitPaths, callableFactory).getEdgeCount();
+  }
+
+  /**
+   * Get the master's info, as published in the addresses-and-partitions
+   * znode each superstep.
+   *
+   * @return Master info
+   */
+  @Override
+  public MasterInfo getMasterInfo() {
+    return masterInfo;
+  }
+
+  /**
+   * Get the list of workers for the current superstep, as published in
+   * the addresses-and-partitions znode.
+   *
+   * @return List of worker infos
+   */
+  @Override
+  public List<WorkerInfo> getWorkerInfoList() {
+    return workerInfoList;
+  }
+
+  /**
+   * Block until the master has created the "all ready" znode, signalling
+   * that the input splits may be processed.
+   *
+   * @param inputSplitPaths Input split paths
+   * @param inputSplitEvents Input split events
+   */
+  private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths,
+      InputSplitEvents inputSplitEvents) {
+    Stat allReadyStat = null;
+    while (allReadyStat == null) {
+      try {
+        // Also sets a watch so the event below fires on creation.
+        allReadyStat = getZkExt().exists(
+            inputSplitPaths.getAllReadyPath(), true);
+      } catch (KeeperException e) {
+        throw new IllegalStateException("ensureInputSplitsReady: " +
+            "KeeperException waiting on input splits", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("ensureInputSplitsReady: " +
+            "InterruptedException waiting on input splits", e);
+      }
+      if (allReadyStat == null) {
+        // Not there yet; sleep on the event until the znode changes.
+        inputSplitEvents.getAllReadyChanged().waitForever();
+        inputSplitEvents.getAllReadyChanged().reset();
+      }
+    }
+  }
+
+  /**
+   * Wait for all workers to finish processing input splits.  First marks
+   * this worker done by creating its "done" znode, then blocks until the
+   * master (observing all workers' done znodes) creates the "all done"
+   * znode.
+   *
+   * @param inputSplitPaths Input split paths
+   * @param inputSplitEvents Input split events
+   */
+  private void waitForOtherWorkers(InputSplitPaths inputSplitPaths,
+      InputSplitEvents inputSplitEvents) {
+    String workerInputSplitsDonePath =
+        inputSplitPaths.getDonePath() + "/" +
+            getWorkerInfo().getHostnameId();
+    // Phase 1: announce that this worker finished its splits.
+    try {
+      getZkExt().createExt(workerInputSplitsDonePath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("waitForOtherWorkers: " +
+          "KeeperException creating worker done splits", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("waitForOtherWorkers: " +
+          "InterruptedException creating worker done splits", e);
+    }
+    // Phase 2: wait for the master's "all done" znode to appear.
+    while (true) {
+      Stat inputSplitsDoneStat;
+      try {
+        // exists() with watch=true so the event below fires on creation.
+        inputSplitsDoneStat =
+            getZkExt().exists(inputSplitPaths.getAllDonePath(),
+                true);
+      } catch (KeeperException e) {
+        throw new IllegalStateException("waitForOtherWorkers: " +
+            "KeeperException waiting on worker done splits", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("waitForOtherWorkers: " +
+            "InterruptedException waiting on worker done splits", e);
+      }
+      if (inputSplitsDoneStat != null) {
+        break;
+      }
+      inputSplitEvents.getAllDoneChanged().waitForever();
+      inputSplitEvents.getAllDoneChanged().reset();
+    }
+  }
+
+  /**
+   * Prepare this worker for computation: join superstep INPUT_SUPERSTEP,
+   * load vertex/edge input splits, resolve mutations, and report input
+   * partition stats.  Short-circuits when restarting from a checkpoint.
+   */
+  @Override
+  public FinishedSuperstepStats setup() {
+    // Unless doing a restart, prepare for computation:
+    // 1. Start superstep INPUT_SUPERSTEP (no computation)
+    // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
+    // 3. Process input splits until there are no more.
+    // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
+    // 5. Process any mutations deriving from add edge requests
+    // 6. Wait for superstep INPUT_SUPERSTEP to complete.
+    //
+    // Manually-requested restart: skip input loading entirely.
+    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
+      setCachedSuperstep(getRestartedSuperstep());
+      return new FinishedSuperstepStats(false, -1, -1);
+    }
+
+    // Automated restart: the job state znode says to resume from a
+    // checkpointed superstep matching ours.
+    JSONObject jobState = getJobState();
+    if (jobState != null) {
+      try {
+        if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
+            ApplicationState.START_SUPERSTEP) &&
+            jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
+            getSuperstep()) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("setup: Restarting from an automated " +
+                "checkpointed superstep " +
+                getSuperstep() + ", attempt " +
+                getApplicationAttempt());
+          }
+          setRestartedSuperstep(getSuperstep());
+          return new FinishedSuperstepStats(false, -1, -1);
+        }
+      } catch (JSONException e) {
+        throw new RuntimeException(
+            "setup: Failed to get key-values from " +
+            jobState.toString(), e);
+      }
+    }
+
+    // Add the partitions that this worker owns
+    GraphState<I, V, E, M> graphState =
+        new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0,
+            getContext(), getGraphMapper(), null, null);
+    Collection<? extends PartitionOwner> masterSetPartitionOwners =
+        startSuperstep(graphState);
+    workerGraphPartitioner.updatePartitionOwners(
+        getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+
+/*if[HADOOP_NON_SECURE]
+    workerClient.setup();
+else[HADOOP_NON_SECURE]*/
+    workerClient.setup(getConfiguration().authenticate());
+/*end[HADOOP_NON_SECURE]*/
+
+    VertexEdgeCount vertexEdgeCount;
+
+    if (getConfiguration().hasVertexInputFormat()) {
+      // Ensure the vertex InputSplits are ready for processing
+      ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents);
+      getContext().progress();
+      try {
+        vertexEdgeCount = loadVertices();
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: loadVertices failed with InterruptedException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: loadVertices failed with KeeperException", e);
+      }
+      getContext().progress();
+    } else {
+      vertexEdgeCount = new VertexEdgeCount();
+    }
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Ensure the edge InputSplits are ready for processing
+      ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents);
+      getContext().progress();
+      try {
+        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: loadEdges failed with InterruptedException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: loadEdges failed with KeeperException", e);
+      }
+      getContext().progress();
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
+    }
+
+    if (getConfiguration().hasVertexInputFormat()) {
+      // Workers wait for each other to finish, coordinated by master
+      waitForOtherWorkers(vertexInputSplitsPaths, vertexInputSplitsEvents);
+    }
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Workers wait for each other to finish, coordinated by master
+      waitForOtherWorkers(edgeInputSplitsPaths, edgeInputSplitsEvents);
+    }
+
+    // Create remaining partitions owned by this worker.
+    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
+          !getPartitionStore().hasPartition(
+              partitionOwner.getPartitionId())) {
+        Partition<I, V, E, M> partition =
+            getConfiguration().createPartition(
+                partitionOwner.getPartitionId(), getContext());
+        getPartitionStore().addPartition(partition);
+      }
+    }
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Create vertices from added edges via vertex resolver.
+      // Doing this at the beginning of superstep 0 is not enough,
+      // because we want the vertex/edge stats to be accurate.
+      workerServer.resolveMutations(graphState);
+    }
+
+    // Generate the partition stats for the input superstep and process
+    // if necessary
+    List<PartitionStats> partitionStatsList =
+        new ArrayList<PartitionStats>();
+    for (Partition<I, V, E, M> partition :
+        getPartitionStore().getPartitions()) {
+      PartitionStats partitionStats =
+          new PartitionStats(partition.getId(),
+              partition.getVertexCount(),
+              0,
+              partition.getEdgeCount(),
+              0);
+      partitionStatsList.add(partitionStats);
+    }
+    workerGraphPartitioner.finalizePartitionStats(
+        partitionStatsList, getPartitionStore());
+
+    return finishSuperstep(graphState, partitionStatsList);
+  }
+
+  /**
+   * Register the health of this worker for a given superstep by creating
+   * an ephemeral znode under the healthy (or unhealthy) path.  The znode
+   * disappears if this process dies, which is how the master detects
+   * worker failure.  (Fixed: the superstep parameter was previously
+   * ignored in favor of getSuperstep(), and an unused JSONArray of
+   * hostname/port was built and discarded.)
+   *
+   * @param superstep Superstep to register health on
+   */
+  private void registerHealth(long superstep) {
+    String myHealthPath = null;
+    if (isHealthy()) {
+      myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
+          superstep);
+    } else {
+      myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
+          superstep);
+    }
+    myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
+    try {
+      myHealthZnode = getZkExt().createExt(
+          myHealthPath,
+          WritableUtils.writeToByteArray(workerInfo),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.EPHEMERAL,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      // A leftover znode from a previous failed attempt; wait for the
+      // master to bump the application attempt, then die so we re-join.
+      LOG.warn("registerHealth: myHealthPath already exists (likely " +
+          "from previous failure): " + myHealthPath +
+          ". Waiting for change in attempts " +
+          "to re-join the application");
+      getApplicationAttemptChangedEvent().waitForever();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("registerHealth: Got application " +
+            "attempt changed event, killing self");
+      }
+      throw new IllegalStateException(
+          "registerHealth: Trying " +
+          "to get the new application attempt by killing self", e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + myHealthPath +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " + myHealthPath +
+          " failed with InterruptedException", e);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("registerHealth: Created my health node for attempt=" +
+          getApplicationAttempt() + ", superstep=" +
+          superstep + " with " + myHealthZnode +
+          " and workerInfo= " + workerInfo);
+    }
+  }
+
+  /**
+   * Delete this worker's ephemeral health znode explicitly, so the master
+   * learns of the failure sooner than the ZooKeeper session timeout.
+   */
+  private void unregisterHealth() {
+    LOG.error("unregisterHealth: Got failure, unregistering health on " +
+        myHealthZnode + " on superstep " + getSuperstep());
+    try {
+      getZkExt().deleteExt(myHealthZnode, -1, false);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "unregisterHealth: KeeperException - Couldn't delete " +
+          myHealthZnode, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "unregisterHealth: InterruptedException - Couldn't delete " +
+          myHealthZnode, e);
+    }
+  }
+
+  /**
+   * On failure, drop our health znode immediately so the master notices
+   * the dead worker without waiting for the ZooKeeper session timeout.
+   */
+  @Override
+  public void failureCleanup() {
+    unregisterHealth();
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> startSuperstep(
+      GraphState<I, V, E, M> graphState) {
+    // Algorithm:
+    // 1. Communication service will combine message from previous
+    // superstep
+    // 2. Register my health for the next superstep.
+    // 3. Wait until the partition assignment is complete and get it
+    // 4. Get the aggregator values from the previous superstep
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      workerServer.prepareSuperstep(graphState);
+    }
+
+    registerHealth(getSuperstep());
+
+    String addressesAndPartitionsPath =
+        getAddressesAndPartitionsPath(getApplicationAttempt(),
+            getSuperstep());
+    AddressesAndPartitionsWritable addressesAndPartitions =
+        new AddressesAndPartitionsWritable(
+            workerGraphPartitioner.createPartitionOwner().getClass());
+    try {
+      // Block until the master publishes this superstep's worker
+      // addresses and partition assignments.
+      while (getZkExt().exists(addressesAndPartitionsPath, true) ==
+          null) {
+        getAddressesAndPartitionsReadyChangedEvent().waitForever();
+        getAddressesAndPartitionsReadyChangedEvent().reset();
+      }
+      WritableUtils.readFieldsFromZnode(
+          getZkExt(),
+          addressesAndPartitionsPath,
+          false,
+          null,
+          addressesAndPartitions);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "startSuperstep: KeeperException getting assignments", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "startSuperstep: InterruptedException getting assignments", e);
+    }
+
+    // NOTE(review): clear() is redundant since the field is reassigned
+    // on the next line.
+    workerInfoList.clear();
+    workerInfoList = addressesAndPartitions.getWorkerInfos();
+    masterInfo = addressesAndPartitions.getMasterInfo();
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("startSuperstep: " + masterInfo);
+      LOG.info("startSuperstep: Ready for computation on superstep " +
+          getSuperstep() + " since worker " +
+          "selection and vertex range assignments are done in " +
+          addressesAndPartitionsPath);
+    }
+
+    getContext().setStatus("startSuperstep: " +
+        getGraphMapper().getMapFunctions().toString() +
+        " - Attempt=" + getApplicationAttempt() +
+        ", Superstep=" + getSuperstep());
+    return addressesAndPartitions.getPartitionOwners();
+  }
+
+  @Override
+  public FinishedSuperstepStats finishSuperstep(
+      GraphState<I, V, E, M> graphState,
+      List<PartitionStats> partitionStatsList) {
+    // This barrier blocks until success (or the master signals it to
+    // restart).
+    //
+    // Master will coordinate the barriers and aggregate "doneness" of all
+    // the vertices. Each worker will:
+    // 1. Ensure that the requests are complete
+    // 2. Execute user postSuperstep() if necessary.
+    // 3. Save aggregator values that are in use.
+    // 4. Report the statistics (vertices, edges, messages, etc.)
+    // of this worker
+    // 5. Let the master know it is finished.
+    // 6. Wait for the master's global stats, and check if done
+    waitForRequestsToFinish();
+
+    graphState.getGraphMapper().notifyFinishedCommunication();
+
+    // Sum messages sent across all of this worker's partitions.
+    long workerSentMessages = 0;
+    for (PartitionStats partitionStats : partitionStatsList) {
+      workerSentMessages += partitionStats.getMessagesSentCount();
+    }
+
+    // Skip the user postSuperstep() hook during the input superstep.
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      getWorkerContext().setGraphState(graphState);
+      GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
+      getWorkerContext().postSuperstep();
+      timerContext.stop();
+      getContext().progress();
+    }
+
+    aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Superstep " + getSuperstep() +
+          ", messages = " + workerSentMessages + " " +
+          MemoryUtils.getRuntimeMemoryStats());
+    }
+
+    writeFinshedSuperstepInfoToZK(partitionStatsList, workerSentMessages);
+
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "finishSuperstep: (waiting for rest " +
+            "of workers) " +
+            getGraphMapper().getMapFunctions().toString() +
+            " - Attempt=" + getApplicationAttempt() +
+            ", Superstep=" + getSuperstep());
+
+    String superstepFinishedNode =
+        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
+
+    waitForOtherWorkers(superstepFinishedNode);
+
+    // Master has published aggregated global stats on the finished znode.
+    GlobalStats globalStats = new GlobalStats();
+    WritableUtils.readFieldsFromZnode(
+        getZkExt(), superstepFinishedNode, false, null, globalStats);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
+          " with global stats " + globalStats);
+    }
+    // Advance to the next superstep now that this one is fully complete.
+    incrCachedSuperstep();
+    getContext().setStatus("finishSuperstep: (all workers done) " +
+        getGraphMapper().getMapFunctions().toString() +
+        " - Attempt=" + getApplicationAttempt() +
+        ", Superstep=" + getSuperstep());
+
+    return new FinishedSuperstepStats(
+        globalStats.getHaltComputation(),
+        globalStats.getVertexCount(),
+        globalStats.getEdgeCount());
+  }
+
+  /**
+   * Block until every outstanding open request on the worker client has
+   * completed, timing the wait with the wait-requests timer.
+   */
+  private void waitForRequestsToFinish() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Waiting on all requests, superstep " +
+          getSuperstep() + " " + MemoryUtils.getRuntimeMemoryStats());
+    }
+    GiraphTimerContext waitTimer = waitRequestsTimer.time();
+    workerClient.waitAllRequests();
+    waitTimer.stop();
+  }
+
+  /**
+   * Block until the master creates the superstep-finished znode,
+   * signalling that every worker has completed the superstep.
+   *
+   * @param superstepFinishedNode ZooKeeper path to wait on.
+   */
+  private void waitForOtherWorkers(String superstepFinishedNode) {
+    try {
+      // exists() with watch=true arms the event used below.
+      Stat finishedStat = getZkExt().exists(superstepFinishedNode, true);
+      while (finishedStat == null) {
+        getSuperstepFinishedEvent().waitForever();
+        getSuperstepFinishedEvent().reset();
+        finishedStat = getZkExt().exists(superstepFinishedNode, true);
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "finishSuperstep: Failed while waiting for master to " +
+          "signal completion of superstep " + getSuperstep(), e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "finishSuperstep: Failed while waiting for master to " +
+          "signal completion of superstep " + getSuperstep(), e);
+    }
+  }
+
+  /**
+   * Write finished superstep info to ZooKeeper: finalized partition stats
+   * and superstep metrics (Base64-encoded) plus the sent-message count,
+   * as a JSON object under the worker-finished path.
+   *
+   * NOTE(review): method name has a typo ("Finshed"); kept as-is because
+   * it is called from finishSuperstep() -- rename in a follow-up.
+   *
+   * @param partitionStatsList List of partition stats from superstep.
+   * @param workerSentMessages Number of messages sent in superstep.
+   */
+  private void writeFinshedSuperstepInfoToZK(
+      List<PartitionStats> partitionStatsList, long workerSentMessages) {
+    Collection<PartitionStats> finalizedPartitionStats =
+        workerGraphPartitioner.finalizePartitionStats(
+            partitionStatsList, getPartitionStore());
+    List<PartitionStats> finalizedPartitionStatsList =
+        new ArrayList<PartitionStats>(finalizedPartitionStats);
+    byte[] partitionStatsBytes =
+        WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
+    WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
+    metrics.readFromRegistry();
+    byte[] metricsBytes = WritableUtils.writeToByteArray(metrics);
+
+    JSONObject workerFinishedInfoObj = new JSONObject();
+    try {
+      workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
+          Base64.encodeBytes(partitionStatsBytes));
+      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
+      workerFinishedInfoObj.put(JSONOBJ_METRICS_KEY,
+          Base64.encodeBytes(metricsBytes));
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+
+    String finishedWorkerPath =
+        getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
+        "/" + getHostnamePartitionId();
+    try {
+      // NOTE(review): getBytes() uses the platform default charset; the
+      // reader must decode with the same charset -- confirm, and consider
+      // an explicit UTF-8 on both sides.
+      getZkExt().createExt(finishedWorkerPath,
+          workerFinishedInfoObj.toString().getBytes(),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      // Benign on retry: another attempt already wrote our result.
+      LOG.warn("finishSuperstep: finished worker path " +
+          finishedWorkerPath + " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + finishedWorkerPath +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " + finishedWorkerPath +
+          " failed with InterruptedException", e);
+    }
+  }
+
+  /**
+   * Save the vertices using the user-defined VertexOutputFormat from our
+   * vertexArray based on the split.  A no-op (with a warning) when no
+   * vertex output format was configured.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void saveVertices() throws IOException, InterruptedException {
+    if (getConfiguration().getVertexOutputFormatClass() == null) {
+      LOG.warn("saveVertices: " +
+          GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
+          " not specified -- there will be no saved output");
+      return;
+    }
+
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "saveVertices: Starting to save vertices");
+    VertexOutputFormat<I, V, E> vertexOutputFormat =
+        getConfiguration().createVertexOutputFormat();
+    VertexWriter<I, V, E> vertexWriter =
+        vertexOutputFormat.createVertexWriter(getContext());
+    vertexWriter.initialize(getContext());
+    try {
+      for (Partition<I, V, E, M> partition :
+          getPartitionStore().getPartitions()) {
+        for (Vertex<I, V, E, M> vertex : partition) {
+          // Report progress per vertex so the task is not killed while
+          // writing a large partition.
+          getContext().progress();
+          vertexWriter.writeVertex(vertex);
+        }
+        getContext().progress();
+      }
+    } finally {
+      // Close the writer even if a write failed, so the underlying
+      // output streams are released.
+      vertexWriter.close(getContext());
+    }
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "saveVertices: Done saving vertices");
+  }
+
  /**
   * Final worker shutdown: close client connections, save output, mark
   * this worker as cleaned up in ZooKeeper, and stop the server.
   * Failures after the output is saved are logged but not rethrown,
   * since the job's results are already safe.
   */
  @Override
  public void cleanup() throws IOException, InterruptedException {
    workerClient.closeConnections();
    // getSuperstep() has already advanced past the last computed
    // superstep; step the cached value back before saving output.
    // NOTE(review): confirm downstream consumers rely on this value.
    setCachedSuperstep(getSuperstep() - 1);
    saveVertices();
    // All worker processes should denote they are done by adding special
    // znode.  Once the number of znodes equals the number of partitions
    // for workers and masters, the master will clean up the ZooKeeper
    // znodes associated with this job.
    String workerCleanedUpPath = cleanedUpPath + "/" +
        getTaskPartition() + WORKER_SUFFIX;
    try {
      String finalFinishedPath =
          getZkExt().createExt(workerCleanedUpPath,
              null,
              Ids.OPEN_ACL_UNSAFE,
              CreateMode.PERSISTENT,
              true);
      if (LOG.isInfoEnabled()) {
        LOG.info("cleanup: Notifying master its okay to cleanup with " +
            finalFinishedPath);
      }
    } catch (KeeperException.NodeExistsException e) {
      // Node already present (e.g. from a previous attempt) -- harmless.
      if (LOG.isInfoEnabled()) {
        LOG.info("cleanup: Couldn't create finished node '" +
            workerCleanedUpPath);
      }
    } catch (KeeperException e) {
      // Cleaning up, it's okay to fail after cleanup is successful
      LOG.error("cleanup: Got KeeperException on notification " +
          "to master about cleanup", e);
    } catch (InterruptedException e) {
      // Cleaning up, it's okay to fail after cleanup is successful
      // NOTE(review): the interrupt status is not restored here --
      // confirm this is intentional for shutdown paths.
      LOG.error("cleanup: Got InterruptedException on notification " +
          "to master about cleanup", e);
    }
    try {
      getZkExt().close();
    } catch (InterruptedException e) {
      // cleanup phase -- just log the error
      LOG.error("cleanup: Zookeeper failed to close with " + e);
    }

    if (getConfiguration().metricsEnabled()) {
      GiraphMetrics.get().dumpToStdout();
    }

    // Preferably would shut down the service only after
    // all clients have disconnected (or the exceptions on the
    // client side ignored).
    workerServer.close();
  }
+
+ @Override
+ public void storeCheckpoint() throws IOException {
+ LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+ "storeCheckpoint: Starting checkpoint " +
+ getGraphMapper().getMapFunctions().toString() +
+ " - Attempt=" + getApplicationAttempt() +
+ ", Superstep=" + getSuperstep());
+
+ // Algorithm:
+ // For each partition, dump vertices and messages
+ Path metadataFilePath =
+ new Path(getCheckpointBasePath(getSuperstep()) + "." +
+ getHostnamePartitionId() +
+ CHECKPOINT_METADATA_POSTFIX);
+ Path verticesFilePath =
+ new Path(getCheckpointBasePath(getSuperstep()) + "." +
+ getHostnamePartitionId() +
+ CHECKPOINT_VERTICES_POSTFIX);
+ Path validFilePath =
+ new Path(getCheckpointBasePath(getSuperstep()) + "." +
+ getHostnamePartitionId() +
+ CHECKPOINT_VALID_POSTFIX);
+
+ // Remove these files if they already exist (shouldn't though, unless
+ // of previous failure of this worker)
+ if (getFs().delete(validFilePath, false)) {
+ LOG.warn("storeCheckpoint: Removed valid file " +
+ validFilePath);
+ }
+ if (getFs().delete(metadataFilePath, false)) {
+ LOG.warn("storeCheckpoint: Removed metadata file " +
+ metadataFilePath);
+ }
+ if (getFs().delete(verticesFilePath, false)) {
+ LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
+ }
+
+ FSDataOutputStream verticesOutputStream =
+ getFs().create(verticesFilePath);
+ ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
+ DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
+ for (Partition<I, V, E, M> partition :
+ getPartitionStore().getPartitions()) {
+ long startPos = verticesOutputStream.getPos();
+ partition.write(verticesOutputStream);
+ // write messages
+ getServerData().getCurrentMessageStore().writePartition(
+ verticesOutputStream, partition.getId());
+ // Write the metadata for this partition
+ // Format:
+ // <index count>
+ // <index 0 start pos><partition id>
+ // <index 1 start pos><partition id>
+ metadataOutput.writeLong(startPos);
+ metadataOutput.writeInt(partition.getId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("storeCheckpoint: Vertex file starting " +
+ "offset = " + startPos + ", length = " +
+ (verticesOutputStream.getPos() - startPos) +
+ ", partition = " + partition.toString());
+ }
+ getContext().progress();
+ }
+ // Metadata is buffered and written at the end since it's small and
+ // needs to know how many partitions this worker owns
+ FSDataOutputStream metadataOutputStream =
+ getFs().create(metadataFilePath);
+ metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
+ metadataOutputStream.write(metadataByteStream.toByteArray());
+ metadataOutputStream.close();
+ verticesOutputStream.close();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("storeCheckpoint: Finished metadata (" +
+ metadataFilePath + ") and vertices (" + verticesFilePath + ").");
+ }
+
+ getFs().createNewFile(validFilePath);
+
+ // Notify master that checkpoint is stored
+ String workerWroteCheckpoint =
+ getWorkerWroteCheckpointPath(getApplicationAttempt(),
+ getSuperstep()) + "/" + getHostnamePartitionId();
+ try {
+ getZkExt().createExt(workerWroteCheckpoint,
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ } catch (KeeperException.NodeExistsException e) {
+ LOG.warn("finishSuperstep: wrote checkpoint worker path " +
+ workerWroteCheckpoint + " already exists!");
+ } catch (KeeperException e) {
+ throw new IllegalStateException("Creating " + workerWroteCheckpoint +
+ " failed with KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Creating " +
+ workerWroteCheckpoint +
+ " failed with InterruptedException", e);
+ }
+ }
+
+ @Override
+ public VertexEdgeCount loadCheckpoint(long superstep) {
+ try {
+ // clear old message stores
+ getServerData().getIncomingMessageStore().clearAll();
+ getServerData().getCurrentMessageStore().clearAll();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "loadCheckpoint: Failed to clear message stores ", e);
+ }
+
+ // Algorithm:
+ // Examine all the partition owners and load the ones
+ // that match my hostname and id from the master designated checkpoint
+ // prefixes.
+ long startPos = 0;
+ int loadedPartitions = 0;
+ for (PartitionOwner partitionOwner :
+ workerGraphPartitioner.getPartitionOwners()) {
+ if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
+ String metadataFile =
+ partitionOwner.getCheckpointFilesPrefix() +
+ CHECKPOINT_METADATA_POSTFIX;
+ String partitionsFile =
+ partitionOwner.getCheckpointFilesPrefix() +
+ CHECKPOINT_VERTICES_POSTFIX;
+ try {
+ int partitionId = -1;
+ DataInputStream metadataStream =
+ getFs().open(new Path(metadataFile));
+ int partitions = metadataStream.readInt();
+ for (int i = 0; i < partitions; ++i) {
+ startPos = metadataStream.readLong();
+ partitionId = metadataStream.readInt();
+ if (partitionId == partitionOwner.getPartitionId()) {
+ break;
+ }
+ }
+ if (partitionId != partitionOwner.getPartitionId()) {
+ throw new IllegalStateException(
+ "loadCheckpoint: " + partitionOwner +
+ " not found!");
+ }
+ metadataStream.close();
+ Partition<I, V, E, M> partition =
+ getConfiguration().createPartition(partitionId, getContext());
+ DataInputStream partitionsStream =
+ getFs().open(new Path(partitionsFile));
+ if (partitionsStream.skip(startPos) != startPos) {
+ throw new IllegalStateException(
+ "loadCheckpoint: Failed to skip " + startPos +
+ " on " + partitionsFile);
+ }
+ partition.readFields(partitionsStream);
+ if (partitionsStream.readBoolean()) {
+ getServerData().getCurrentMessageStore().readFieldsForPartition(
+ partitionsStream, partitionId);
+ }
+ partitionsStream.close();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadCheckpoint: Loaded partition " +
+ partition);
+ }
+ if (getPartitionStore().hasPartition(partitionId)) {
+ throw new IllegalStateException(
+ "loadCheckpoint: Already has partition owner " +
+ partitionOwner);
+ }
+ getPartitionStore().addPartition(partition);
+ getContext().progress();
+ ++loadedPartitions;
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "loadCheckpoint: Failed to get partition owner " +
+ partitionOwner, e);
+ }
+ }
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadCheckpoint: Loaded " + loadedPartitions +
+ " partitions of out " +
+ workerGraphPartitioner.getPartitionOwners().size() +
+ " total.");
+ }
+
+ // Load global statistics
+ GlobalStats globalStats = null;
+ String finalizedCheckpointPath =
+ getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+ try {
+ DataInputStream finalizedStream =
+ getFs().open(new Path(finalizedCheckpointPath));
+ globalStats = new GlobalStats();
+ globalStats.readFields(finalizedStream);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "loadCheckpoint: Failed to load global statistics", e);
+ }
+
+ // Communication service needs to setup the connections prior to
+ // processing vertices
+/*if[HADOOP_NON_SECURE]
+ workerClient.setup();
+else[HADOOP_NON_SECURE]*/
+ workerClient.setup(getConfiguration().authenticate());
+/*end[HADOOP_NON_SECURE]*/
+ return new VertexEdgeCount(globalStats.getVertexCount(),
+ globalStats.getEdgeCount());
+ }
+
  /**
   * Send the worker partitions to their destination workers
   *
   * Destinations are visited in shuffled order to spread network load
   * across workers rather than hammering them in a fixed sequence.
   * After all sends are flushed, a ZooKeeper znode is created so
   * dependent workers know this worker is done sending.
   *
   * @param workerPartitionMap Map of worker info to the partitions stored
   *        on this worker to be sent
   */
  private void sendWorkerPartitions(
      Map<WorkerInfo, List<Integer>> workerPartitionMap) {
    List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
        new ArrayList<Entry<WorkerInfo, List<Integer>>>(
            workerPartitionMap.entrySet());
    Collections.shuffle(randomEntryList);
    WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor =
        new NettyWorkerClientRequestProcessor<I, V, E, M>(getContext(),
            getConfiguration(), this);
    for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
        randomEntryList) {
      for (Integer partitionId : workerPartitionList.getValue()) {
        // Partitions are removed from the local store as they are sent;
        // ownership transfers to the destination worker.
        Partition<I, V, E, M> partition =
            getPartitionStore().removePartition(partitionId);
        if (partition == null) {
          throw new IllegalStateException(
              "sendWorkerPartitions: Couldn't find partition " +
                  partitionId + " to send to " +
                  workerPartitionList.getKey());
        }
        if (LOG.isInfoEnabled()) {
          LOG.info("sendWorkerPartitions: Sending worker " +
              workerPartitionList.getKey() + " partition " +
              partitionId);
        }
        workerClientRequestProcessor.sendPartitionRequest(
            workerPartitionList.getKey(),
            partition);
      }
    }

    // Flush everything and wait for all outstanding requests to finish
    // before announcing completion in ZooKeeper.
    try {
      workerClientRequestProcessor.flush();
      workerClient.waitAllRequests();
    } catch (IOException e) {
      throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
    }
    String myPartitionExchangeDonePath =
        getPartitionExchangeWorkerPath(
            getApplicationAttempt(), getSuperstep(), getWorkerInfo());
    try {
      getZkExt().createExt(myPartitionExchangeDonePath,
          null,
          Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT,
          true);
    } catch (KeeperException e) {
      throw new IllegalStateException(
          "sendWorkerPartitions: KeeperException to create " +
              myPartitionExchangeDonePath, e);
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "sendWorkerPartitions: InterruptedException to create " +
              myPartitionExchangeDonePath, e);
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("sendWorkerPartitions: Done sending all my partitions.");
    }
  }
+
  /**
   * Exchange vertex partitions with other workers according to the
   * owners chosen by the master.  Blocks until every worker this worker
   * depends on has announced (via ZooKeeper) that it finished sending.
   *
   * @param masterSetPartitionOwners Partition ownership assigned by the
   *        master for the upcoming superstep
   */
  @Override
  public final void exchangeVertexPartitions(
      Collection<? extends PartitionOwner> masterSetPartitionOwners) {
    // 1. Fix the addresses of the partition ids if they have changed.
    // 2. Send all the partitions to their destination workers in a random
    //    fashion.
    // 3. Notify completion with a ZooKeeper stamp
    // 4. Wait for all my dependencies to be done (if any)
    // 5. Add the partitions to myself.
    PartitionExchange partitionExchange =
        workerGraphPartitioner.updatePartitionOwners(
            getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
    workerClient.openConnections();

    Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
        partitionExchange.getSendWorkerPartitionMap();
    if (!getPartitionStore().isEmpty()) {
      sendWorkerPartitions(sendWorkerPartitionMap);
    }

    Set<WorkerInfo> myDependencyWorkerSet =
        partitionExchange.getMyDependencyWorkerSet();
    Set<String> workerIdSet = new HashSet<String>();
    for (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
      // Hostname ids must be unique, otherwise the "done" bookkeeping
      // below would silently drop a dependency.
      if (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
        throw new IllegalStateException(
            "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
      }
    }
    if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
      if (LOG.isInfoEnabled()) {
        LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
            "exiting early");
      }
      return;
    }

    String vertexExchangePath =
        getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
    List<String> workerDoneList;
    try {
      // Poll the exchange znode's children, removing each worker that
      // has reported done; block on the children-changed event between
      // polls until no dependencies remain.
      while (true) {
        workerDoneList = getZkExt().getChildrenExt(
            vertexExchangePath, true, false, false);
        workerIdSet.removeAll(workerDoneList);
        if (workerIdSet.isEmpty()) {
          break;
        }
        if (LOG.isInfoEnabled()) {
          LOG.info("exchangeVertexPartitions: Waiting for workers " +
              workerIdSet);
        }
        getPartitionExchangeChildrenChangedEvent().waitForever();
        getPartitionExchangeChildrenChangedEvent().reset();
      }
    } catch (KeeperException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("exchangeVertexPartitions: Done with exchange.");
    }
  }
+
+ /**
+ * Get event when the state of a partition exchange has changed.
+ *
+ * @return Event to check.
+ */
+ public final BspEvent getPartitionExchangeChildrenChangedEvent() {
+ return partitionExchangeChildrenChanged;
+ }
+
+ @Override
+ protected boolean processEvent(WatchedEvent event) {
+ boolean foundEvent = false;
+ if (event.getPath().startsWith(masterJobStatePath) &&
+ (event.getType() == EventType.NodeChildrenChanged)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("processEvent: Job state changed, checking " +
+ "to see if it needs to restart");
+ }
+ JSONObject jsonObj = getJobState();
+ try {
+ if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
+ ApplicationState.START_SUPERSTEP) &&
+ jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
+ getApplicationAttempt()) {
+ LOG.fatal("processEvent: Worker will restart " +
+ "from command - " + jsonObj.toString());
+ System.exit(-1);
+ }
+ } catch (JSONException e) {
+ throw new RuntimeException(
+ "processEvent: Couldn't properly get job state from " +
+ jsonObj.toString());
+ }
+ foundEvent = true;
+ } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
+ event.getType() == EventType.NodeChildrenChanged) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("processEvent : partitionExchangeChildrenChanged " +
+ "(at least one worker is done sending partitions)");
+ }
+ partitionExchangeChildrenChanged.signal();
+ foundEvent = true;
+ }
+
+ return foundEvent;
+ }
+
  /** @return this worker's {@link WorkerInfo} */
  @Override
  public WorkerInfo getWorkerInfo() {
    return workerInfo;
  }
+
+ @Override
+ public PartitionStore<I, V, E, M> getPartitionStore() {
+ return getServerData().getPartitionStore();
+ }
+
  /**
   * Look up the owner of the partition containing the given vertex id.
   *
   * @param vertexId Vertex id to look up
   * @return Owner of the partition the vertex id maps to
   */
  @Override
  public PartitionOwner getVertexPartitionOwner(I vertexId) {
    return workerGraphPartitioner.getPartitionOwner(vertexId);
  }
+
  /** @return all partition owners known to the graph partitioner */
  @Override
  public Iterable<? extends PartitionOwner> getPartitionOwners() {
    return workerGraphPartitioner.getPartitionOwners();
  }
+
+ @Override
+ public Partition<I, V, E, M> getPartition(I vertexId) {
+ return getPartitionStore().getPartition(getPartitionId(vertexId));
+ }
+
+ @Override
+ public Integer getPartitionId(I vertexId) {
+ PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
+ return partitionOwner.getPartitionId();
+ }
+
  /** @return true iff the partition store holds the given partition id */
  @Override
  public boolean hasPartition(Integer partitionId) {
    return getPartitionStore().hasPartition(partitionId);
  }
+
+ @Override
+ public Vertex<I, V, E, M> getVertex(I vertexId) {
+ PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
+ if (getPartitionStore().hasPartition(partitionOwner.getPartitionId())) {
+ return getPartitionStore().getPartition(
+ partitionOwner.getPartitionId()).getVertex(vertexId);
+ } else {
+ return null;
+ }
+ }
+
  /** @return the server data held by the worker server */
  @Override
  public ServerData<I, V, E, M> getServerData() {
    return workerServer.getServerData();
  }
+
  /** @return this worker's aggregator handler */
  @Override
  public WorkerAggregatorHandler getAggregatorHandler() {
    return aggregatorHandler;
  }
+
  /**
   * Prepare aggregator state before a superstep.  Skipped during the
   * input superstep, as the guard below shows.
   */
  @Override
  public void prepareSuperstep() {
    if (getSuperstep() != INPUT_SUPERSTEP) {
      aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
    }
  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerContext.java
new file mode 100644
index 0000000..0ffdc0f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+/**
+ * A dumb implementation of {@link WorkerContext}. This is the default
+ * implementation when no WorkerContext is defined by the user. It does
+ * nothing.
+ */
+public class DefaultWorkerContext extends WorkerContext {
+
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException {
+ }
+
+ @Override
+ public void postApplication() { }
+
+ @Override
+ public void preSuperstep() { }
+
+ @Override
+ public void postSuperstep() { }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
new file mode 100644
index 0000000..23e2ff7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.yammer.metrics.core.Counter;
+
+import java.io.IOException;
+import java.util.List;
+
/**
 * Load as many edge input splits as possible.
 * Every thread has its own instance of WorkerClientRequestProcessor
 * to send requests.
 *
 * @param <I> Vertex id
 * @param <V> Vertex value
 * @param <E> Edge value
 * @param <M> Message data
 */
public class EdgeInputSplitsCallable<I extends WritableComparable,
    V extends Writable, E extends Writable, M extends Writable>
    extends InputSplitsCallable<I, V, E, M> {
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(
      EdgeInputSplitsCallable.class);
  /** Total edges loaded across all splits handled by this callable */
  private long totalEdgesLoaded = 0;
  /** Input split max edges (-1 denotes all) */
  private final long inputSplitMaxEdges;

  // Metrics
  /** number of edges loaded counter */
  private final Counter edgesLoadedCounter;

  /**
   * Constructor.
   *
   * @param context Context
   * @param graphState Graph state
   * @param configuration Configuration
   * @param bspServiceWorker service worker
   * @param inputSplitPathList List of the paths of the input splits
   * @param workerInfo This worker's info
   * @param zooKeeperExt Handle to ZooKeeperExt
   */
  public EdgeInputSplitsCallable(
      Mapper<?, ?, ?, ?>.Context context,
      GraphState<I, V, E, M> graphState,
      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
      BspServiceWorker<I, V, E, M> bspServiceWorker,
      List<String> inputSplitPathList,
      WorkerInfo workerInfo,
      ZooKeeperExt zooKeeperExt) {
    super(context, graphState, configuration, bspServiceWorker,
        inputSplitPathList, workerInfo, zooKeeperExt,
        BspServiceWorker.EDGE_INPUT_SPLIT_RESERVED_NODE,
        BspServiceWorker.EDGE_INPUT_SPLIT_FINISHED_NODE,
        bspServiceWorker.getEdgeInputSplitsEvents());

    inputSplitMaxEdges = configuration.getInputSplitMaxEdges();

    // Initialize Metrics
    GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJob();
    edgesLoadedCounter = jobMetrics.getCounter(COUNTER_EDGES_LOADED);
  }

  /**
   * Read edges from input split.  If testing, the user may request a
   * maximum number of edges to be read from an input split.
   *
   * @param inputSplit Input split to process with edge reader
   * @param graphState Current graph state
   * @return Edges loaded from this input split
   * @throws IOException
   * @throws InterruptedException
   */
  @Override
  protected VertexEdgeCount readInputSplit(
      InputSplit inputSplit,
      GraphState<I, V, E, M> graphState) throws IOException,
      InterruptedException {
    EdgeInputFormat<I, E> edgeInputFormat =
        configuration.createEdgeInputFormat();
    EdgeReader<I, E> edgeReader =
        edgeInputFormat.createEdgeReader(inputSplit, context);
    edgeReader.initialize(inputSplit, context);
    long inputSplitEdgesLoaded = 0;
    while (edgeReader.nextEdge()) {
      EdgeWithSource<I, E> readerEdge = edgeReader.getCurrentEdge();
      // Fail fast on malformed reader output: source, target and value
      // must all be present before the edge is sent anywhere.
      if (readerEdge.getSourceVertexId() == null) {
        throw new IllegalArgumentException(
            "readInputSplit: Edge reader returned an edge " +
                "without a source vertex id!  - " + readerEdge);
      }
      if (readerEdge.getEdge().getTargetVertexId() == null) {
        throw new IllegalArgumentException(
            "readInputSplit: Edge reader returned an edge " +
                "without a target vertex id!  - " + readerEdge);
      }
      if (readerEdge.getEdge().getValue() == null) {
        throw new IllegalArgumentException(
            "readInputSplit: Edge reader returned an edge " +
                "without a value!  - " + readerEdge);
      }

      graphState.getWorkerClientRequestProcessor().addEdgeRequest(
          readerEdge.getSourceVertexId(), readerEdge.getEdge());
      context.progress(); // do this before potential data transfer
      ++inputSplitEdgesLoaded;

      // Update status every 1M edges
      if (((inputSplitEdgesLoaded + totalEdgesLoaded) % 1000000) == 0) {
        LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
            "readInputSplit: Loaded " +
                (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
                MemoryUtils.getRuntimeMemoryStats());
      }

      // For sampling, or to limit outlier input splits, the number of
      // records per input split can be limited
      if (inputSplitMaxEdges > 0 &&
          inputSplitEdgesLoaded >= inputSplitMaxEdges) {
        if (LOG.isInfoEnabled()) {
          LOG.info("readInputSplit: Leaving the input " +
              "split early, reached maximum edges " +
              inputSplitEdgesLoaded);
        }
        break;
      }
    }
    edgeReader.close();
    totalEdgesLoaded += inputSplitEdgesLoaded;
    edgesLoadedCounter.inc(inputSplitEdgesLoaded);
    // Edge input contributes no vertices, only edges.
    return new VertexEdgeCount(0, inputSplitEdgesLoaded);
  }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
new file mode 100644
index 0000000..1a9a744
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.util.List;
+
+/**
+ * Factory for {@link EdgeInputSplitsCallable}s.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ implements InputSplitsCallableFactory<I, V, E, M> {
+ /** Mapper context. */
+ private final Mapper<?, ?, ?, ?>.Context context;
+ /** Graph state. */
+ private final GraphState<I, V, E, M> graphState;
+ /** Configuration. */
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+ /** {@link BspServiceWorker} we're running on. */
+ private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+ /** List of input split paths. */
+ private final List<String> inputSplitPathList;
+ /** Worker info. */
+ private final WorkerInfo workerInfo;
+ /** {@link ZooKeeperExt} for this worker. */
+ private final ZooKeeperExt zooKeeperExt;
+
+ /**
+ * Constructor.
+ *
+ * @param context Mapper context
+ * @param graphState Graph state
+ * @param configuration Configuration
+ * @param bspServiceWorker Calling {@link BspServiceWorker}
+ * @param inputSplitPathList List of input split paths
+ * @param workerInfo Worker info
+ * @param zooKeeperExt {@link ZooKeeperExt} for this worker
+ */
+ public EdgeInputSplitsCallableFactory(
+ Mapper<?, ?, ?, ?>.Context context,
+ GraphState<I, V, E, M> graphState,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+ BspServiceWorker<I, V, E, M> bspServiceWorker,
+ List<String> inputSplitPathList,
+ WorkerInfo workerInfo,
+ ZooKeeperExt zooKeeperExt) {
+ this.context = context;
+ this.graphState = graphState;
+ this.configuration = configuration;
+ this.bspServiceWorker = bspServiceWorker;
+ this.inputSplitPathList = inputSplitPathList;
+ this.workerInfo = workerInfo;
+ this.zooKeeperExt = zooKeeperExt;
+ }
+
+ @Override
+ public InputSplitsCallable<I, V, E, M> newCallable() {
+ return new EdgeInputSplitsCallable<I, V, E, M>(
+ context,
+ graphState,
+ configuration,
+ bspServiceWorker,
+ inputSplitPathList,
+ workerInfo,
+ zooKeeperExt);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
new file mode 100644
index 0000000..b82da7d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
import com.google.common.collect.Lists;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
+
+/**
+ * Utility class to extract the list of InputSplits from the
+ * ZooKeeper tree of "claimable splits" the master created,
+ * and to sort the list to favor local data blocks.
+ *
+ * This class provides an Iterator for the list the worker will
+ * claim splits from, making all sorting and data-code locality
+ * processing done here invisible to callers. The aim is to cut
+ * down on the number of ZK reads workers perform before locating
+ * an unclaimed InputSplit.
+ */
+public class InputSplitPathOrganizer implements Iterable<String> {
+ /** The worker's local ZooKeeperExt ref */
+ private final ZooKeeperExt zooKeeper;
+ /** The List of InputSplit znode paths */
+ private final List<String> pathList;
+ /** The worker's hostname */
+ private final String hostName;
+ /** The adjusted base offset by which to iterate on the path list */
+ private int baseOffset;
+
+ /**
+ * Constructor
+ *
+ * @param zooKeeper the worker's ZkExt
+ * @param zkPathList the path to read from
+ * @param hostName the worker's host name (for matching)
+ * @param port the port number for this worker
+ */
+ public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
+ final String zkPathList, final String hostName, final int port)
+ throws KeeperException, InterruptedException {
+ this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
+ hostName, port);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param zooKeeper the worker's ZkExt
+ * @param inputSplitPathList path of input splits to read from
+ * @param hostName the worker's host name (for matching)
+ * @param port the port number for this worker
+ */
+ public InputSplitPathOrganizer(
+ final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
+ final String hostName, final int port)
+ throws KeeperException, InterruptedException {
+ this.zooKeeper = zooKeeper;
+ this.pathList = Lists.newArrayList(inputSplitPathList);
+ this.hostName = hostName;
+ this.baseOffset = 0; // set later after switching out local paths
+ prioritizeLocalInputSplits(port);
+ }
+
+ /**
+ * Re-order list of InputSplits so files local to this worker node's
+ * disk are the first it will iterate over when attempting to claim
+ * a split to read. This will increase locality of data reads with greater
+ * probability as the % of total nodes in the cluster hosting data and workers
+ * BOTH increase towards 100%. Replication increases our chances of a "hit."
+ *
+ * @param port the port number for hashing unique iteration indexes for all
+ * workers, even those sharing the same host node.
+ */
+ private void prioritizeLocalInputSplits(final int port) {
+ List<String> sortedList = new ArrayList<String>();
+ String hosts = null;
+ for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
+ final String path = iterator.next();
+ try {
+ hosts = getLocationsFromZkInputSplitData(path);
+ } catch (IOException ioe) {
+ hosts = null; // no problem, just don't sort this entry
+ } catch (KeeperException ke) {
+ hosts = null;
+ } catch (InterruptedException ie) {
+ hosts = null;
+ }
+ if (hosts != null && hosts.contains(hostName)) {
+ sortedList.add(path); // collect the local block
+ iterator.remove(); // remove local block from list
+ }
+ }
+ // shuffle the local blocks in case several workers exist on this host
+ Collections.shuffle(sortedList);
+ // determine the hash-based offset for this worker to iterate from
+ // and place the local blocks into the list at that index, if any
+ final int temp = hostName.hashCode() + (19 * port);
+ if (pathList.size() != 0) {
+ baseOffset = Math.abs(temp % pathList.size());
+ }
+ // re-insert local paths at "adjusted index zero" for caller to iterate on
+ pathList.addAll(baseOffset, sortedList);
+ }
+
+ /**
+ * Utility for extracting locality data from an InputSplit ZNode.
+ *
+ * @param zkSplitPath the input split path to attempt to read
+ * ZNode locality data from for this InputSplit.
+ * @return a String of hostnames from ZNode data, or throws
+ */
+ private String getLocationsFromZkInputSplitData(String zkSplitPath)
+ throws IOException, KeeperException, InterruptedException {
+ byte[] locationData = zooKeeper.getData(zkSplitPath, false, null);
+ DataInputStream inputStream =
+ new DataInputStream(new ByteArrayInputStream(locationData));
+ // only read the "first" entry in the znode data, the locations
+ return Text.readString(inputStream);
+ }
+
+ /**
+ * Utility accessor for Input Split znode path list size
+ *
+ * @return the size of <code>this.pathList</code>
+ */
+ public int getPathListSize() {
+ return this.pathList.size();
+ }
+
+ /**
+ * Iterator for the pathList
+ *
+ * @return an iterator for our list of input split paths
+ */
+ public Iterator<String> iterator() {
+ return new PathListIterator();
+ }
+
+ /**
+ * Iterator for path list that handles the locality and hash offsetting.
+ */
+ public class PathListIterator implements Iterator<String> {
+ /** the current iterator index */
+ private int currentIndex = 0;
+
+ /**
+ * Do we have more list to iterate upon?
+ *
+ * @return true if more path strings are available
+ */
+ @Override
+ public boolean hasNext() {
+ return currentIndex < pathList.size();
+ }
+
+ /**
+ * Return the next pathList element
+ *
+ * @return the next input split path
+ */
+ @Override
+ public String next() {
+ return pathList.get((baseOffset + currentIndex++) % pathList.size());
+ }
+
+ /** Just a placeholder; should not do anything! */
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Remove is not allowed.");
+ }
+ }
+}