You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2015/10/19 19:14:31 UTC
[2/3] git commit: updated refs/heads/trunk to 5b0cd0e
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..d3eb5da
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Input splits organizer for vertex and edge input splits on master, which
+ * uses locality information.
+ *
+ * Each worker first receives splits from its own queue of preferred (local)
+ * splits; once that queue is exhausted it falls back to a linear scan over
+ * all splits. A split is reserved with a compare-and-set on its "taken" flag,
+ * so every split is handed out at most once.
+ */
+public class LocalityAwareInputSplitsMasterOrganizer
+    implements InputSplitsMasterOrganizer {
+  /** All splits before this pointer were taken */
+  private final AtomicInteger listPointer = new AtomicInteger();
+  /** List of serialized splits */
+  private final List<byte[]> serializedSplits;
+  /** Array containing information about whether a split was taken or not */
+  private final AtomicBoolean[] splitsTaken;
+
+  /** Map with preferred splits (indices into serializedSplits) per worker */
+  private final Map<Integer, ConcurrentLinkedQueue<Integer>>
+      workerToPreferredSplitsMap;
+
+  /**
+   * Constructor
+   *
+   * @param serializedSplits Serialized splits
+   * @param splits Splits; the split at index i must correspond to the
+   *               serialized split at index i
+   * @param workers List of workers
+   * @throws IllegalArgumentException if there are more splits than
+   *                                  serialized splits
+   * @throws IllegalStateException if split locations cannot be retrieved
+   */
+  public LocalityAwareInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
+      List<InputSplit> splits, List<WorkerInfo> workers) {
+    // Preferred queues hold indices into serializedSplits and splitsTaken,
+    // so a split without a serialized counterpart would later cause an
+    // ArrayIndexOutOfBoundsException when a worker asks for a split
+    if (splits.size() > serializedSplits.size()) {
+      throw new IllegalArgumentException(
+          "LocalityAwareInputSplitsMasterOrganizer: got " + splits.size() +
+          " splits but only " + serializedSplits.size() +
+          " serialized splits");
+    }
+    this.serializedSplits = serializedSplits;
+    splitsTaken = new AtomicBoolean[serializedSplits.size()];
+    // Mark all splits as not taken initially
+    for (int i = 0; i < serializedSplits.size(); i++) {
+      splitsTaken[i] = new AtomicBoolean(false);
+    }
+
+    workerToPreferredSplitsMap = new HashMap<>();
+    for (WorkerInfo worker : workers) {
+      workerToPreferredSplitsMap.put(worker.getTaskId(),
+          new ConcurrentLinkedQueue<Integer>());
+    }
+    // Go through all splits
+    for (int i = 0; i < splits.size(); i++) {
+      String[] locations;
+      try {
+        locations = splits.get(i).getLocations();
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "Exception occurred while getting splits locations", e);
+      } catch (InterruptedException e) {
+        // Preserve the interrupt for code further up the stack
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "Exception occurred while getting splits locations", e);
+      }
+      // A split without location information is not local to any worker;
+      // it is only handed out through the linear scan
+      if (locations == null) {
+        continue;
+      }
+      // For every worker
+      for (WorkerInfo worker : workers) {
+        String hostname = worker.getHostname();
+        // Check splits locations
+        for (String location : locations) {
+          // Substring match, e.g. location "host1.example.com" matches
+          // hostname "host1". NOTE(review): "host10" also matches "host1";
+          // this only affects which split is preferred, never correctness.
+          if (location.contains(hostname)) {
+            workerToPreferredSplitsMap.get(worker.getTaskId()).add(i);
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Reserve and return the next split for a worker, preferring splits that
+   * are local to it.
+   *
+   * @param workerTaskId Task id of the requesting worker
+   * @return Serialized split, or null when every split has been taken
+   */
+  @Override
+  public byte[] getSerializedSplitFor(int workerTaskId) {
+    ConcurrentLinkedQueue<Integer> preferredSplits =
+        workerToPreferredSplitsMap.get(workerTaskId);
+    // Try to find a local split first. A worker unknown at construction time
+    // has no preferred queue and goes straight to the linear scan instead
+    // of failing with a NullPointerException.
+    if (preferredSplits != null) {
+      Integer splitIndex;
+      while ((splitIndex = preferredSplits.poll()) != null) {
+        // The split may already have been taken by another worker that
+        // shares the host or by the linear scan
+        if (splitsTaken[splitIndex].compareAndSet(false, true)) {
+          return serializedSplits.get(splitIndex);
+        }
+      }
+    }
+
+    // No more local splits available, proceed linearly from splits list
+    while (true) {
+      // Get position to check
+      int splitIndex = listPointer.getAndIncrement();
+      // Check if all splits were already taken
+      if (splitIndex >= serializedSplits.size()) {
+        return null;
+      }
+      // Try to reserve the split
+      if (splitsTaken[splitIndex].compareAndSet(false, true)) {
+        return serializedSplits.get(splitIndex);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..8399c8a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import org.apache.giraph.worker.WorkerInfo;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Organizer for mapping splits on master. Unlike vertex and edge splits,
+ * each of which is read by exactly one worker, every mapping split must be
+ * given to every worker, so each worker keeps its own cursor into the list.
+ */
+public class MappingInputSplitsMasterOrganizer
+    implements InputSplitsMasterOrganizer {
+  /** Serialized mapping splits, shared by all workers */
+  private final List<byte[]> splits;
+  /** Worker task id to that worker's next split index */
+  private final Map<Integer, AtomicInteger>
+      workerTaskIdToNextSplitIndexMap;
+
+  /**
+   * Constructor
+   *
+   * @param serializedSplits Splits
+   * @param workers List of workers
+   */
+  public MappingInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
+      List<WorkerInfo> workers) {
+    splits = serializedSplits;
+    Map<Integer, AtomicInteger> cursors = new HashMap<>();
+    for (WorkerInfo worker : workers) {
+      // Every worker starts at the first split
+      cursors.put(worker.getTaskId(), new AtomicInteger());
+    }
+    workerTaskIdToNextSplitIndexMap = cursors;
+  }
+
+  /**
+   * Advance the requesting worker's cursor and return the split it pointed
+   * at.
+   *
+   * @param workerTaskId Task id of the requesting worker
+   * @return Serialized split, or null once this worker has seen all splits
+   */
+  @Override
+  public byte[] getSerializedSplitFor(int workerTaskId) {
+    int index = workerTaskIdToNextSplitIndexMap.get(workerTaskId)
+        .getAndIncrement();
+    if (index >= splits.size()) {
+      return null;
+    }
+    return splits.get(index);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
new file mode 100644
index 0000000..327e59d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import org.apache.giraph.comm.MasterClient;
+import org.apache.giraph.comm.requests.ReplyWithInputSplitRequest;
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.io.InputType;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Handler for input splits on master
+ *
+ * Since Giraph currently fails if a worker fails while reading input,
+ * retries are not implemented yet. They could be added later by keeping
+ * track of which worker got which split and, if a worker dies, putting its
+ * splits back into the queues.
+ */
+public class MasterInputSplitsHandler {
+  /** Whether to use locality information */
+  private final boolean useLocality;
+  /** Master client */
+  private MasterClient masterClient;
+  /** List of workers */
+  private List<WorkerInfo> workers;
+  /** Map of splits organizers for each split type */
+  private final Map<InputType, InputSplitsMasterOrganizer> splitsMap =
+      new EnumMap<>(InputType.class);
+  /** Latches to say when one input splits type is ready to be accessed */
+  private final Map<InputType, CountDownLatch> latchesMap =
+      new EnumMap<>(InputType.class);
+
+  /**
+   * Constructor
+   *
+   * @param useLocality Whether to use locality information or not
+   */
+  public MasterInputSplitsHandler(boolean useLocality) {
+    this.useLocality = useLocality;
+    // One latch per input type; released once that type's splits are added
+    for (InputType inputType : InputType.values()) {
+      latchesMap.put(inputType, new CountDownLatch(1));
+    }
+  }
+
+  /**
+   * Initialize
+   *
+   * NOTE(review): workers are passed to the mapping and locality-aware
+   * organizers in addSplits, so this presumably must be called first.
+   *
+   * @param masterClient Master client
+   * @param workers List of workers
+   */
+  public void initialize(MasterClient masterClient, List<WorkerInfo> workers) {
+    this.masterClient = masterClient;
+    this.workers = workers;
+  }
+
+  /**
+   * Serialize the given splits, build the organizer for their type and
+   * release any requests waiting for splits of that type.
+   *
+   * @param splitsType Type of splits
+   * @param inputSplits Splits
+   * @param inputFormat Format used to serialize the splits
+   * @throws IllegalStateException if a split cannot be serialized
+   */
+  public void addSplits(InputType splitsType, List<InputSplit> inputSplits,
+      GiraphInputFormat inputFormat) {
+    List<byte[]> serializedSplits = new ArrayList<>(inputSplits.size());
+    for (InputSplit inputSplit : inputSplits) {
+      try {
+        ByteArrayOutputStream byteArrayOutputStream =
+            new ByteArrayOutputStream();
+        DataOutput outputStream =
+            new DataOutputStream(byteArrayOutputStream);
+        inputFormat.writeInputSplit(inputSplit, outputStream);
+        serializedSplits.add(byteArrayOutputStream.toByteArray());
+      } catch (IOException e) {
+        throw new IllegalStateException("IOException occurred", e);
+      }
+    }
+    InputSplitsMasterOrganizer inputSplitsOrganizer;
+    if (splitsType == InputType.MAPPING) {
+      // Mapping splits go to every worker
+      inputSplitsOrganizer = new MappingInputSplitsMasterOrganizer(
+          serializedSplits, workers);
+    } else {
+      inputSplitsOrganizer = useLocality ?
+          new LocalityAwareInputSplitsMasterOrganizer(serializedSplits,
+              inputSplits, workers) :
+          new BasicInputSplitsMasterOrganizer(serializedSplits);
+    }
+    splitsMap.put(splitsType, inputSplitsOrganizer);
+    // countDown happens-before await returns, so sendSplitTo sees the
+    // organizer stored above
+    latchesMap.get(splitsType).countDown();
+  }
+
+  /**
+   * Called after we receive a split request from some worker, should send
+   * split back to it if available, or send it information that there is no
+   * more available. Blocks until splits of the requested type were added.
+   *
+   * @param splitType Type of split requested
+   * @param workerTaskId Id of worker who requested split
+   * @throws IllegalStateException if interrupted while waiting for splits
+   */
+  public void sendSplitTo(InputType splitType, int workerTaskId) {
+    try {
+      // Make sure we don't try to retrieve splits before they were added
+      latchesMap.get(splitType).await();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers can still observe it
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted", e);
+    }
+    byte[] serializedInputSplit =
+        splitsMap.get(splitType).getSerializedSplitFor(workerTaskId);
+    // An empty payload tells the worker there are no more splits
+    masterClient.sendWritableRequest(workerTaskId,
+        new ReplyWithInputSplitRequest(splitType,
+            serializedInputSplit == null ? new byte[0] : serializedInputSplit));
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java b/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java
new file mode 100644
index 0000000..992b6fe
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Input related master classes
+ */
+package org.apache.giraph.master.input;
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index 4600745..6914c3b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -115,7 +115,7 @@ public class PartitionUtils {
workerStatsMap.put(
workerInfo,
new VertexEdgeCount(partitionStats.getVertexCount(),
- partitionStats.getEdgeCount()));
+ partitionStats.getEdgeCount(), 0));
} else {
workerStatsMap.put(
workerInfo,
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 5b754d6..1031bb3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -59,8 +59,6 @@ import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeOutputFormat;
@@ -177,10 +175,8 @@ public class BspServiceWorker<I extends WritableComparable,
/** Time spent waiting on requests to finish */
private GiraphTimer waitRequestsTimer;
- /** InputSplit handlers used in INPUT_SUPERSTEP for vertex splits */
- private InputSplitsHandler vertexSplitsHandler;
- /** InputSplit handlers used in INPUT_SUPERSTEP for edge splits */
- private InputSplitsHandler edgeSplitsHandler;
+ /** InputSplit handlers used in INPUT_SUPERSTEP */
+ private WorkerInputSplitsHandler inputSplitsHandler;
/**
* Constructor for setting up the worker.
@@ -237,8 +233,9 @@ public class BspServiceWorker<I extends WritableComparable,
null;
GiraphMetrics.get().addSuperstepResetObserver(this);
- vertexSplitsHandler = null;
- edgeSplitsHandler = null;
+
+ inputSplitsHandler = new WorkerInputSplitsHandler(
+ workerInfo, masterInfo.getTaskId(), workerClient);
}
@Override
@@ -295,26 +292,20 @@ public class BspServiceWorker<I extends WritableComparable,
*
* Use one or more threads to do the loading.
*
- * @param inputSplitPathList List of input split paths
* @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
* @return Statistics of the vertices and edges loaded
* @throws InterruptedException
* @throws KeeperException
*/
private VertexEdgeCount loadInputSplits(
- List<String> inputSplitPathList,
CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
throws KeeperException, InterruptedException {
VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
- // Determine how many threads to use based on the number of input splits
- int maxInputSplitThreads = (inputSplitPathList.size() - 1) /
- getConfiguration().getMaxWorkers() + 1;
- int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
- maxInputSplitThreads);
+ int numThreads = getConfiguration().getNumInputSplitsThreads();
if (LOG.isInfoEnabled()) {
LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
"originally " + getConfiguration().getNumInputSplitsThreads() +
- " threads(s) for " + inputSplitPathList.size() + " total splits.");
+ " threads(s)");
}
List<VertexEdgeCount> results =
@@ -336,46 +327,21 @@ public class BspServiceWorker<I extends WritableComparable,
*/
private long loadMapping() throws KeeperException,
InterruptedException {
- List<String> inputSplitPathList =
- getZkExt().getChildrenExt(mappingInputSplitsPaths.getPath(),
- false, false, true);
-
- InputSplitPathOrganizer splitOrganizer =
- new InputSplitPathOrganizer(getZkExt(),
- inputSplitPathList, getWorkerInfo().getHostname(),
- getConfiguration().useInputSplitLocality());
-
MappingInputSplitsCallableFactory<I, V, E, ? extends Writable>
- mappingInputSplitsCallableFactory =
+ inputSplitsCallableFactory =
new MappingInputSplitsCallableFactory<>(
getConfiguration().createWrappedMappingInputFormat(),
- splitOrganizer,
getContext(),
getConfiguration(),
this,
- getZkExt());
+ inputSplitsHandler);
- long entriesLoaded = 0;
- // Determine how many threads to use based on the number of input splits
- int maxInputSplitThreads = inputSplitPathList.size();
- int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
- maxInputSplitThreads);
- if (LOG.isInfoEnabled()) {
- LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
- "originally " + getConfiguration().getNumInputSplitsThreads() +
- " threads(s) for " + inputSplitPathList.size() + " total splits.");
- }
+ long mappingsLoaded =
+ loadInputSplits(inputSplitsCallableFactory).getMappingCount();
- List<Integer> results =
- ProgressableUtils.getResultsWithNCallables(
- mappingInputSplitsCallableFactory,
- numThreads, "load-mapping-%d", getContext());
- for (Integer result : results) {
- entriesLoaded += result;
- }
// after all threads finish loading - call postFilling
localData.getMappingStore().postFilling();
- return entriesLoaded;
+ return mappingsLoaded;
}
/**
@@ -386,31 +352,15 @@ public class BspServiceWorker<I extends WritableComparable,
*/
private VertexEdgeCount loadVertices() throws KeeperException,
InterruptedException {
- List<String> inputSplitPathList =
- getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(),
- false, false, true);
-
- InputSplitPathOrganizer splitOrganizer =
- new InputSplitPathOrganizer(getZkExt(),
- inputSplitPathList, getWorkerInfo().getHostname(),
- getConfiguration().useInputSplitLocality());
- vertexSplitsHandler = new InputSplitsHandler(
- splitOrganizer,
- getZkExt(),
- getContext(),
- BspService.VERTEX_INPUT_SPLIT_RESERVED_NODE,
- BspService.VERTEX_INPUT_SPLIT_FINISHED_NODE);
-
VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
new VertexInputSplitsCallableFactory<I, V, E>(
getConfiguration().createWrappedVertexInputFormat(),
getContext(),
getConfiguration(),
this,
- vertexSplitsHandler,
- getZkExt());
+ inputSplitsHandler);
- return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory);
+ return loadInputSplits(inputSplitsCallableFactory);
}
/**
@@ -420,32 +370,15 @@ public class BspServiceWorker<I extends WritableComparable,
* @return Number of edges loaded
*/
private long loadEdges() throws KeeperException, InterruptedException {
- List<String> inputSplitPathList =
- getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(),
- false, false, true);
-
- InputSplitPathOrganizer splitOrganizer =
- new InputSplitPathOrganizer(getZkExt(),
- inputSplitPathList, getWorkerInfo().getHostname(),
- getConfiguration().useInputSplitLocality());
- edgeSplitsHandler = new InputSplitsHandler(
- splitOrganizer,
- getZkExt(),
- getContext(),
- BspService.EDGE_INPUT_SPLIT_RESERVED_NODE,
- BspService.EDGE_INPUT_SPLIT_FINISHED_NODE);
-
EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
new EdgeInputSplitsCallableFactory<I, V, E>(
getConfiguration().createWrappedEdgeInputFormat(),
getContext(),
getConfiguration(),
this,
- edgeSplitsHandler,
- getZkExt());
+ inputSplitsHandler);
- return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory).
- getEdgeCount();
+ return loadInputSplits(inputSplitsCallableFactory).getEdgeCount();
}
@Override
@@ -459,46 +392,12 @@ public class BspServiceWorker<I extends WritableComparable,
}
/**
- * Ensure the input splits are ready for processing
- *
- * @param inputSplitPaths Input split paths
- * @param inputSplitEvents Input split events
- */
- private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths,
- InputSplitEvents inputSplitEvents) {
- while (true) {
- Stat inputSplitsReadyStat;
- try {
- inputSplitsReadyStat = getZkExt().exists(
- inputSplitPaths.getAllReadyPath(), true);
- } catch (KeeperException e) {
- throw new IllegalStateException("ensureInputSplitsReady: " +
- "KeeperException waiting on input splits", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException("ensureInputSplitsReady: " +
- "InterruptedException waiting on input splits", e);
- }
- if (inputSplitsReadyStat != null) {
- break;
- }
- inputSplitEvents.getAllReadyChanged().waitForever();
- inputSplitEvents.getAllReadyChanged().reset();
- }
- }
-
- /**
* Mark current worker as done and then wait for all workers
* to finish processing input splits.
- *
- * @param inputSplitPaths Input split paths
- * @param inputSplitEvents Input split events
*/
- private void markCurrentWorkerDoneThenWaitForOthers(
- InputSplitPaths inputSplitPaths,
- InputSplitEvents inputSplitEvents) {
+ private void markCurrentWorkerDoneReadingThenWaitForOthers() {
String workerInputSplitsDonePath =
- inputSplitPaths.getDonePath() + "/" +
- getWorkerInfo().getHostnameId();
+ inputSplitsWorkerDonePath + "/" + getWorkerInfo().getHostnameId();
try {
getZkExt().createExt(workerInputSplitsDonePath,
null,
@@ -508,32 +407,31 @@ public class BspServiceWorker<I extends WritableComparable,
} catch (KeeperException e) {
throw new IllegalStateException(
"markCurrentWorkerDoneThenWaitForOthers: " +
- "KeeperException creating worker done splits", e);
+ "KeeperException creating worker done splits", e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"markCurrentWorkerDoneThenWaitForOthers: " +
- "InterruptedException creating worker done splits", e);
+ "InterruptedException creating worker done splits", e);
}
while (true) {
Stat inputSplitsDoneStat;
try {
inputSplitsDoneStat =
- getZkExt().exists(inputSplitPaths.getAllDonePath(),
- true);
+ getZkExt().exists(inputSplitsAllDonePath, true);
} catch (KeeperException e) {
throw new IllegalStateException(
"markCurrentWorkerDoneThenWaitForOthers: " +
- "KeeperException waiting on worker done splits", e);
+ "KeeperException waiting on worker done splits", e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"markCurrentWorkerDoneThenWaitForOthers: " +
- "InterruptedException waiting on worker done splits", e);
+ "InterruptedException waiting on worker done splits", e);
}
if (inputSplitsDoneStat != null) {
break;
}
- inputSplitEvents.getAllDoneChanged().waitForever();
- inputSplitEvents.getAllDoneChanged().reset();
+ getInputSplitsAllDoneEvent().waitForever();
+ getInputSplitsAllDoneEvent().reset();
}
}
@@ -597,8 +495,6 @@ else[HADOOP_NON_SECURE]*/
long entriesLoaded;
if (getConfiguration().hasMappingInputFormat()) {
- // Ensure the mapping InputSplits are ready for processing
- ensureInputSplitsReady(mappingInputSplitsPaths, mappingInputSplitsEvents);
getContext().progress();
try {
entriesLoaded = loadMapping();
@@ -618,17 +514,12 @@ else[HADOOP_NON_SECURE]*/
entriesLoaded + " entries from inputSplits");
}
- // Workers wait for each other to finish, coordinated by master
- markCurrentWorkerDoneThenWaitForOthers(mappingInputSplitsPaths,
- mappingInputSplitsEvents);
// Print stats for data stored in localData once mapping is fully
// loaded on all the workers
localData.printStats();
}
if (getConfiguration().hasVertexInputFormat()) {
- // Ensure the vertex InputSplits are ready for processing
- ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents);
getContext().progress();
try {
vertexEdgeCount = loadVertices();
@@ -646,8 +537,6 @@ else[HADOOP_NON_SECURE]*/
WorkerProgress.get().finishLoadingVertices();
if (getConfiguration().hasEdgeInputFormat()) {
- // Ensure the edge InputSplits are ready for processing
- ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents);
getContext().progress();
try {
vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
@@ -666,17 +555,7 @@ else[HADOOP_NON_SECURE]*/
LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
}
- if (getConfiguration().hasVertexInputFormat()) {
- // Workers wait for each other to finish, coordinated by master
- markCurrentWorkerDoneThenWaitForOthers(vertexInputSplitsPaths,
- vertexInputSplitsEvents);
- }
-
- if (getConfiguration().hasEdgeInputFormat()) {
- // Workers wait for each other to finish, coordinated by master
- markCurrentWorkerDoneThenWaitForOthers(edgeInputSplitsPaths,
- edgeInputSplitsEvents);
- }
+ markCurrentWorkerDoneReadingThenWaitForOthers();
// Create remaining partitions owned by this worker.
for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
@@ -898,13 +777,6 @@ else[HADOOP_NON_SECURE]*/
if (getSuperstep() != INPUT_SUPERSTEP) {
postSuperstepCallbacks();
- } else {
- if (getConfiguration().hasVertexInputFormat()) {
- vertexSplitsHandler.setDoneReadingGraph(true);
- }
- if (getConfiguration().hasEdgeInputFormat()) {
- edgeSplitsHandler.setDoneReadingGraph(true);
- }
}
globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);
@@ -1692,7 +1564,7 @@ else[HADOOP_NON_SECURE]*/
workerClient.setup(getConfiguration().authenticate());
/*end[HADOOP_NON_SECURE]*/
return new VertexEdgeCount(globalStats.getVertexCount(),
- globalStats.getEdgeCount());
+ globalStats.getEdgeCount(), 0);
} catch (IOException e) {
throw new RuntimeException(
@@ -1963,4 +1835,9 @@ else[HADOOP_NON_SECURE]*/
}
return count;
}
+
+  /**
+   * @return InputSplit handler used in INPUT_SUPERSTEP
+   */
+  @Override
+  public WorkerInputSplitsHandler getInputSplitsHandler() {
+    return this.inputSplitsHandler;
+  }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 89f74b3..b7f1eb6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -26,9 +26,9 @@ import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.filters.EdgeInputFilter;
+import org.apache.giraph.io.InputType;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -89,17 +89,14 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
* @param configuration Configuration
* @param bspServiceWorker service worker
* @param splitsHandler Handler for input splits
- * @param zooKeeperExt Handle to ZooKeeperExt
*/
public EdgeInputSplitsCallable(
EdgeInputFormat<I, E> edgeInputFormat,
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
BspServiceWorker<I, V, E> bspServiceWorker,
- InputSplitsHandler splitsHandler,
- ZooKeeperExt zooKeeperExt) {
- super(context, configuration, bspServiceWorker, splitsHandler,
- zooKeeperExt);
+ WorkerInputSplitsHandler splitsHandler) {
+ super(context, configuration, bspServiceWorker, splitsHandler);
this.edgeInputFormat = edgeInputFormat;
this.bspServiceWorker = bspServiceWorker;
@@ -126,6 +123,11 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
return edgeInputFormat;
}
+ /**
+  * Identifies the kind of input splits this callable reads.
+  *
+  * @return {@link InputType#EDGE}
+  */
+ @Override
+ public InputType getInputType() {
+ return InputType.EDGE;
+ }
+
/**
* Read edges from input split. If testing, the user may request a
* maximum number of edges to be read from an input split.
@@ -226,6 +228,6 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
WorkerProgress.get().incrementEdgeInputSplitsLoaded();
- return new VertexEdgeCount(0, inputSplitEdgesLoaded);
+ return new VertexEdgeCount(0, inputSplitEdgesLoaded, 0);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index f68ac93..d4bc1fc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -22,7 +22,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -46,9 +45,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
/** {@link BspServiceWorker} we're running on. */
private final BspServiceWorker<I, V, E> bspServiceWorker;
/** Handler for input splits */
- private final InputSplitsHandler splitsHandler;
- /** {@link ZooKeeperExt} for this worker. */
- private final ZooKeeperExt zooKeeperExt;
+ private final WorkerInputSplitsHandler splitsHandler;
/**
* Constructor.
@@ -58,20 +55,17 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
* @param configuration Configuration
* @param bspServiceWorker Calling {@link BspServiceWorker}
* @param splitsHandler Handler for input splits
- * @param zooKeeperExt {@link ZooKeeperExt} for this worker
*/
public EdgeInputSplitsCallableFactory(
EdgeInputFormat<I, E> edgeInputFormat,
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
BspServiceWorker<I, V, E> bspServiceWorker,
- InputSplitsHandler splitsHandler,
- ZooKeeperExt zooKeeperExt) {
+ WorkerInputSplitsHandler splitsHandler) {
this.edgeInputFormat = edgeInputFormat;
this.context = context;
this.configuration = configuration;
this.bspServiceWorker = bspServiceWorker;
- this.zooKeeperExt = zooKeeperExt;
this.splitsHandler = splitsHandler;
}
@@ -82,7 +76,6 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
context,
configuration,
bspServiceWorker,
- splitsHandler,
- zooKeeperExt);
+ splitsHandler);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
deleted file mode 100644
index 4e93ce0..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.io.GiraphInputFormat;
-import org.apache.giraph.time.SystemTime;
-import org.apache.giraph.time.Time;
-import org.apache.giraph.time.Times;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * InputSplitCallable to read all the splits
- *
- * @param <I> vertexId type
- * @param <V> vertexValue type
- * @param <E> edgeValue type
- */
-public abstract class FullInputSplitCallable<I extends WritableComparable,
- V extends Writable, E extends Writable>
- implements Callable<Integer> {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(
- FullInputSplitCallable.class);
- /** Class time object */
- private static final Time TIME = SystemTime.get();
- /** Configuration */
- protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
- /** Context */
- protected final Mapper<?, ?, ?, ?>.Context context;
-
- /** The List of InputSplit znode paths */
- private final List<String> pathList;
- /** Current position in the path list */
- private final AtomicInteger currentIndex;
- /** ZooKeeperExt handle */
- private final ZooKeeperExt zooKeeperExt;
- /** Get the start time in nanos */
- private final long startNanos = TIME.getNanoseconds();
-
- // CHECKSTYLE: stop ParameterNumberCheck
- /**
- * Constructor.
-
- * @param splitOrganizer Input splits organizer
- * @param context Context
- * @param configuration Configuration
- * @param zooKeeperExt Handle to ZooKeeperExt
- * @param currentIndex Atomic Integer to get splitPath from list
- */
- public FullInputSplitCallable(InputSplitPathOrganizer splitOrganizer,
- Mapper<?, ?, ?, ?>.Context context,
- ImmutableClassesGiraphConfiguration<I, V, E> configuration,
- ZooKeeperExt zooKeeperExt,
- AtomicInteger currentIndex) {
- this.pathList = Lists.newArrayList(splitOrganizer.getPathList());
- this.currentIndex = currentIndex;
- this.zooKeeperExt = zooKeeperExt;
- this.context = context;
- this.configuration = configuration;
- }
- // CHECKSTYLE: resume ParameterNumberCheck
-
- /**
- * Get input format
- *
- * @return Input format
- */
- public abstract GiraphInputFormat getInputFormat();
-
- /**
- * Load mapping entries from all the given input splits
- *
- * @param inputSplit Input split to load
- * @return Count of vertices and edges loaded
- * @throws java.io.IOException
- * @throws InterruptedException
- */
- protected abstract Integer readInputSplit(InputSplit inputSplit)
- throws IOException, InterruptedException;
-
- @Override
- public Integer call() {
- int entries = 0;
- String inputSplitPath;
- int inputSplitsProcessed = 0;
- try {
- while (true) {
- int pos = currentIndex.getAndIncrement();
- if (pos >= pathList.size()) {
- break;
- }
- inputSplitPath = pathList.get(pos);
- entries += loadInputSplit(inputSplitPath);
- context.progress();
- ++inputSplitsProcessed;
- }
- } catch (InterruptedException e) {
- throw new IllegalStateException("call: InterruptedException", e);
- } catch (IOException e) {
- throw new IllegalStateException("call: IOException", e);
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException("call: ClassNotFoundException", e);
- } catch (InstantiationException e) {
- throw new IllegalStateException("call: InstantiationException", e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException("call: IllegalAccessException", e);
- }
-
- if (LOG.isInfoEnabled()) {
- float seconds = Times.getNanosSince(TIME, startNanos) /
- Time.NS_PER_SECOND_AS_FLOAT;
- float entriesPerSecond = entries / seconds;
- LOG.info("call: Loaded " + inputSplitsProcessed + " " +
- "input splits in " + seconds + " secs, " + entries +
- " " + entriesPerSecond + " entries/sec");
- }
- return entries;
- }
-
- /**
- * Extract entries from input split, saving them into mapping store.
- * Mark the input split finished when done.
- *
- * @param inputSplitPath ZK location of input split
- * @return Number of entries read in this input split
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- * @throws InstantiationException
- * @throws IllegalAccessException
- */
- private Integer loadInputSplit(
- String inputSplitPath)
- throws IOException, ClassNotFoundException, InterruptedException,
- InstantiationException, IllegalAccessException {
- InputSplit inputSplit = getInputSplit(inputSplitPath);
- Integer entriesRead = readInputSplit(inputSplit);
- if (LOG.isInfoEnabled()) {
- LOG.info("loadFromInputSplit: Finished loading " +
- inputSplitPath + " " + entriesRead);
- }
- return entriesRead;
- }
-
- /**
- * Talk to ZooKeeper to convert the input split path to the actual
- * InputSplit.
- *
- * @param inputSplitPath Location in ZK of input split
- * @return instance of InputSplit
- * @throws IOException
- * @throws ClassNotFoundException
- */
- protected InputSplit getInputSplit(String inputSplitPath)
- throws IOException, ClassNotFoundException {
- byte[] splitList;
- try {
- splitList = zooKeeperExt.getData(inputSplitPath, false, null);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "getInputSplit: KeeperException on " + inputSplitPath, e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "getInputSplit: IllegalStateException on " + inputSplitPath, e);
- }
- context.progress();
-
- DataInputStream inputStream =
- new DataInputStream(new ByteArrayInputStream(splitList));
- InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
-
- if (LOG.isInfoEnabled()) {
- LOG.info("getInputSplit: Processing " + inputSplitPath +
- " from ZooKeeper and got input split '" +
- inputSplit.toString() + "'");
- }
- return inputSplit;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
deleted file mode 100644
index 463601c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.worker;
-
-import com.google.common.collect.Lists;
-
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Utility class to extract the list of InputSplits from the
- * ZooKeeper tree of "claimable splits" the master created,
- * and to sort the list to favor local data blocks.
- *
- * This class provides an Iterator for the list the worker will
- * claim splits from, making all sorting and data-code locality
- * processing done here invisible to callers. The aim is to cut
- * down on the number of ZK reads workers perform before locating
- * an unclaimed InputSplit.
- */
-public class InputSplitPathOrganizer {
- /** The worker's local ZooKeeperExt ref */
- private final ZooKeeperExt zooKeeper;
- /** The List of InputSplit znode paths */
- private final List<String> pathList;
- /** The worker's hostname */
- private final String hostName;
-
- /**
- * Constructor
- *
- * @param zooKeeper the worker's ZkExt
- * @param zkPathList the path to read from
- * @param hostName the worker's host name (for matching)
- * @param useLocality whether to prioritize local input splits
- */
- public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
- final String zkPathList, final String hostName,
- final boolean useLocality) throws KeeperException, InterruptedException {
- this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
- hostName, useLocality);
- }
-
- /**
- * Constructor
- *
- * @param zooKeeper the worker's ZkExt
- * @param inputSplitPathList path of input splits to read from
- * @param hostName the worker's host name (for matching)
- * @param useLocality whether to prioritize local input splits
- */
- public InputSplitPathOrganizer(
- final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
- final String hostName, final boolean useLocality) {
- this.zooKeeper = zooKeeper;
- this.pathList = Lists.newArrayList(inputSplitPathList);
- this.hostName = hostName;
- // Shuffle input splits in case several workers exist on this host
- Collections.shuffle(pathList);
- if (useLocality) {
- prioritizeLocalInputSplits();
- }
- }
-
- /**
- * Re-order list of InputSplits so files local to this worker node's
- * disk are the first it will iterate over when attempting to claim
- * a split to read. This will increase locality of data reads with greater
- * probability as the % of total nodes in the cluster hosting data and workers
- * BOTH increase towards 100%. Replication increases our chances of a "hit."
- */
- private void prioritizeLocalInputSplits() {
- List<String> sortedList = new ArrayList<String>();
- String hosts;
- for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
- final String path = iterator.next();
- try {
- hosts = getLocationsFromZkInputSplitData(path);
- } catch (IOException ioe) {
- hosts = null; // no problem, just don't sort this entry
- } catch (KeeperException ke) {
- hosts = null;
- } catch (InterruptedException ie) {
- hosts = null;
- }
- if (hosts != null && hosts.contains(hostName)) {
- sortedList.add(path); // collect the local block
- iterator.remove(); // remove local block from list
- }
- }
- pathList.addAll(0, sortedList);
- }
-
- /**
- * Utility for extracting locality data from an InputSplit ZNode.
- *
- * @param zkSplitPath the input split path to attempt to read
- * ZNode locality data from for this InputSplit.
- * @return a String of hostnames from ZNode data, or throws
- */
- private String getLocationsFromZkInputSplitData(String zkSplitPath)
- throws IOException, KeeperException, InterruptedException {
- byte[] locationData = zooKeeper.getData(zkSplitPath, false, null);
- DataInputStream inputStream =
- new DataInputStream(new ByteArrayInputStream(locationData));
- // only read the "first" entry in the znode data, the locations
- return Text.readString(inputStream);
- }
-
- /**
- * Get the ordered input splits paths.
- *
- * @return Ordered input splits paths
- */
- public Iterable<String> getPathList() {
- return pathList;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index 7b2fc0f..92b23bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -28,6 +28,7 @@ import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.io.InputType;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphMetricsRegistry;
import org.apache.giraph.metrics.MeterDesc;
@@ -35,14 +36,11 @@ import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Meter;
@@ -75,9 +73,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
- * Stores and processes the list of InputSplits advertised
- * in a tree of child znodes by the master.
+ * Reserves input splits for this worker; each reserved split is
+ * returned in serialized form and deserialized by this callable.
*/
- private final InputSplitsHandler splitsHandler;
- /** ZooKeeperExt handle */
- private final ZooKeeperExt zooKeeperExt;
+ private final WorkerInputSplitsHandler splitsHandler;
/** Get the start time in nanos */
private final long startNanos = TIME.getNanoseconds();
/** Whether to prioritize local input splits. */
@@ -91,15 +87,12 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
* @param configuration Configuration
* @param bspServiceWorker service worker
* @param splitsHandler Handler for input splits
- * @param zooKeeperExt Handle to ZooKeeperExt
*/
public InputSplitsCallable(
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
BspServiceWorker<I, V, E> bspServiceWorker,
- InputSplitsHandler splitsHandler,
- ZooKeeperExt zooKeeperExt) {
- this.zooKeeperExt = zooKeeperExt;
+ WorkerInputSplitsHandler splitsHandler) {
this.context = context;
this.workerClientRequestProcessor =
new NettyWorkerClientRequestProcessor<I, V, E>(
@@ -119,6 +112,13 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
public abstract GiraphInputFormat getInputFormat();
/**
+ * Get input type
+ *
+ * @return Input type
+ */
+ public abstract InputType getInputType();
+
+ /**
* Get Meter tracking edges loaded
*
* @return Meter tracking edges loaded
@@ -205,27 +205,22 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
@Override
public VertexEdgeCount call() {
VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
- String inputSplitPath;
+ byte[] serializedInputSplit;
int inputSplitsProcessed = 0;
try {
- while ((inputSplitPath = splitsHandler.reserveInputSplit()) != null) {
- vertexEdgeCount =
- vertexEdgeCount.incrVertexEdgeCount(loadInputSplit(inputSplitPath));
+ while ((serializedInputSplit =
+ splitsHandler.reserveInputSplit(getInputType())) != null) {
+ vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
+ loadInputSplit(serializedInputSplit));
context.progress();
++inputSplitsProcessed;
}
- } catch (KeeperException e) {
- throw new IllegalStateException("call: KeeperException", e);
} catch (InterruptedException e) {
throw new IllegalStateException("call: InterruptedException", e);
} catch (IOException e) {
throw new IllegalStateException("call: IOException", e);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("call: ClassNotFoundException", e);
- } catch (InstantiationException e) {
- throw new IllegalStateException("call: InstantiationException", e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException("call: IllegalAccessException", e);
}
if (LOG.isInfoEnabled()) {
@@ -252,25 +247,19 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
* reached in readVerticeFromInputSplit.
- * Mark the input split finished when done.
*
- * @param inputSplitPath ZK location of input split
+ * @param serializedInputSplit Serialized input split
* @return Mapping of vertex indices and statistics, or null if no data read
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
- * @throws InstantiationException
- * @throws IllegalAccessException
*/
- private VertexEdgeCount loadInputSplit(
- String inputSplitPath)
- throws IOException, ClassNotFoundException, InterruptedException,
- InstantiationException, IllegalAccessException {
- InputSplit inputSplit = getInputSplit(inputSplitPath);
+ private VertexEdgeCount loadInputSplit(byte[] serializedInputSplit)
+ throws IOException, ClassNotFoundException, InterruptedException {
+ InputSplit inputSplit = getInputSplit(serializedInputSplit);
VertexEdgeCount vertexEdgeCount = readInputSplit(inputSplit);
if (LOG.isInfoEnabled()) {
- LOG.info("loadFromInputSplit: Finished loading " +
- inputSplitPath + " " + vertexEdgeCount);
+ LOG.info("loadFromInputSplit: Finished loading " + inputSplit + " " +
+ vertexEdgeCount);
}
- splitsHandler.markInputSplitPathFinished(inputSplitPath);
return vertexEdgeCount;
}
@@ -278,35 +267,19 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
- * Talk to ZooKeeper to convert the input split path to the actual
- * InputSplit.
+ * Deserialize the actual InputSplit from its serialized form using the
+ * input format.
*
- * @param inputSplitPath Location in ZK of input split
+ * @param serializedInputSplit Serialized input split
* @return instance of InputSplit
* @throws IOException
* @throws ClassNotFoundException
*/
- protected InputSplit getInputSplit(String inputSplitPath)
- throws IOException, ClassNotFoundException {
- byte[] splitList;
- try {
- splitList = zooKeeperExt.getData(inputSplitPath, false, null);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "getInputSplit: KeeperException on " + inputSplitPath, e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "getInputSplit: IllegalStateException on " + inputSplitPath, e);
- }
- context.progress();
-
+ protected InputSplit getInputSplit(byte[] serializedInputSplit)
+ throws IOException, ClassNotFoundException {
DataInputStream inputStream =
- new DataInputStream(new ByteArrayInputStream(splitList));
- if (useLocality) {
- Text.readString(inputStream); // location data unused here, skip
- }
+ new DataInputStream(new ByteArrayInputStream(serializedInputSplit));
InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
if (LOG.isInfoEnabled()) {
- LOG.info("getInputSplit: Reserved " + inputSplitPath +
- " from ZooKeeper and got input split '" +
+ LOG.info("getInputSplit: Reserved input split '" +
inputSplit.toString() + "'");
}
return inputSplit;
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
deleted file mode 100644
index e2099eb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
-
-import com.google.common.collect.Lists;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Stores the list of input split paths, and provides thread-safe way for
- * reserving input splits.
- */
-public class InputSplitsHandler implements Watcher {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(InputSplitsHandler.class);
-
- /** The List of InputSplit znode paths */
- private final List<String> pathList;
- /** Current position in the path list */
- private final AtomicInteger currentIndex;
- /** The worker's local ZooKeeperExt ref */
- private final ZooKeeperExt zooKeeper;
- /** Context for reporting progress */
- private final Mapper<?, ?, ?, ?>.Context context;
- /** ZooKeeper input split reserved node. */
- private final String inputSplitReservedNode;
- /** ZooKeeper input split finished node. */
- private final String inputSplitFinishedNode;
- /** Specifies if we finished execution of INPUT_SUPERSTEP. The variable may
- * be accessed via different threads. */
- private volatile boolean doneReadingGraph;
-
- /**
- * Constructor
- *
- * @param splitOrganizer Input splits organizer
- * @param zooKeeper The worker's local ZooKeeperExt ref
- * @param context Context for reporting progress
- * @param inputSplitReservedNode ZooKeeper input split reserved node
- * @param inputSplitFinishedNode ZooKeeper input split finished node
- */
- public InputSplitsHandler(InputSplitPathOrganizer splitOrganizer,
- ZooKeeperExt zooKeeper, Mapper<?, ?, ?, ?>.Context context,
- String inputSplitReservedNode, String inputSplitFinishedNode) {
- this.pathList = Lists.newArrayList(splitOrganizer.getPathList());
- this.currentIndex = new AtomicInteger(0);
- this.zooKeeper = zooKeeper;
- this.context = context;
- this.inputSplitReservedNode = inputSplitReservedNode;
- this.inputSplitFinishedNode = inputSplitFinishedNode;
- this.doneReadingGraph = false;
- }
-
- public void setDoneReadingGraph(boolean doneReadingGraph) {
- this.doneReadingGraph = doneReadingGraph;
- }
-
- /**
- * Try to reserve an InputSplit for loading. While InputSplits exists that
- * are not finished, wait until they are.
- *
- * NOTE: iterations on the InputSplit list only halt for each worker when it
- * has scanned the entire list once and found every split marked RESERVED.
- * When a worker fails, its Ephemeral RESERVED znodes will disappear,
- * allowing other iterating workers to claim it's previously read splits.
- * Only when the last worker left iterating on the list fails can a danger
- * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
- * causes job failure, this is OK. As the failure model evolves, this
- * behavior might need to change. We could add watches on
- * inputSplitFinishedNodes and stop iterating only when all these nodes
- * have been created.
- *
- * @return reserved InputSplit or null if no unfinished InputSplits exist
- * @throws KeeperException
- * @throws InterruptedException
- */
- public String reserveInputSplit() throws KeeperException,
- InterruptedException {
- String reservedInputSplitPath;
- Stat reservedStat;
- while (true) {
- int splitToTry = currentIndex.getAndIncrement();
- if (splitToTry >= pathList.size()) {
- return null;
- }
- String nextSplitToClaim = pathList.get(splitToTry);
- context.progress();
- String tmpInputSplitReservedPath =
- nextSplitToClaim + inputSplitReservedNode;
- reservedStat =
- zooKeeper.exists(tmpInputSplitReservedPath, this);
- if (reservedStat == null) {
- try {
- // Attempt to reserve this InputSplit
- zooKeeper.createExt(tmpInputSplitReservedPath,
- null,
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL,
- false);
- reservedInputSplitPath = nextSplitToClaim;
- if (LOG.isInfoEnabled()) {
- float percentFinished =
- splitToTry * 100.0f / pathList.size();
- LOG.info("reserveInputSplit: Reserved input " +
- "split path " + reservedInputSplitPath +
- ", overall roughly " +
- +percentFinished +
- "% input splits reserved");
- }
- return reservedInputSplitPath;
- } catch (KeeperException.NodeExistsException e) {
- LOG.info("reserveInputSplit: Couldn't reserve " +
- "(already reserved) inputSplit" +
- " at " + tmpInputSplitReservedPath);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "reserveInputSplit: KeeperException on reserve", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "reserveInputSplit: InterruptedException " +
- "on reserve", e);
- }
- }
- }
- }
-
- /**
- * Mark an input split path as completed by this worker. This notifies
- * the master and the other workers that this input split has not only
- * been reserved, but also marked processed.
- *
- * @param inputSplitPath Path to the input split.
- */
- public void markInputSplitPathFinished(String inputSplitPath) {
- String inputSplitFinishedPath =
- inputSplitPath + inputSplitFinishedNode;
- try {
- zooKeeper.createExt(inputSplitFinishedPath,
- null,
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException.NodeExistsException e) {
- LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath +
- " already exists!");
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "markInputSplitPathFinished: KeeperException on " +
- inputSplitFinishedPath, e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "markInputSplitPathFinished: InterruptedException on " +
- inputSplitFinishedPath, e);
- }
- }
-
- @Override
- public void process(WatchedEvent event) {
- if (event.getPath() == null) {
- LOG.warn("process: Problem with zookeeper, got event with path null, " +
- "state " + event.getState() + ", event type " + event.getType());
- return;
- }
- // Check if the reservation for the input split was lost in INPUT_SUPERSTEP
- // (some worker died). If INPUT_SUPERSTEP has already completed, we ignore
- // this event.
- if (event.getPath().endsWith(inputSplitReservedNode) &&
- event.getType() == Watcher.Event.EventType.NodeDeleted &&
- !doneReadingGraph) {
- synchronized (pathList) {
- String split = event.getPath();
- split = split.substring(0, split.indexOf(inputSplitReservedNode));
- pathList.add(split);
- if (LOG.isInfoEnabled()) {
- LOG.info("process: Input split " + split + " lost reservation");
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
index f6dca25..5ab3ba9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
@@ -19,15 +19,15 @@
package org.apache.giraph.worker;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.io.InputType;
import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.io.MappingReader;
import org.apache.giraph.mapping.MappingEntry;
import org.apache.giraph.mapping.MappingStore;
-import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -46,7 +46,7 @@ import org.apache.hadoop.mapreduce.Mapper;
@SuppressWarnings("unchecked")
public class MappingInputSplitsCallable<I extends WritableComparable,
V extends Writable, E extends Writable, B extends Writable>
- extends FullInputSplitCallable<I, V, E> {
+ extends InputSplitsCallable<I, V, E> {
/** User supplied mappingInputFormat */
private final MappingInputFormat<I, V, E, B> mappingInputFormat;
/** Link to bspServiceWorker */
@@ -56,23 +56,18 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
* Constructor
*
* @param mappingInputFormat mappingInputFormat
- * @param splitOrganizer Input splits organizer
* @param context Context
* @param configuration Configuration
- * @param zooKeeperExt Handle to ZooKeeperExt
- * @param currentIndex Atomic Integer to get splitPath from list
* @param bspServiceWorker bsp service worker
+ * @param splitsHandler Splits handler
*/
public MappingInputSplitsCallable(
MappingInputFormat<I, V, E, B> mappingInputFormat,
- InputSplitPathOrganizer splitOrganizer,
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
- ZooKeeperExt zooKeeperExt,
- AtomicInteger currentIndex,
- BspServiceWorker<I, V, E> bspServiceWorker) {
- super(splitOrganizer, context,
- configuration, zooKeeperExt, currentIndex);
+ BspServiceWorker<I, V, E> bspServiceWorker,
+ WorkerInputSplitsHandler splitsHandler) {
+ super(context, configuration, bspServiceWorker, splitsHandler);
this.mappingInputFormat = mappingInputFormat;
this.bspServiceWorker = bspServiceWorker;
}
@@ -83,7 +78,12 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
}
@Override
- protected Integer readInputSplit(InputSplit inputSplit)
+ public InputType getInputType() {
+ return InputType.MAPPING;
+ }
+
+ @Override
+ protected VertexEdgeCount readInputSplit(InputSplit inputSplit)
throws IOException, InterruptedException {
MappingReader<I, V, E, B> mappingReader =
mappingInputFormat.createMappingReader(inputSplit, context);
@@ -104,6 +104,6 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
entriesLoaded += 1;
mappingStore.addEntry(entry.getVertexId(), entry.getMappingTarget());
}
- return entriesLoaded;
+ return new VertexEdgeCount(0, 0, entriesLoaded);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
index 21a981e..6cf702a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
@@ -19,15 +19,13 @@
package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* Factory for {@link org.apache.giraph.worker.MappingInputSplitsCallable}s.
*
@@ -38,59 +36,47 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class MappingInputSplitsCallableFactory<I extends WritableComparable,
V extends Writable, E extends Writable, B extends Writable>
- implements CallableFactory<Integer> {
+ implements CallableFactory<VertexEdgeCount> {
/** Mapping input format */
private final MappingInputFormat<I, V, E, B> mappingInputFormat;
- /** Input split organizer */
- private final InputSplitPathOrganizer splitOrganizer;
/** Mapper context. */
private final Mapper<?, ?, ?, ?>.Context context;
/** Configuration. */
private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
/** {@link BspServiceWorker} we're running on. */
private final BspServiceWorker<I, V, E> bspServiceWorker;
- /** {@link ZooKeeperExt} for this worker. */
- private final ZooKeeperExt zooKeeperExt;
- /** Current position in the path list */
- private final AtomicInteger currentIndex;
-
+ /** Handler for input splits */
+ private final WorkerInputSplitsHandler splitsHandler;
/**
* Constructor.
*
* @param mappingInputFormat Mapping input format
- * @param splitOrganizer Input split organizer
* @param context Mapper context
* @param configuration Configuration
* @param bspServiceWorker Calling {@link BspServiceWorker}
- * @param zooKeeperExt {@link org.apache.giraph.zk.ZooKeeperExt}
- * for this worker
+ * @param splitsHandler Splits handler
*/
public MappingInputSplitsCallableFactory(
MappingInputFormat<I, V, E, B> mappingInputFormat,
- InputSplitPathOrganizer splitOrganizer,
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
BspServiceWorker<I, V, E> bspServiceWorker,
- ZooKeeperExt zooKeeperExt) {
+ WorkerInputSplitsHandler splitsHandler) {
this.mappingInputFormat = mappingInputFormat;
- this.splitOrganizer = splitOrganizer;
this.context = context;
this.configuration = configuration;
this.bspServiceWorker = bspServiceWorker;
- this.zooKeeperExt = zooKeeperExt;
- this.currentIndex = new AtomicInteger(0);
+ this.splitsHandler = splitsHandler;
}
@Override
- public FullInputSplitCallable<I, V, E> newCallable(int threadId) {
+ public InputSplitsCallable<I, V, E> newCallable(int threadId) {
return new MappingInputSplitsCallable<>(
mappingInputFormat,
- splitOrganizer,
context,
configuration,
- zooKeeperExt,
- currentIndex,
- bspServiceWorker);
+ bspServiceWorker,
+ splitsHandler);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 00a2781..540a6b4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -30,10 +30,10 @@ import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
import org.apache.giraph.io.filters.VertexInputFilter;
import org.apache.giraph.mapping.translate.TranslateEdge;
+import org.apache.giraph.io.InputType;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -99,17 +99,14 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
* @param configuration Configuration
* @param bspServiceWorker service worker
* @param splitsHandler Handler for input splits
- * @param zooKeeperExt Handle to ZooKeeperExt
*/
public VertexInputSplitsCallable(
VertexInputFormat<I, V, E> vertexInputFormat,
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
BspServiceWorker<I, V, E> bspServiceWorker,
- InputSplitsHandler splitsHandler,
- ZooKeeperExt zooKeeperExt) {
- super(context, configuration, bspServiceWorker, splitsHandler,
- zooKeeperExt);
+ WorkerInputSplitsHandler splitsHandler) {
+ super(context, configuration, bspServiceWorker, splitsHandler);
this.vertexInputFormat = vertexInputFormat;
inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
@@ -136,6 +133,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
return vertexInputFormat;
}
+ @Override
+ public InputType getInputType() {
+ return InputType.VERTEX;
+ }
+
/**
* Read vertices from input split. If testing, the user may request a
* maximum number of vertices to be read from an input split.
@@ -274,7 +276,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
WorkerProgress.get().incrementVertexInputSplitsLoaded();
return new VertexEdgeCount(inputSplitVerticesLoaded,
- inputSplitEdgesLoaded + edgesSinceLastUpdate);
+ inputSplitEdgesLoaded + edgesSinceLastUpdate, 0);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index c9893d2..7aef3a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -22,7 +22,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -46,9 +45,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
/** {@link BspServiceWorker} we're running on. */
private final BspServiceWorker<I, V, E> bspServiceWorker;
/** Handler for input splits */
- private final InputSplitsHandler splitsHandler;
- /** {@link ZooKeeperExt} for this worker. */
- private final ZooKeeperExt zooKeeperExt;
+ private final WorkerInputSplitsHandler splitsHandler;
/**
* Constructor.
@@ -58,20 +55,17 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
* @param configuration Configuration
* @param bspServiceWorker Calling {@link BspServiceWorker}
* @param splitsHandler Handler for input splits
- * @param zooKeeperExt {@link ZooKeeperExt} for this worker
*/
public VertexInputSplitsCallableFactory(
VertexInputFormat<I, V, E> vertexInputFormat,
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
BspServiceWorker<I, V, E> bspServiceWorker,
- InputSplitsHandler splitsHandler,
- ZooKeeperExt zooKeeperExt) {
+ WorkerInputSplitsHandler splitsHandler) {
this.vertexInputFormat = vertexInputFormat;
this.context = context;
this.configuration = configuration;
this.bspServiceWorker = bspServiceWorker;
- this.zooKeeperExt = zooKeeperExt;
this.splitsHandler = splitsHandler;
}
@@ -82,7 +76,6 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
context,
configuration,
bspServiceWorker,
- splitsHandler,
- zooKeeperExt);
+ splitsHandler);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
new file mode 100644
index 0000000..0dc42b3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.requests.AskForInputSplitRequest;
+import org.apache.giraph.io.InputType;
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Requests input splits from master and keeps track of the splits it sends
+ * back, holding one queue of not-yet-consumed splits per {@link InputType}.
+ */
+public class WorkerInputSplitsHandler {
+  /** Worker info of this worker */
+  private final WorkerInfo workerInfo;
+  /** Task id of master */
+  private final int masterTaskId;
+  /** Worker client, used for communication */
+  private final WorkerClient workerClient;
+  /**
+   * Splits received from master and not yet handed out, one queue per input
+   * type. Keys are only added in the constructor; afterwards only the
+   * queues themselves are modified.
+   */
+  private final Map<InputType, BlockingQueue<byte[]>> availableInputSplits;
+
+  /**
+   * Constructor
+   *
+   * @param workerInfo Worker info of this worker
+   * @param masterTaskId Task id of master
+   * @param workerClient Worker client, used for communication
+   */
+  public WorkerInputSplitsHandler(WorkerInfo workerInfo, int masterTaskId,
+      WorkerClient workerClient) {
+    this.workerInfo = workerInfo;
+    this.masterTaskId = masterTaskId;
+    this.workerClient = workerClient;
+    availableInputSplits = new EnumMap<>(InputType.class);
+    for (InputType inputType : InputType.values()) {
+      availableInputSplits.put(
+          inputType, new LinkedBlockingQueue<byte[]>());
+    }
+  }
+
+  /**
+   * Called when an input split has been received from master; queues it so
+   * that a waiting or later {@link #reserveInputSplit(InputType)} call for
+   * the same type can pick it up.
+   *
+   * @param splitType Type of split
+   * @param serializedInputSplit Serialized split; a zero-length array is
+   *                             later returned as null by reserveInputSplit
+   * @throws IllegalStateException if interrupted while queueing; the
+   *                               thread's interrupt status is restored
+   */
+  public void receivedInputSplit(InputType splitType,
+      byte[] serializedInputSplit) {
+    try {
+      availableInputSplits.get(splitType).put(serializedInputSplit);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can see it
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted", e);
+    }
+  }
+
+  /**
+   * Asks master for an input split of the given type, then blocks until a
+   * split of that type is available in the local queue (filled by
+   * {@link #receivedInputSplit(InputType, byte[])}) and returns it.
+   *
+   * @param splitType Type of split
+   * @return serialized input split, or null if the received split was empty,
+   *         meaning no unfinished input splits of this type remain
+   * @throws IllegalStateException if interrupted while waiting; the
+   *                               thread's interrupt status is restored
+   */
+  public byte[] reserveInputSplit(InputType splitType) {
+    // Send request
+    workerClient.sendWritableRequest(masterTaskId,
+        new AskForInputSplitRequest(splitType, workerInfo.getTaskId()));
+    try {
+      // Wait for some split to become available
+      byte[] serializedInputSplit = availableInputSplits.get(splitType).take();
+      return serializedInputSplit.length == 0 ? null : serializedInputSplit;
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can see it
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted", e);
+    }
+  }
+}