You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2015/10/19 19:14:32 UTC
[3/3] git commit: updated refs/heads/trunk to 5b0cd0e
GIRAPH-1033: Remove zookeeper from input splits handling
Summary: Currently we use ZooKeeper for handling input splits: every worker checks every split, and when many splits are used this becomes very slow. Instead, the master should coordinate input split allocation, making the complexity proportional to #splits instead of #workers*#splits. The master holds all the splits, and workers send requests to it asking for splits when they need them.
Test Plan: Ran a job with 200 machines and 200k small splits - without this change the input superstep takes 30 minutes, and with it less than 2 minutes. Also verified correctness on a sample job. mvn clean verify passes.
Differential Revision: https://reviews.facebook.net/D48531
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5b0cd0e0
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5b0cd0e0
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5b0cd0e0
Branch: refs/heads/trunk
Commit: 5b0cd0e0a2ddbf722b6140d28474295c8376e561
Parents: 47da751
Author: Maja Kabiljo <ma...@fb.com>
Authored: Mon Oct 12 10:56:39 2015 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Oct 19 10:13:43 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/giraph/bsp/BspService.java | 334 +++----------------
.../giraph/bsp/CentralizedServiceMaster.java | 4 +-
.../giraph/bsp/CentralizedServiceWorker.java | 8 +
.../org/apache/giraph/comm/MasterClient.java | 9 +
.../giraph/comm/netty/NettyMasterClient.java | 6 +
.../handler/MasterRequestServerHandler.java | 22 +-
.../comm/requests/AskForInputSplitRequest.java | 76 +++++
.../giraph/comm/requests/MasterRequest.java | 6 +-
.../requests/ReplyWithInputSplitRequest.java | 81 +++++
.../giraph/comm/requests/RequestType.java | 6 +-
.../requests/SendReducedToMasterRequest.java | 6 +-
.../giraph/graph/FinishedSuperstepStats.java | 2 +-
.../apache/giraph/graph/InputSplitEvents.java | 85 -----
.../apache/giraph/graph/InputSplitPaths.java | 88 -----
.../apache/giraph/graph/VertexEdgeCount.java | 20 +-
.../java/org/apache/giraph/io/InputType.java | 31 ++
.../apache/giraph/master/BspServiceMaster.java | 281 +++-------------
.../giraph/master/MasterAggregatorHandler.java | 2 +-
.../giraph/master/MasterGlobalCommHandler.java | 76 +++++
.../giraph/master/MasterGlobalCommUsage.java | 49 +--
.../MasterGlobalCommUsageAggregators.java | 69 ++++
.../input/BasicInputSplitsMasterOrganizer.java | 46 +++
.../input/InputSplitsMasterOrganizer.java | 32 ++
...LocalityAwareInputSplitsMasterOrganizer.java | 125 +++++++
.../MappingInputSplitsMasterOrganizer.java | 64 ++++
.../master/input/MasterInputSplitsHandler.java | 140 ++++++++
.../giraph/master/input/package-info.java | 21 ++
.../apache/giraph/partition/PartitionUtils.java | 2 +-
.../apache/giraph/worker/BspServiceWorker.java | 187 ++---------
.../giraph/worker/EdgeInputSplitsCallable.java | 16 +-
.../worker/EdgeInputSplitsCallableFactory.java | 13 +-
.../giraph/worker/FullInputSplitCallable.java | 210 ------------
.../giraph/worker/InputSplitPathOrganizer.java | 142 --------
.../giraph/worker/InputSplitsCallable.java | 77 ++---
.../giraph/worker/InputSplitsHandler.java | 208 ------------
.../worker/MappingInputSplitsCallable.java | 28 +-
.../MappingInputSplitsCallableFactory.java | 34 +-
.../worker/VertexInputSplitsCallable.java | 16 +-
.../VertexInputSplitsCallableFactory.java | 13 +-
.../giraph/worker/WorkerInputSplitsHandler.java | 108 ++++++
.../java/org/apache/giraph/TestBspBasic.java | 69 ++--
41 files changed, 1164 insertions(+), 1648 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 0a5a7ba..15e4dbe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -21,8 +21,6 @@ package org.apache.giraph.bsp;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
import org.apache.giraph.job.JobProgressTracker;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.utils.CheckpointingUtils;
@@ -77,59 +75,13 @@ public abstract class BspService<I extends WritableComparable,
/** Master job state znode above base dir */
public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
- /** Mapping input split directory about base dir */
- public static final String MAPPING_INPUT_SPLIT_DIR = "/_mappingInputSplitDir";
- /** Mapping input split done directory about base dir */
- public static final String MAPPING_INPUT_SPLIT_DONE_DIR =
- "/_mappingInputSplitDoneDir";
- /** Denotes a reserved mapping input split */
- public static final String MAPPING_INPUT_SPLIT_RESERVED_NODE =
- "/_mappingInputSplitReserved";
- /** Denotes a finished mapping input split */
- public static final String MAPPING_INPUT_SPLIT_FINISHED_NODE =
- "/_mappingInputSplitFinished";
- /** Denotes that all the mapping input splits are are ready for consumption */
- public static final String MAPPING_INPUT_SPLITS_ALL_READY_NODE =
- "/_mappingInputSplitsAllReady";
- /** Denotes that all the mapping input splits are done. */
- public static final String MAPPING_INPUT_SPLITS_ALL_DONE_NODE =
- "/_mappingInputSplitsAllDone";
-
- /** Vertex input split directory about base dir */
- public static final String VERTEX_INPUT_SPLIT_DIR = "/_vertexInputSplitDir";
- /** Vertex input split done directory about base dir */
- public static final String VERTEX_INPUT_SPLIT_DONE_DIR =
- "/_vertexInputSplitDoneDir";
- /** Denotes a reserved vertex input split */
- public static final String VERTEX_INPUT_SPLIT_RESERVED_NODE =
- "/_vertexInputSplitReserved";
- /** Denotes a finished vertex input split */
- public static final String VERTEX_INPUT_SPLIT_FINISHED_NODE =
- "/_vertexInputSplitFinished";
- /** Denotes that all the vertex input splits are are ready for consumption */
- public static final String VERTEX_INPUT_SPLITS_ALL_READY_NODE =
- "/_vertexInputSplitsAllReady";
- /** Denotes that all the vertex input splits are done. */
- public static final String VERTEX_INPUT_SPLITS_ALL_DONE_NODE =
- "/_vertexInputSplitsAllDone";
-
- /** Edge input split directory about base dir */
- public static final String EDGE_INPUT_SPLIT_DIR = "/_edgeInputSplitDir";
- /** Edge input split done directory about base dir */
- public static final String EDGE_INPUT_SPLIT_DONE_DIR =
- "/_edgeInputSplitDoneDir";
- /** Denotes a reserved edge input split */
- public static final String EDGE_INPUT_SPLIT_RESERVED_NODE =
- "/_edgeInputSplitReserved";
- /** Denotes a finished edge input split */
- public static final String EDGE_INPUT_SPLIT_FINISHED_NODE =
- "/_edgeInputSplitFinished";
- /** Denotes that all the edge input splits are are ready for consumption */
- public static final String EDGE_INPUT_SPLITS_ALL_READY_NODE =
- "/_edgeInputSplitsAllReady";
- /** Denotes that all the edge input splits are done. */
- public static final String EDGE_INPUT_SPLITS_ALL_DONE_NODE =
- "/_edgeInputSplitsAllDone";
+ /** Input splits worker done directory */
+ public static final String INPUT_SPLITS_WORKER_DONE_DIR =
+ "/_inputSplitsWorkerDoneDir";
+ /** Input splits all done node*/
+ public static final String INPUT_SPLITS_ALL_DONE_NODE =
+ "/_inputSplitsAllDone";
+
/** Directory of attempts of this application */
public static final String APPLICATION_ATTEMPTS_DIR =
"/_applicationAttemptsDir";
@@ -192,18 +144,10 @@ public abstract class BspService<I extends WritableComparable,
protected final String basePath;
/** Path to the job state determined by the master (informative only) */
protected final String masterJobStatePath;
- /** ZooKeeper paths for mapping input splits. */
- protected final InputSplitPaths mappingInputSplitsPaths;
- /** ZooKeeper paths for vertex input splits. */
- protected final InputSplitPaths vertexInputSplitsPaths;
- /** ZooKeeper paths for edge input splits. */
- protected final InputSplitPaths edgeInputSplitsPaths;
- /** Mapping input splits events */
- protected final InputSplitEvents mappingInputSplitsEvents;
- /** Vertex input split events. */
- protected final InputSplitEvents vertexInputSplitsEvents;
- /** Edge input split events. */
- protected final InputSplitEvents edgeInputSplitsEvents;
+ /** Input splits worker done directory */
+ protected final String inputSplitsWorkerDonePath;
+ /** Input splits all done node */
+ protected final String inputSplitsAllDonePath;
/** Path to the application attempts) */
protected final String applicationAttemptsPath;
/** Path to the cleaned up notifications */
@@ -226,6 +170,10 @@ public abstract class BspService<I extends WritableComparable,
private final BspEvent addressesAndPartitionsReadyChanged;
/** Application attempt changed */
private final BspEvent applicationAttemptChanged;
+ /** Input splits worker done */
+ private final BspEvent inputSplitsWorkerDoneEvent;
+ /** Input splits all done */
+ private final BspEvent inputSplitsAllDoneEvent;
/** Superstep finished synchronization */
private final BspEvent superstepFinished;
/** Master election changed for any waited on attempt */
@@ -269,23 +217,20 @@ public abstract class BspService<I extends WritableComparable,
public BspService(
Mapper<?, ?, ?, ?>.Context context,
GraphTaskManager<I, V, E> graphTaskManager) {
- this.mappingInputSplitsEvents = new InputSplitEvents(context);
- this.vertexInputSplitsEvents = new InputSplitEvents(context);
- this.edgeInputSplitsEvents = new InputSplitEvents(context);
this.connectedEvent = new PredicateLock(context);
this.workerHealthRegistrationChanged = new PredicateLock(context);
this.addressesAndPartitionsReadyChanged = new PredicateLock(context);
this.applicationAttemptChanged = new PredicateLock(context);
+ this.inputSplitsWorkerDoneEvent = new PredicateLock(context);
+ this.inputSplitsAllDoneEvent = new PredicateLock(context);
this.superstepFinished = new PredicateLock(context);
this.masterElectionChildrenChanged = new PredicateLock(context);
this.cleanedUpChildrenChanged = new PredicateLock(context);
registerBspEvent(connectedEvent);
registerBspEvent(workerHealthRegistrationChanged);
- registerBspEvent(vertexInputSplitsEvents.getAllReadyChanged());
- registerBspEvent(vertexInputSplitsEvents.getStateChanged());
- registerBspEvent(edgeInputSplitsEvents.getAllReadyChanged());
- registerBspEvent(edgeInputSplitsEvents.getStateChanged());
+ registerBspEvent(inputSplitsWorkerDoneEvent);
+ registerBspEvent(inputSplitsAllDoneEvent);
registerBspEvent(addressesAndPartitionsReadyChanged);
registerBspEvent(applicationAttemptChanged);
registerBspEvent(superstepFinished);
@@ -311,16 +256,8 @@ public abstract class BspService<I extends WritableComparable,
getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
basePath);
masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
- mappingInputSplitsPaths = new InputSplitPaths(basePath,
- MAPPING_INPUT_SPLIT_DIR, MAPPING_INPUT_SPLIT_DONE_DIR,
- MAPPING_INPUT_SPLITS_ALL_READY_NODE,
- MAPPING_INPUT_SPLITS_ALL_DONE_NODE);
- vertexInputSplitsPaths = new InputSplitPaths(basePath,
- VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR,
- VERTEX_INPUT_SPLITS_ALL_READY_NODE, VERTEX_INPUT_SPLITS_ALL_DONE_NODE);
- edgeInputSplitsPaths = new InputSplitPaths(basePath,
- EDGE_INPUT_SPLIT_DIR, EDGE_INPUT_SPLIT_DONE_DIR,
- EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE);
+ inputSplitsWorkerDonePath = basePath + INPUT_SPLITS_WORKER_DONE_DIR;
+ inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
cleanedUpPath = basePath + CLEANED_UP_DIR;
@@ -433,24 +370,6 @@ public abstract class BspService<I extends WritableComparable,
}
/**
- * Get the input split events for edge input.
- *
- * @return InputSplitEvents for edge input.
- */
- public InputSplitEvents getEdgeInputSplitsEvents() {
- return edgeInputSplitsEvents;
- }
-
- /**
- * Get the input split events for vertex input.
- *
- * @return InputSplitEvents for vertex input.
- */
- public InputSplitEvents getVertexInputSplitsEvents() {
- return vertexInputSplitsEvents;
- }
-
- /**
* Generate the worker information "healthy" directory path for a
* superstep
*
@@ -655,6 +574,14 @@ public abstract class BspService<I extends WritableComparable,
return applicationAttemptChanged;
}
+ public final BspEvent getInputSplitsWorkerDoneEvent() {
+ return inputSplitsWorkerDoneEvent;
+ }
+
+ public final BspEvent getInputSplitsAllDoneEvent() {
+ return inputSplitsAllDoneEvent;
+ }
+
public final BspEvent getSuperstepFinishedEvent() {
return superstepFinished;
}
@@ -952,9 +879,20 @@ public abstract class BspService<I extends WritableComparable,
}
workerHealthRegistrationChanged.signal();
eventProcessed = true;
- } else if (processMappingEvent(event) || processVertexEvent(event) ||
- processEdgeEvent(event)) {
- return;
+ } else if (event.getPath().contains(INPUT_SPLITS_ALL_DONE_NODE) &&
+ event.getType() == EventType.NodeCreated) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("process: all input splits done");
+ }
+ inputSplitsAllDoneEvent.signal();
+ eventProcessed = true;
+ } else if (event.getPath().contains(INPUT_SPLITS_WORKER_DONE_DIR) &&
+ event.getType() == EventType.NodeChildrenChanged) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("process: worker done reading input splits");
+ }
+ inputSplitsWorkerDoneEvent.signal();
+ eventProcessed = true;
} else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
event.getType() == EventType.NodeCreated) {
if (LOG.isInfoEnabled()) {
@@ -1001,192 +939,6 @@ public abstract class BspService<I extends WritableComparable,
}
/**
- * Process WatchedEvent for Mapping Inputsplits
- *
- * @param event watched event
- * @return true if event processed
- */
- public final boolean processMappingEvent(WatchedEvent event) {
- boolean eventProcessed = false;
- if (event.getPath().equals(
- mappingInputSplitsPaths.getAllReadyPath()) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: inputSplitsReadyChanged " +
- "(input splits ready)");
- }
- mappingInputSplitsEvents.getAllReadyChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: mappingInputSplitsStateChanged " +
- "(made a reservation)");
- }
- mappingInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
- (event.getType() == EventType.NodeDeleted)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: mappingInputSplitsStateChanged " +
- "(lost a reservation)");
- }
- mappingInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_FINISHED_NODE) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: mappingInputSplitsStateChanged " +
- "(finished inputsplit)");
- }
- mappingInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_DONE_DIR) &&
- (event.getType() == EventType.NodeChildrenChanged)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: mappingInputSplitsDoneStateChanged " +
- "(worker finished sending)");
- }
- mappingInputSplitsEvents.getDoneStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().equals(
- mappingInputSplitsPaths.getAllDonePath()) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: mappingInputSplitsAllDoneChanged " +
- "(all entries sent from input splits)");
- }
- mappingInputSplitsEvents.getAllDoneChanged().signal();
- eventProcessed = true;
- }
- return eventProcessed;
- }
-
- /**
- * Process WatchedEvent for Vertex Inputsplits
- *
- * @param event watched event
- * @return true if event processed
- */
- public final boolean processVertexEvent(WatchedEvent event) {
- boolean eventProcessed = false;
- if (event.getPath().equals(
- vertexInputSplitsPaths.getAllReadyPath()) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: inputSplitsReadyChanged " +
- "(input splits ready)");
- }
- vertexInputSplitsEvents.getAllReadyChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: vertexInputSplitsStateChanged " +
- "(made a reservation)");
- }
- vertexInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) &&
- (event.getType() == EventType.NodeDeleted)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: vertexInputSplitsStateChanged " +
- "(lost a reservation)");
- }
- vertexInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_FINISHED_NODE) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: vertexInputSplitsStateChanged " +
- "(finished inputsplit)");
- }
- vertexInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_DONE_DIR) &&
- (event.getType() == EventType.NodeChildrenChanged)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: vertexInputSplitsDoneStateChanged " +
- "(worker finished sending)");
- }
- vertexInputSplitsEvents.getDoneStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().equals(
- vertexInputSplitsPaths.getAllDonePath()) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: vertexInputSplitsAllDoneChanged " +
- "(all vertices sent from input splits)");
- }
- vertexInputSplitsEvents.getAllDoneChanged().signal();
- eventProcessed = true;
- }
- return eventProcessed;
- }
-
- /**
- * Process WatchedEvent for Edge Inputsplits
- *
- * @param event watched event
- * @return true if event processed
- */
- public final boolean processEdgeEvent(WatchedEvent event) {
- boolean eventProcessed = false;
- if (event.getPath().equals(
- edgeInputSplitsPaths.getAllReadyPath()) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: edgeInputSplitsReadyChanged " +
- "(input splits ready)");
- }
- edgeInputSplitsEvents.getAllReadyChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: edgeInputSplitsStateChanged " +
- "(made a reservation)");
- }
- edgeInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) &&
- (event.getType() == EventType.NodeDeleted)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: edgeInputSplitsStateChanged " +
- "(lost a reservation)");
- }
- edgeInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_FINISHED_NODE) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: edgeInputSplitsStateChanged " +
- "(finished inputsplit)");
- }
- edgeInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_DONE_DIR) &&
- (event.getType() == EventType.NodeChildrenChanged)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: edgeInputSplitsDoneStateChanged " +
- "(worker finished sending)");
- }
- edgeInputSplitsEvents.getDoneStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().equals(
- edgeInputSplitsPaths.getAllDonePath()) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: edgeInputSplitsAllDoneChanged " +
- "(all edges sent from input splits)");
- }
- edgeInputSplitsEvents.getAllDoneChanged().signal();
- eventProcessed = true;
- }
- return eventProcessed;
- }
-
- /**
* Get the last saved superstep.
*
* @return Last good superstep number
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index 1e8d519..f05a79d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -22,8 +22,8 @@ import java.io.IOException;
import java.util.List;
import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
-import org.apache.giraph.master.MasterAggregatorHandler;
import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.master.MasterGlobalCommHandler;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
@@ -144,7 +144,7 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
*
* @return Global communication handler
*/
- MasterAggregatorHandler getGlobalCommHandler();
+ MasterGlobalCommHandler getGlobalCommHandler();
/**
* Handler for aggregators to reduce/broadcast translation
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index f6d77d0..94cd265 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -30,6 +30,7 @@ import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.worker.WorkerInputSplitsHandler;
import org.apache.giraph.worker.WorkerAggregatorHandler;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerInfo;
@@ -252,4 +253,11 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
* @return number of partitions owned
*/
int getNumPartitionsOwned();
+
+ /**
+ * Get input splits handler used during input
+ *
+ * @return Input splits handler
+ */
+ WorkerInputSplitsHandler getInputSplitsHandler();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
index aea93fd..244dd74 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
@@ -20,6 +20,7 @@ package org.apache.giraph.comm;
import java.io.IOException;
+import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.hadoop.io.Writable;
/**
@@ -54,6 +55,14 @@ public interface MasterClient {
void flush();
/**
+ * Send a request to a remote server (should be already connected)
+ *
+ * @param destTaskId Destination worker id
+ * @param request Request to send
+ */
+ void sendWritableRequest(int destTaskId, WritableRequest request);
+
+ /**
* Closes all connections.
*/
void closeConnections();
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index e110782..9b348e8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -26,6 +26,7 @@ import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
@@ -117,6 +118,11 @@ public class NettyMasterClient implements MasterClient {
}
@Override
+ public void sendWritableRequest(int destTaskId, WritableRequest request) {
+ nettyClient.sendWritableRequest(destTaskId, request);
+ }
+
+ @Override
public void closeConnections() {
nettyClient.stop();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
index 02c72f7..9aa88ae 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
@@ -21,13 +21,13 @@ package org.apache.giraph.comm.netty.handler;
import org.apache.giraph.comm.requests.MasterRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.master.MasterGlobalCommHandler;
/** Handler for requests on master */
public class MasterRequestServerHandler extends
RequestServerHandler<MasterRequest> {
/** Aggregator handler */
- private final MasterAggregatorHandler aggregatorHandler;
+ private final MasterGlobalCommHandler commHandler;
/**
* Constructor
@@ -35,22 +35,22 @@ public class MasterRequestServerHandler extends
* @param workerRequestReservedMap Worker request reservation map
* @param conf Configuration
* @param myTaskInfo Current task info
- * @param aggregatorHandler Master aggregator handler
+ * @param commHandler Master communication handler
* @param exceptionHandler Handles uncaught exceptions
*/
public MasterRequestServerHandler(
WorkerRequestReservedMap workerRequestReservedMap,
ImmutableClassesGiraphConfiguration conf,
TaskInfo myTaskInfo,
- MasterAggregatorHandler aggregatorHandler,
+ MasterGlobalCommHandler commHandler,
Thread.UncaughtExceptionHandler exceptionHandler) {
super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
- this.aggregatorHandler = aggregatorHandler;
+ this.commHandler = commHandler;
}
@Override
public void processRequest(MasterRequest request) {
- request.doRequest(aggregatorHandler);
+ request.doRequest(commHandler);
}
/**
@@ -58,15 +58,15 @@ public class MasterRequestServerHandler extends
*/
public static class Factory implements RequestServerHandler.Factory {
/** Master aggregator handler */
- private final MasterAggregatorHandler aggregatorHandler;
+ private final MasterGlobalCommHandler commHandler;
/**
* Constructor
*
- * @param aggregatorHandler Master aggregator handler
+ * @param commHandler Master global communication handler
*/
- public Factory(MasterAggregatorHandler aggregatorHandler) {
- this.aggregatorHandler = aggregatorHandler;
+ public Factory(MasterGlobalCommHandler commHandler) {
+ this.commHandler = commHandler;
}
@Override
@@ -76,7 +76,7 @@ public class MasterRequestServerHandler extends
TaskInfo myTaskInfo,
Thread.UncaughtExceptionHandler exceptionHandler) {
return new MasterRequestServerHandler(workerRequestReservedMap, conf,
- myTaskInfo, aggregatorHandler, exceptionHandler);
+ myTaskInfo, commHandler, exceptionHandler);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
new file mode 100644
index 0000000..5d9e4e6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.master.MasterGlobalCommHandler;
+import org.apache.giraph.io.InputType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A request which workers will send to master to ask it to give them splits
+ */
+public class AskForInputSplitRequest extends WritableRequest
+ implements MasterRequest {
+ /** Type of split we are requesting */
+ private InputType splitType;
+ /** Task id of worker which requested the split */
+ private int workerTaskId;
+
+ /**
+ * Constructor
+ *
+ * @param splitType Type of split we are requesting
+ * @param workerTaskId Task id of worker which requested the split
+ */
+ public AskForInputSplitRequest(InputType splitType, int workerTaskId) {
+ this.splitType = splitType;
+ this.workerTaskId = workerTaskId;
+ }
+
+ /**
+ * Constructor used for reflection only
+ */
+ public AskForInputSplitRequest() {
+ }
+
+ @Override
+ public void doRequest(MasterGlobalCommHandler commHandler) {
+ commHandler.getInputSplitsHandler().sendSplitTo(splitType, workerTaskId);
+ }
+
+ @Override
+ void readFieldsRequest(DataInput in) throws IOException {
+ splitType = InputType.values()[in.readInt()];
+ workerTaskId = in.readInt();
+ }
+
+ @Override
+ void writeRequest(DataOutput out) throws IOException {
+ out.writeInt(splitType.ordinal());
+ out.writeInt(workerTaskId);
+ }
+
+ @Override
+ public RequestType getType() {
+ return RequestType.ASK_FOR_INPUT_SPLIT_REQUEST;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
index 7fedcc5..43632b0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
@@ -18,7 +18,7 @@
package org.apache.giraph.comm.requests;
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.master.MasterGlobalCommHandler;
/**
* Interface for requests sent to master to extend
@@ -27,7 +27,7 @@ public interface MasterRequest {
/**
* Execute the request
*
- * @param aggregatorHandler Master aggregator handler
+ * @param commHandler Master communication handler
*/
- void doRequest(MasterAggregatorHandler aggregatorHandler);
+ void doRequest(MasterGlobalCommHandler commHandler);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java
new file mode 100644
index 0000000..6b50562
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.io.InputType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Master-to-worker request carrying an input split granted to that worker.
+ */
+public class ReplyWithInputSplitRequest extends WritableRequest
+    implements WorkerRequest {
+  /** Kind of input split being delivered */
+  private InputType splitType;
+  /** Input split in serialized form */
+  private byte[] serializedInputSplit;
+
+  /** Constructor used for reflection only */
+  public ReplyWithInputSplitRequest() {
+  }
+
+  /**
+   * Build a reply carrying one serialized split.
+   *
+   * @param splitType Type of input split
+   * @param serializedInputSplit Serialized input split
+   */
+  public ReplyWithInputSplitRequest(InputType splitType,
+      byte[] serializedInputSplit) {
+    this.serializedInputSplit = serializedInputSplit;
+    this.splitType = splitType;
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.REPLY_WITH_INPUT_SPLIT_REQUEST;
+  }
+
+  @Override
+  void writeRequest(DataOutput out) throws IOException {
+    out.writeInt(splitType.ordinal());
+    // Length prefix lets readFieldsRequest allocate the exact buffer size
+    out.writeInt(serializedInputSplit.length);
+    out.write(serializedInputSplit);
+  }
+
+  @Override
+  void readFieldsRequest(DataInput in) throws IOException {
+    int typeOrdinal = in.readInt();
+    splitType = InputType.values()[typeOrdinal];
+    serializedInputSplit = new byte[in.readInt()];
+    in.readFully(serializedInputSplit);
+  }
+
+  @Override
+  public void doRequest(ServerData serverData) {
+    // Hand the serialized split to this worker's input splits handler
+    serverData.getServiceWorker().getInputSplitsHandler().receivedInputSplit(
+        splitType, serializedInputSplit);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index 343a2de..bebac28 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -60,7 +60,11 @@ else[HADOOP_NON_SECURE]*/
/** Send aggregators from worker owner to other workers */
SEND_AGGREGATORS_TO_WORKER_REQUEST(SendAggregatorsToWorkerRequest.class),
/** Send message from worker to worker */
- SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class);
+ SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class),
+ /** Send request for input split from worker to master */
+ ASK_FOR_INPUT_SPLIT_REQUEST(AskForInputSplitRequest.class),
+ /** Send request with granted input split from master to workers */
+ REPLY_WITH_INPUT_SPLIT_REQUEST(ReplyWithInputSplitRequest.class);
/** Class of request which this type corresponds to */
private final Class<? extends WritableRequest> requestClass;
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
index 7171f04..3a1bd64 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
@@ -20,7 +20,7 @@ package org.apache.giraph.comm.requests;
import java.io.IOException;
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.master.MasterGlobalCommHandler;
/**
* Request to send final aggregated values from worker which owns
@@ -45,9 +45,9 @@ public class SendReducedToMasterRequest extends ByteArrayRequest
}
@Override
- public void doRequest(MasterAggregatorHandler aggregatorHandler) {
+ public void doRequest(MasterGlobalCommHandler commHandler) {
try {
- aggregatorHandler.acceptReducedValues(getDataInput());
+ commHandler.getAggregatorHandler().acceptReducedValues(getDataInput());
} catch (IOException e) {
throw new IllegalStateException("doRequest: " +
"IOException occurred while processing request", e);
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
index cfb9799..c53b34f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
@@ -51,7 +51,7 @@ public class FinishedSuperstepStats extends VertexEdgeCount {
long numEdges,
boolean mustLoadCheckpoint,
CheckpointStatus checkpointStatus) {
- super(numVertices, numEdges);
+ super(numVertices, numEdges, 0);
this.localVertexCount = numLocalVertices;
this.allVerticesHalted = allVerticesHalted;
this.mustLoadCheckpoint = mustLoadCheckpoint;
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
deleted file mode 100644
index 23be1c4..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
-import org.apache.hadoop.util.Progressable;
-
-/**
- * Simple container of input split events.
- */
-public class InputSplitEvents {
- /** Input splits are ready for consumption by workers */
- private final BspEvent allReadyChanged;
- /** Input split reservation or finished notification and synchronization */
- private final BspEvent stateChanged;
- /** Input splits are done being processed by workers */
- private final BspEvent allDoneChanged;
- /** Input split done by a worker finished notification and synchronization */
- private final BspEvent doneStateChanged;
-
- /**
- * Constructor.
- *
- * @param progressable {@link Progressable} to report progress
- */
- public InputSplitEvents(Progressable progressable) {
- allReadyChanged = new PredicateLock(progressable);
- stateChanged = new PredicateLock(progressable);
- allDoneChanged = new PredicateLock(progressable);
- doneStateChanged = new PredicateLock(progressable);
- }
-
- /**
- * Get event for input splits all ready
- *
- * @return {@link BspEvent} for input splits all ready
- */
- public BspEvent getAllReadyChanged() {
- return allReadyChanged;
- }
-
- /**
- * Get event for input splits state
- *
- * @return {@link BspEvent} for input splits state
- */
- public BspEvent getStateChanged() {
- return stateChanged;
- }
-
- /**
- * Get event for input splits all done
- *
- * @return {@link BspEvent} for input splits all done
- */
- public BspEvent getAllDoneChanged() {
- return allDoneChanged;
- }
-
- /**
- * Get event for input split done
- *
- * @return {@link BspEvent} for input split done
- */
- public BspEvent getDoneStateChanged() {
- return doneStateChanged;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
deleted file mode 100644
index 4cf005e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-/**
- * Simple container of input split paths for coordination via ZooKeeper.
- */
-public class InputSplitPaths {
- /** Path to the input splits written by the master */
- private final String path;
- /** Path to the input splits all ready to be processed by workers */
- private final String allReadyPath;
- /** Path to the input splits done */
- private final String donePath;
- /** Path to the input splits all done to notify the workers to proceed */
- private final String allDonePath;
-
- /**
- * Constructor.
- *
- * @param basePath Base path
- * @param dir Input splits path
- * @param doneDir Input split done path
- * @param allReadyNode Input splits all ready path
- * @param allDoneNode Input splits all done path
- */
- public InputSplitPaths(String basePath,
- String dir,
- String doneDir,
- String allReadyNode,
- String allDoneNode) {
- path = basePath + dir;
- allReadyPath = basePath + allReadyNode;
- donePath = basePath + doneDir;
- allDonePath = basePath + allDoneNode;
- }
-
- /**
- * Get path to the input splits.
- *
- * @return Path to input splits
- */
- public String getPath() {
- return path;
- }
-
- /**
- * Get path to the input splits all ready.
- *
- * @return Path to input splits all ready
- */
- public String getAllReadyPath() {
- return allReadyPath;
- }
-
- /** Get path to the input splits done.
- *
- * @return Path to input splits done
- */
- public String getDonePath() {
- return donePath;
- }
-
- /**
- * Get path to the input splits all done.
- *
- * @return Path to input splits all done
- */
- public String getAllDonePath() {
- return allDonePath;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
index c2d13cc..1c871f0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
@@ -26,6 +26,8 @@ public class VertexEdgeCount {
private final long vertexCount;
/** Immutable edges */
private final long edgeCount;
+ /** Immutable mappings */
+ private final long mappingCount;
/**
* Default constructor.
@@ -33,6 +35,7 @@ public class VertexEdgeCount {
public VertexEdgeCount() {
vertexCount = 0;
edgeCount = 0;
+ mappingCount = 0;
}
/**
@@ -40,10 +43,12 @@ public class VertexEdgeCount {
*
* @param vertexCount Final number of vertices.
* @param edgeCount Final number of edges.
+ * @param mappingCount Final number of mappings.
*/
- public VertexEdgeCount(long vertexCount, long edgeCount) {
+ public VertexEdgeCount(long vertexCount, long edgeCount, long mappingCount) {
this.vertexCount = vertexCount;
this.edgeCount = edgeCount;
+ this.mappingCount = mappingCount;
}
public long getVertexCount() {
@@ -54,6 +59,10 @@ public class VertexEdgeCount {
return edgeCount;
}
+ public long getMappingCount() {
+ return mappingCount;
+ }
+
/**
* Increment the both the vertex edge count with a {@link VertexEdgeCount}.
*
@@ -64,7 +73,8 @@ public class VertexEdgeCount {
VertexEdgeCount vertexEdgeCount) {
return new VertexEdgeCount(
vertexCount + vertexEdgeCount.getVertexCount(),
- edgeCount + vertexEdgeCount.getEdgeCount());
+ edgeCount + vertexEdgeCount.getEdgeCount(),
+ mappingCount + vertexEdgeCount.getMappingCount());
}
/**
@@ -78,11 +88,13 @@ public class VertexEdgeCount {
long vertexCount, long edgeCount) {
return new VertexEdgeCount(
this.vertexCount + vertexCount,
- this.edgeCount + edgeCount);
+ this.edgeCount + edgeCount,
+ this.mappingCount); // carry the mapping count over unchanged
}
@Override
public String toString() {
- return "(v=" + getVertexCount() + ", e=" + getEdgeCount() + ")";
+ return "(v=" + getVertexCount() + ", e=" + getEdgeCount() +
+ (mappingCount > 0 ? ", m=" + mappingCount : "") + ")";
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/io/InputType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/InputType.java b/giraph-core/src/main/java/org/apache/giraph/io/InputType.java
new file mode 100644
index 0000000..26ee966
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/InputType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+/**
+ * Type of input. Ordinals are serialized by split requests: never reorder.
+ */
+public enum InputType {
+ /** Vertex input */
+ VERTEX,
+ /** Edge input */
+ EDGE,
+ /** Mapping input */
+ MAPPING
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 0b56a4f..0e7bb9d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -21,12 +21,8 @@ package org.apache.giraph.master;
import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
-import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.Charset;
@@ -38,9 +34,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import net.iharder.Base64;
@@ -66,12 +59,12 @@ import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.GraphFunctions;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.InputType;
+import org.apache.giraph.master.input.MasterInputSplitsHandler;
import org.apache.giraph.metrics.AggregatedMetrics;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphTimer;
@@ -88,8 +81,6 @@ import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.utils.CheckpointingUtils;
import org.apache.giraph.utils.JMapHistoDumper;
-import org.apache.giraph.utils.LogStacktraceCallable;
-import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReactiveJMapHistoDumper;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.WritableUtils;
@@ -99,7 +90,6 @@ import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobID;
@@ -170,7 +160,7 @@ public class BspServiceMaster<I extends WritableComparable,
private final List<PartitionStats> allPartitionStatsList =
new ArrayList<PartitionStats>();
/** Handler for global communication */
- private MasterAggregatorHandler globalCommHandler;
+ private MasterGlobalCommHandler globalCommHandler;
/** Handler for aggregators to reduce/broadcast translation */
private AggregatorToGlobalCommTranslation aggregatorTranslation;
/** Master class */
@@ -331,7 +321,7 @@ public class BspServiceMaster<I extends WritableComparable,
*/
private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
int minSplitCountHint,
- String inputSplitType) {
+ InputType inputSplitType) {
String logPrefix = "generate" + inputSplitType + "InputSplits";
List<InputSplit> splits;
try {
@@ -604,46 +594,25 @@ public class BspServiceMaster<I extends WritableComparable,
* Common method for creating vertex/edge input splits.
*
* @param inputFormat The vertex/edge input format
- * @param inputSplitPaths ZooKeeper input split paths
* @param inputSplitType Type of input split (for logging purposes)
* @return Number of splits. Returns -1 on failure to create
* valid input splits.
*/
private int createInputSplits(GiraphInputFormat inputFormat,
- InputSplitPaths inputSplitPaths,
- String inputSplitType) {
+ InputType inputSplitType) {
ImmutableClassesGiraphConfiguration conf = getConfiguration();
String logPrefix = "create" + inputSplitType + "InputSplits";
// Only the 'master' should be doing this. Wait until the number of
// processes that have reported health exceeds the minimum percentage.
// If the minimum percentage is not met, fail the job. Otherwise
// generate the input splits
- String inputSplitsPath = inputSplitPaths.getPath();
- try {
- if (getZkExt().exists(inputSplitsPath, false) != null) {
- LOG.info(inputSplitsPath + " already exists, no need to create");
- return Integer.parseInt(
- new String(getZkExt().getData(inputSplitsPath, false, null),
- Charset.defaultCharset()));
- }
- } catch (KeeperException.NoNodeException e) {
- if (LOG.isInfoEnabled()) {
- LOG.info(logPrefix + ": Need to create the input splits at " +
- inputSplitsPath);
- }
- } catch (KeeperException e) {
- throw new IllegalStateException(logPrefix + ": KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(logPrefix + ": InterruptedException", e);
- }
-
- // When creating znodes, in case the master has already run, resume
- // where it left off.
List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
if (healthyWorkerInfoList == null) {
setJobStateFailed("Not enough healthy workers to create input splits");
return -1;
}
+ globalCommHandler.getInputSplitsHandler().initialize(masterClient,
+ healthyWorkerInfoList);
// Create at least as many splits as the total number of input threads.
int minSplitCountHint = healthyWorkerInfoList.size() *
@@ -671,54 +640,8 @@ public class BspServiceMaster<I extends WritableComparable,
"some threads will be not used");
}
- // Write input splits to zookeeper in parallel
- int inputSplitThreadCount = conf.getInt(NUM_MASTER_ZK_INPUT_SPLIT_THREADS,
- DEFAULT_INPUT_SPLIT_THREAD_COUNT);
- if (LOG.isInfoEnabled()) {
- LOG.info(logPrefix + ": Starting to write input split data " +
- "to zookeeper with " + inputSplitThreadCount + " threads");
- }
- try {
- getZkExt().createExt(inputSplitsPath, null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- false);
- } catch (KeeperException e) {
- LOG.info(logPrefix + ": Node " +
- inputSplitsPath + " keeper exception " + e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(logPrefix + ' ' + e.getMessage(), e);
- }
- ExecutorService taskExecutor =
- Executors.newFixedThreadPool(inputSplitThreadCount);
- boolean writeLocations = USE_INPUT_SPLIT_LOCALITY.get(conf);
- for (int i = 0; i < splitList.size(); ++i) {
- InputSplit inputSplit = splitList.get(i);
- taskExecutor.submit(new LogStacktraceCallable<Void>(
- new WriteInputSplit(inputFormat, inputSplit, inputSplitsPath, i,
- writeLocations)));
- }
- taskExecutor.shutdown();
- ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
- if (LOG.isInfoEnabled()) {
- LOG.info(logPrefix + ": Done writing input split data to zookeeper");
- }
-
- // Let workers know they can start trying to load the input splits
- try {
- getZkExt().createExt(inputSplitPaths.getAllReadyPath(),
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- false);
- } catch (KeeperException.NodeExistsException e) {
- LOG.info(logPrefix + ": Node " +
- inputSplitPaths.getAllReadyPath() + " already exists.");
- } catch (KeeperException e) {
- throw new IllegalStateException(logPrefix + ": KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(logPrefix + ": IllegalStateException", e);
- }
+ globalCommHandler.getInputSplitsHandler().addSplits(inputSplitType,
+ splitList, inputFormat);
return splitList.size();
}
@@ -730,8 +653,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat =
getConfiguration().createWrappedMappingInputFormat();
- return createInputSplits(mappingInputFormat, mappingInputSplitsPaths,
- "Mapping");
+ return createInputSplits(mappingInputFormat, InputType.MAPPING);
}
@Override
@@ -742,8 +664,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
VertexInputFormat<I, V, E> vertexInputFormat =
getConfiguration().createWrappedVertexInputFormat();
- return createInputSplits(vertexInputFormat, vertexInputSplitsPaths,
- "Vertex");
+ return createInputSplits(vertexInputFormat, InputType.VERTEX);
}
@Override
@@ -754,8 +675,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
EdgeInputFormat<I, E> edgeInputFormat =
getConfiguration().createWrappedEdgeInputFormat();
- return createInputSplits(edgeInputFormat, edgeInputSplitsPaths,
- "Edge");
+ return createInputSplits(edgeInputFormat, InputType.EDGE);
}
@Override
@@ -764,7 +684,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
@Override
- public MasterAggregatorHandler getGlobalCommHandler() {
+ public MasterGlobalCommHandler getGlobalCommHandler() {
return globalCommHandler;
}
@@ -838,7 +758,7 @@ public class BspServiceMaster<I extends WritableComparable,
});
- globalCommHandler.readFields(finalizedStream);
+ globalCommHandler.getAggregatorHandler().readFields(finalizedStream);
aggregatorTranslation.readFields(finalizedStream);
masterCompute.readFields(finalizedStream);
finalizedStream.close();
@@ -911,12 +831,15 @@ public class BspServiceMaster<I extends WritableComparable,
if (masterChildArr.get(0).equals(myBid)) {
GiraphStats.getInstance().getCurrentMasterTaskPartition().
setValue(getTaskPartition());
- globalCommHandler = new MasterAggregatorHandler(
- getConfiguration(), getContext());
+
+ globalCommHandler = new MasterGlobalCommHandler(
+ new MasterAggregatorHandler(getConfiguration(), getContext()),
+ new MasterInputSplitsHandler(
+ getConfiguration().useInputSplitLocality()));
aggregatorTranslation = new AggregatorToGlobalCommTranslation(
getConfiguration(), globalCommHandler);
- globalCommHandler.initialize(this);
+ globalCommHandler.getAggregatorHandler().initialize(this);
masterCompute = getConfiguration().createMasterCompute();
masterCompute.setMasterService(this);
@@ -1128,7 +1051,7 @@ public class BspServiceMaster<I extends WritableComparable,
for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo));
}
- globalCommHandler.write(finalizedOutputStream);
+ globalCommHandler.getAggregatorHandler().write(finalizedOutputStream);
aggregatorTranslation.write(finalizedOutputStream);
masterCompute.write(finalizedOutputStream);
finalizedOutputStream.close();
@@ -1265,12 +1188,8 @@ public class BspServiceMaster<I extends WritableComparable,
@Override
public void restartFromCheckpoint(long checkpoint) {
// Process:
- // 1. Remove all old input split data
- // 2. Increase the application attempt and set to the correct checkpoint
- // 3. Send command to all workers to restart their tasks
- zkDeleteNode(vertexInputSplitsPaths.getPath());
- zkDeleteNode(edgeInputSplitsPaths.getPath());
-
+ // 1. Increase the application attempt and set to the correct checkpoint
+ // 2. Send command to all workers to restart their tasks
setApplicationAttempt(getApplicationAttempt() + 1);
setCachedSuperstep(checkpoint);
setRestartedSuperstep(checkpoint);
@@ -1493,37 +1412,32 @@ public class BspServiceMaster<I extends WritableComparable,
/**
* Coordinate the exchange of vertex/edge input splits among workers.
- *
- * @param inputSplitPaths Input split paths
- * @param inputSplitEvents Input split events
- * @param inputSplitsType Type of input splits (for logging purposes)
*/
- private void coordinateInputSplits(InputSplitPaths inputSplitPaths,
- InputSplitEvents inputSplitEvents,
- String inputSplitsType) {
+ private void coordinateInputSplits() {
// Coordinate the workers finishing sending their vertices/edges to the
// correct workers and signal when everything is done.
- String logPrefix = "coordinate" + inputSplitsType + "InputSplits";
- if (!barrierOnWorkerList(inputSplitPaths.getDonePath(),
+ if (!barrierOnWorkerList(inputSplitsWorkerDonePath,
chosenWorkerInfoList,
- inputSplitEvents.getDoneStateChanged(),
+ getInputSplitsWorkerDoneEvent(),
false)) {
- throw new IllegalStateException(logPrefix + ": Worker failed during " +
- "input split (currently not supported)");
+ throw new IllegalStateException("coordinateInputSplits: Worker failed " +
+ "during input split (currently not supported)");
}
try {
- getZkExt().createExt(inputSplitPaths.getAllDonePath(),
+ getZkExt().createExt(inputSplitsAllDonePath,
null,
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
false);
} catch (KeeperException.NodeExistsException e) {
LOG.info("coordinateInputSplits: Node " +
- inputSplitPaths.getAllDonePath() + " already exists.");
+ inputSplitsAllDonePath + " already exists.");
} catch (KeeperException e) {
- throw new IllegalStateException(logPrefix + ": KeeperException", e);
+ throw new IllegalStateException(
+ "coordinateInputSplits: KeeperException", e);
} catch (InterruptedException e) {
- throw new IllegalStateException(logPrefix + ": IllegalStateException", e);
+ throw new IllegalStateException(
+ "coordinateInputSplits: InterruptedException", e);
}
}
@@ -1543,7 +1457,7 @@ public class BspServiceMaster<I extends WritableComparable,
*/
private void initializeAggregatorInputSuperstep()
throws InterruptedException {
- globalCommHandler.prepareSuperstep();
+ globalCommHandler.getAggregatorHandler().prepareSuperstep();
prepareMasterCompute(getSuperstep());
try {
@@ -1559,9 +1473,9 @@ public class BspServiceMaster<I extends WritableComparable,
"initializeAggregatorInputSuperstep: Failed in access", e);
}
aggregatorTranslation.postMasterCompute();
- globalCommHandler.finishSuperstep();
+ globalCommHandler.getAggregatorHandler().finishSuperstep();
- globalCommHandler.sendDataToOwners(masterClient);
+ globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
}
/**
@@ -1627,7 +1541,7 @@ public class BspServiceMaster<I extends WritableComparable,
// We need to finalize aggregators from previous superstep
if (getSuperstep() >= 0) {
aggregatorTranslation.postMasterCompute();
- globalCommHandler.finishSuperstep();
+ globalCommHandler.getAggregatorHandler().finishSuperstep();
}
masterClient.openConnections();
@@ -1663,25 +1577,13 @@ public class BspServiceMaster<I extends WritableComparable,
// We need to send aggregators to worker owners after new worker assignments
if (getSuperstep() >= 0) {
- globalCommHandler.sendDataToOwners(masterClient);
+ globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
}
if (getSuperstep() == INPUT_SUPERSTEP) {
// Initialize aggregators before coordinating
initializeAggregatorInputSuperstep();
- if (getConfiguration().hasMappingInputFormat()) {
- coordinateInputSplits(mappingInputSplitsPaths, mappingInputSplitsEvents,
- "Mapping");
- }
- // vertex loading and edge loading
- if (getConfiguration().hasVertexInputFormat()) {
- coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
- "Vertex");
- }
- if (getConfiguration().hasEdgeInputFormat()) {
- coordinateInputSplits(edgeInputSplitsPaths, edgeInputSplitsEvents,
- "Edge");
- }
+ coordinateInputSplits();
}
String finishedWorkerPath =
@@ -1695,7 +1597,7 @@ public class BspServiceMaster<I extends WritableComparable,
// Collect aggregator values, then run the master.compute() and
// finally save the aggregator values
- globalCommHandler.prepareSuperstep();
+ globalCommHandler.getAggregatorHandler().prepareSuperstep();
aggregatorTranslation.prepareSuperstep();
SuperstepClasses superstepClasses =
@@ -1761,7 +1663,8 @@ public class BspServiceMaster<I extends WritableComparable,
} else {
superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
}
- globalCommHandler.writeAggregators(getSuperstep(), superstepState);
+ globalCommHandler.getAggregatorHandler().writeAggregators(
+ getSuperstep(), superstepState);
return superstepState;
}
@@ -2009,7 +1912,7 @@ public class BspServiceMaster<I extends WritableComparable,
failJob(new Exception("Checkpoint and halt requested. " +
"Killing this job."));
}
- globalCommHandler.close();
+ globalCommHandler.getAggregatorHandler().close();
masterClient.closeConnections();
masterServer.close();
}
@@ -2122,100 +2025,4 @@ public class BspServiceMaster<I extends WritableComparable,
gs.getAggregateSentMessageBytes()
.increment(globalStats.getMessageBytesCount());
}
-
- /**
- * Task that writes a given input split to zookeeper.
- * Upon failure call() throws an exception.
- */
- private class WriteInputSplit implements Callable<Void> {
- /** Input format */
- private final GiraphInputFormat inputFormat;
- /** Input split which we are going to write */
- private final InputSplit inputSplit;
- /** Input splits path */
- private final String inputSplitsPath;
- /** Index of the input split */
- private final int index;
- /** Whether to write locality information */
- private final boolean writeLocations;
-
- /**
- * Constructor
- *
- * @param inputFormat Input format
- * @param inputSplit Input split which we are going to write
- * @param inputSplitsPath Input splits path
- * @param index Index of the input split
- * @param writeLocations whether to write the input split's locations (to
- * be used by workers for prioritizing local splits
- * when reading)
- */
- public WriteInputSplit(GiraphInputFormat inputFormat,
- InputSplit inputSplit,
- String inputSplitsPath,
- int index,
- boolean writeLocations) {
- this.inputFormat = inputFormat;
- this.inputSplit = inputSplit;
- this.inputSplitsPath = inputSplitsPath;
- this.index = index;
- this.writeLocations = writeLocations;
- }
-
- @Override
- public Void call() {
- String inputSplitPath = null;
- try {
- ByteArrayOutputStream byteArrayOutputStream =
- new ByteArrayOutputStream();
- DataOutput outputStream =
- new DataOutputStream(byteArrayOutputStream);
-
- if (writeLocations) {
- String[] splitLocations = inputSplit.getLocations();
- StringBuilder locations = null;
- if (splitLocations != null) {
- int splitListLength =
- Math.min(splitLocations.length, localityLimit);
- locations = new StringBuilder();
- for (String location : splitLocations) {
- locations.append(location)
- .append(--splitListLength > 0 ? "\t" : "");
- }
- }
- Text.writeString(outputStream,
- locations == null ? "" : locations.toString());
- }
-
- inputFormat.writeInputSplit(inputSplit, outputStream);
- inputSplitPath = inputSplitsPath + "/" + index;
- getZkExt().createExt(inputSplitPath,
- byteArrayOutputStream.toByteArray(),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("call: Created input split " +
- "with index " + index + " serialized as " +
- byteArrayOutputStream.toString(Charset.defaultCharset().name()));
- }
- } catch (KeeperException.NodeExistsException e) {
- if (LOG.isInfoEnabled()) {
- LOG.info("call: Node " +
- inputSplitPath + " already exists.");
- }
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "call: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "call: IllegalStateException", e);
- } catch (IOException e) {
- throw new IllegalStateException(
- "call: IOException", e);
- }
- return null;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index 5558cee..8ca3d3a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -42,7 +42,7 @@ import com.google.common.collect.Maps;
/** Handler for reduce/broadcast on the master */
public class MasterAggregatorHandler
- implements MasterGlobalCommUsage, Writable {
+ implements MasterGlobalCommUsageAggregators, Writable {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(MasterAggregatorHandler.class);
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
new file mode 100644
index 0000000..717a24d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.master.input.MasterInputSplitsHandler;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Handler for all master communications (aggregators and input splits)
+ */
+public class MasterGlobalCommHandler implements MasterGlobalCommUsage {
+ /** Aggregator handler */
+ private final MasterAggregatorHandler aggregatorHandler;
+ /** Input splits handler */
+ private final MasterInputSplitsHandler inputSplitsHandler;
+
+ /**
+ * Constructor
+ *
+ * @param aggregatorHandler Aggregator handler
+ * @param inputSplitsHandler Input splits handler
+ */
+ public MasterGlobalCommHandler(
+ MasterAggregatorHandler aggregatorHandler,
+ MasterInputSplitsHandler inputSplitsHandler) {
+ this.aggregatorHandler = aggregatorHandler;
+ this.inputSplitsHandler = inputSplitsHandler;
+ }
+
+ public MasterAggregatorHandler getAggregatorHandler() {
+ return aggregatorHandler;
+ }
+
+ public MasterInputSplitsHandler getInputSplitsHandler() {
+ return inputSplitsHandler;
+ }
+
+ @Override
+ public <S, R extends Writable> void registerReducer(String name,
+ ReduceOperation<S, R> reduceOp) {
+ aggregatorHandler.registerReducer(name, reduceOp);
+ }
+
+ @Override
+ public <S, R extends Writable> void registerReducer(String name,
+ ReduceOperation<S, R> reduceOp, R globalInitialValue) {
+ aggregatorHandler.registerReducer(name, reduceOp, globalInitialValue);
+ }
+
+ @Override
+ public <R extends Writable> R getReduced(String name) {
+ return aggregatorHandler.getReduced(name);
+ }
+
+ @Override
+ public void broadcast(String name, Writable value) {
+ aggregatorHandler.broadcast(name, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
index 7ee9048..60b1809 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
@@ -17,52 +17,9 @@
*/
package org.apache.giraph.master;
-import org.apache.giraph.reducers.ReduceOperation;
-import org.apache.hadoop.io.Writable;
-
/**
- * Master compute can access reduce and broadcast methods
- * through this interface, from masterCompute method.
+ * All global master communication; see MasterGlobalCommUsageAggregators
*/
-public interface MasterGlobalCommUsage {
- /**
- * Register reducer to be reduced in the next worker computation,
- * using given name and operations.
- * @param name Name of the reducer
- * @param reduceOp Reduce operations
- * @param <S> Single value type
- * @param <R> Reduced value type
- */
- <S, R extends Writable> void registerReducer(
- String name, ReduceOperation<S, R> reduceOp);
-
- /**
- * Register reducer to be reduced in the next worker computation, using
- * given name and operations, starting globally from globalInitialValue.
- * (globalInitialValue is reduced only once, each worker will still start
- * from neutral initial value)
- *
- * @param name Name of the reducer
- * @param reduceOp Reduce operations
- * @param globalInitialValue Global initial value
- * @param <S> Single value type
- * @param <R> Reduced value type
- */
- <S, R extends Writable> void registerReducer(
- String name, ReduceOperation<S, R> reduceOp, R globalInitialValue);
-
- /**
- * Get reduced value from previous worker computation.
- * @param name Name of the reducer
- * @return Reduced value
- * @param <R> Reduced value type
- */
- <R extends Writable> R getReduced(String name);
-
- /**
- * Broadcast given value to all workers for next computation.
- * @param name Name of the broadcast object
- * @param value Value
- */
- void broadcast(String name, Writable value);
+public interface MasterGlobalCommUsage
+ extends MasterGlobalCommUsageAggregators {
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java
new file mode 100644
index 0000000..62c1f3f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Master compute can access reduce and broadcast methods
+ * through this interface, from masterCompute method.
+ */
+public interface MasterGlobalCommUsageAggregators {
+ /**
+ * Register reducer to be reduced in the next worker computation,
+ * using given name and operations.
+ * @param name Name of the reducer
+ * @param reduceOp Reduce operations
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+ <S, R extends Writable> void registerReducer(
+ String name, ReduceOperation<S, R> reduceOp);
+
+ /**
+ * Register reducer to be reduced in the next worker computation, using
+ * given name and operations, starting globally from globalInitialValue.
+ * (globalInitialValue is reduced only once, each worker will still start
+ * from neutral initial value)
+ *
+ * @param name Name of the reducer
+ * @param reduceOp Reduce operations
+ * @param globalInitialValue Global initial value
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+ <S, R extends Writable> void registerReducer(
+ String name, ReduceOperation<S, R> reduceOp, R globalInitialValue);
+
+ /**
+ * Get reduced value from previous worker computation.
+ * @param name Name of the reducer
+ * @param <R> Reduced value type
+ * @return Reduced value
+ */
+ <R extends Writable> R getReduced(String name);
+
+ /**
+ * Broadcast given value to all workers for next computation.
+ * @param name Name of the broadcast object
+ * @param value Value to broadcast
+ */
+ void broadcast(String name, Writable value);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..5168e32
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Input splits organizer for vertex and edge input splits on master, which
+ * doesn't use locality information; splits are handed out in list order
+ */
+public class BasicInputSplitsMasterOrganizer
+ implements InputSplitsMasterOrganizer {
+ /** Thread-safe FIFO queue of splits not yet handed out */
+ private final ConcurrentLinkedQueue<byte[]> splits;
+
+ /**
+ * Constructor
+ *
+ * @param serializedSplits Serialized splits; copied, must not contain nulls
+ */
+ public BasicInputSplitsMasterOrganizer(List<byte[]> serializedSplits) {
+ splits = new ConcurrentLinkedQueue<>(serializedSplits);
+ }
+
+ @Override
+ public byte[] getSerializedSplitFor(int workerTaskId) {
+ return splits.poll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..d5a0131
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+/**
+ * Interface for master-side organizers that hand input splits to workers
+ */
+public interface InputSplitsMasterOrganizer {
+ /**
+ * Get the next serialized input split for the requesting worker.
+ *
+ * @param workerTaskId Id of worker requesting split
+ * @return Serialized split, or null if all splits were taken already
+ */
+ byte[] getSerializedSplitFor(int workerTaskId);
+}