You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2015/10/19 19:14:30 UTC
[1/3] git commit: updated refs/heads/trunk to 5b0cd0e
Repository: giraph
Updated Branches:
refs/heads/trunk 47da75182 -> 5b0cd0e0a
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index b9fc508..8d8e86d 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -32,7 +32,6 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -57,9 +56,9 @@ import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.job.HadoopUtils;
+import org.apache.giraph.master.input.LocalityAwareInputSplitsMasterOrganizer;
import org.apache.giraph.utils.NoOpComputation;
-import org.apache.giraph.worker.InputSplitPathOrganizer;
-import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -69,7 +68,6 @@ import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.zookeeper.KeeperException;
@@ -316,38 +314,39 @@ public class
* @throws InterruptedException
*/
@Test
- public void testInputSplitPathOrganizer()
+ public void testInputSplitLocality()
throws IOException, KeeperException, InterruptedException {
- final List<String> testList = new ArrayList<String>();
- Collections.addAll(testList, "remote2", "local", "remote1");
- final String localHost = "node.LOCAL.com";
- final String testListName = "test_list_parent_znode";
- // build output just as we do to store hostlists in ZNODES
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- String last = "node.test4.com\tnode.test5.com\tnode.test6.com";
- Text.writeString(dos, last);
- byte[] remote1 = baos.toByteArray();
- baos = new ByteArrayOutputStream();
- dos = new DataOutputStream(baos);
- String middle = "node.test1.com\tnode.test2.com\tnode.test3.com";
- Text.writeString(dos, middle);
- byte[] remote2 = baos.toByteArray();
- baos = new ByteArrayOutputStream();
- dos = new DataOutputStream(baos);
- String first = "node.testx.com\tnode.LOCAL.com\tnode.testy.com";
- Text.writeString(dos, first);
- byte[] local = baos.toByteArray();
- ZooKeeperExt zk = mock(ZooKeeperExt.class);
- when(zk.getChildrenExt(testListName, false, false, true)).
- thenReturn(testList);
- when(zk.getData("remote1", false, null)).thenReturn(remote1);
- when(zk.getData("remote2", false, null)).thenReturn(remote2);
- when(zk.getData("local", false, null)).thenReturn(local);
- InputSplitPathOrganizer lis =
- new InputSplitPathOrganizer(zk, testListName, localHost, true);
- final List<String> resultList = Lists.newArrayList(lis.getPathList());
- assertEquals("local", resultList.get(0));
+ List<byte[]> serializedSplits = new ArrayList<>();
+ serializedSplits.add(new byte[]{1});
+ serializedSplits.add(new byte[]{2});
+ serializedSplits.add(new byte[]{3});
+
+ WorkerInfo workerInfo = mock(WorkerInfo.class);
+ when(workerInfo.getTaskId()).thenReturn(5);
+ when(workerInfo.getHostname()).thenReturn("node.LOCAL.com");
+
+ List<InputSplit> splits = new ArrayList<>();
+ InputSplit split1 = mock(InputSplit.class);
+ when(split1.getLocations()).thenReturn(new String[]{
+ "node.test1.com", "node.test2.com", "node.test3.com"});
+ splits.add(split1);
+ InputSplit split2 = mock(InputSplit.class);
+ when(split2.getLocations()).thenReturn(new String[]{
+ "node.testx.com", "node.LOCAL.com", "node.testy.com"});
+ splits.add(split2);
+ InputSplit split3 = mock(InputSplit.class);
+ when(split3.getLocations()).thenReturn(new String[]{
+ "node.test4.com", "node.test5.com", "node.test6.com"});
+ splits.add(split3);
+
+ LocalityAwareInputSplitsMasterOrganizer inputSplitOrganizer =
+ new LocalityAwareInputSplitsMasterOrganizer(
+ serializedSplits,
+ splits,
+ Lists.newArrayList(workerInfo));
+
+ assertEquals(2,
+ inputSplitOrganizer.getSerializedSplitFor(workerInfo.getTaskId())[0]);
}
/**
[2/3] git commit: updated refs/heads/trunk to 5b0cd0e
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..d3eb5da
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Input splits organizer for vertex and edge input splits on master, which
+ * uses locality information.
+ *
+ * Each worker is first offered splits whose locations contain its
+ * hostname; once those are exhausted it falls back to scanning the full
+ * split list in order. A split is handed out at most once: it is reserved
+ * with {@link AtomicBoolean#compareAndSet(boolean, boolean)}, so
+ * concurrent requests cannot receive the same split.
+ */
+public class LocalityAwareInputSplitsMasterOrganizer
+    implements InputSplitsMasterOrganizer {
+  /** All splits before this pointer were taken */
+  private final AtomicInteger listPointer = new AtomicInteger();
+  /** List of serialized splits */
+  private final List<byte[]> serializedSplits;
+  /** Array containing information about whether a split was taken or not */
+  private final AtomicBoolean[] splitsTaken;
+  /**
+   * Map with preferred (local) split indices for each worker task id.
+   * Filled in the constructor and only read afterwards.
+   */
+  private final Map<Integer, ConcurrentLinkedQueue<Integer>>
+      workerToPreferredSplitsMap;
+
+  /**
+   * Constructor
+   *
+   * @param serializedSplits Serialized splits, index-aligned with
+   *                         {@code splits}
+   * @param splits Splits, used only to read their locations
+   * @param workers List of workers
+   * @throws IllegalArgumentException if the two split lists differ in size
+   * @throws IllegalStateException if split locations cannot be read
+   */
+  public LocalityAwareInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
+      List<InputSplit> splits, List<WorkerInfo> workers) {
+    if (serializedSplits.size() != splits.size()) {
+      throw new IllegalArgumentException("Got " + serializedSplits.size() +
+          " serialized splits but " + splits.size() + " splits");
+    }
+    this.serializedSplits = serializedSplits;
+    splitsTaken = new AtomicBoolean[serializedSplits.size()];
+    // Mark all splits as not taken initially
+    for (int i = 0; i < splitsTaken.length; i++) {
+      splitsTaken[i] = new AtomicBoolean(false);
+    }
+
+    workerToPreferredSplitsMap = new HashMap<>();
+    for (WorkerInfo worker : workers) {
+      workerToPreferredSplitsMap.put(worker.getTaskId(),
+          new ConcurrentLinkedQueue<Integer>());
+    }
+    // Go through all splits
+    for (int i = 0; i < splits.size(); i++) {
+      String[] locations = getLocations(splits.get(i));
+      // Add the split to the preferred list of every worker it is local
+      // for; if several workers qualify, the compareAndSet in
+      // getSerializedSplitFor decides which of them gets it
+      for (WorkerInfo worker : workers) {
+        if (isLocal(locations, worker.getHostname())) {
+          workerToPreferredSplitsMap.get(worker.getTaskId()).add(i);
+        }
+      }
+    }
+  }
+
+  /**
+   * Read the locations of a split.
+   *
+   * @param split Split to inspect
+   * @return Split locations, never null (empty if the split reports none)
+   */
+  private static String[] getLocations(InputSplit split) {
+    try {
+      String[] locations = split.getLocations();
+      return locations == null ? new String[0] : locations;
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "Exception occurred while getting splits locations", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before converting to an unchecked error
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "Exception occurred while getting splits locations", e);
+    }
+  }
+
+  /**
+   * Check whether a split is local for a worker on the given host.
+   *
+   * NOTE(review): this is a substring match, so a hostname contained in a
+   * different host name (e.g. "node1" in "node10.com") also counts as
+   * local.
+   *
+   * @param locations Split locations
+   * @param hostname Worker hostname
+   * @return True if some location contains the hostname
+   */
+  private static boolean isLocal(String[] locations, String hostname) {
+    for (String location : locations) {
+      if (location != null && location.contains(hostname)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get a serialized split for a worker, preferring splits local to it.
+   *
+   * @param workerTaskId Task id of the requesting worker
+   * @return A free local split if any, otherwise the next free split from
+   *         the full list, or null once the list is exhausted
+   */
+  @Override
+  public byte[] getSerializedSplitFor(int workerTaskId) {
+    ConcurrentLinkedQueue<Integer> preferredSplits =
+        workerToPreferredSplitsMap.get(workerTaskId);
+    // Try to find a local split first. A worker unknown at construction
+    // time has no preferred splits and goes straight to the linear scan.
+    if (preferredSplits != null) {
+      Integer splitIndex;
+      while ((splitIndex = preferredSplits.poll()) != null) {
+        // Try to reserve the split; another worker or the linear scan
+        // may already have taken it
+        if (splitsTaken[splitIndex].compareAndSet(false, true)) {
+          return serializedSplits.get(splitIndex);
+        }
+      }
+    }
+
+    // No more local splits available, proceed linearly from splits list
+    while (true) {
+      // Get position to check
+      int splitIndex = listPointer.getAndIncrement();
+      // Check if all splits were already taken
+      if (splitIndex >= serializedSplits.size()) {
+        return null;
+      }
+      // Try to reserve the split
+      if (splitsTaken[splitIndex].compareAndSet(false, true)) {
+        return serializedSplits.get(splitIndex);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..8399c8a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import org.apache.giraph.worker.WorkerInfo;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Organizer for mapping splits on master. Unlike vertex and edge splits,
+ * which are read by exactly one worker each, every mapping split has to be
+ * given to every worker, so each worker has its own cursor into the shared
+ * split list.
+ */
+public class MappingInputSplitsMasterOrganizer
+    implements InputSplitsMasterOrganizer {
+  /** List of splits */
+  private final List<byte[]> splits;
+  /** Map from worker task id to atomic pointer in splits list */
+  private final Map<Integer, AtomicInteger>
+      workerTaskIdToNextSplitIndexMap;
+
+  /**
+   * Constructor
+   *
+   * @param serializedSplits Splits
+   * @param workers List of workers
+   */
+  public MappingInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
+      List<WorkerInfo> workers) {
+    this.splits = serializedSplits;
+    workerTaskIdToNextSplitIndexMap = new HashMap<>();
+    for (WorkerInfo worker : workers) {
+      workerTaskIdToNextSplitIndexMap.put(
+          worker.getTaskId(), new AtomicInteger(0));
+    }
+  }
+
+  /**
+   * Get the next mapping split a worker has not received yet.
+   *
+   * @param workerTaskId Task id of the requesting worker; must be one of
+   *                     the workers passed to the constructor
+   * @return Next split for this worker, or null once it has received all
+   *         of them
+   * @throws IllegalArgumentException if the worker is unknown
+   */
+  @Override
+  public byte[] getSerializedSplitFor(int workerTaskId) {
+    AtomicInteger nextSplitIndex =
+        workerTaskIdToNextSplitIndexMap.get(workerTaskId);
+    if (nextSplitIndex == null) {
+      // Workers must be registered in the constructor; fail with a clear
+      // message instead of a NullPointerException
+      throw new IllegalArgumentException(
+          "getSerializedSplitFor: Unknown worker task id " + workerTaskId);
+    }
+    int splitIndex = nextSplitIndex.getAndIncrement();
+    return splitIndex < splits.size() ? splits.get(splitIndex) : null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
new file mode 100644
index 0000000..327e59d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import org.apache.giraph.comm.MasterClient;
+import org.apache.giraph.comm.requests.ReplyWithInputSplitRequest;
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.io.InputType;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Handler for input splits on master.
+ *
+ * Since Giraph currently fails if a worker fails while reading input, there
+ * are no retries here yet. They could be added later by keeping track of
+ * which worker got which split and, if a worker dies, putting its splits
+ * back into the queues.
+ */
+public class MasterInputSplitsHandler {
+  /** Whether to use locality information */
+  private final boolean useLocality;
+  /** Master client */
+  private MasterClient masterClient;
+  /** List of workers */
+  private List<WorkerInfo> workers;
+  /** Map of splits organizers for each split type */
+  private final Map<InputType, InputSplitsMasterOrganizer> splitsMap =
+      new EnumMap<>(InputType.class);
+  /** Latches to say when one input splits type is ready to be accessed */
+  private final Map<InputType, CountDownLatch> latchesMap =
+      new EnumMap<>(InputType.class);
+
+  /**
+   * Constructor
+   *
+   * @param useLocality Whether to use locality information or not
+   */
+  public MasterInputSplitsHandler(boolean useLocality) {
+    this.useLocality = useLocality;
+    for (InputType inputType : InputType.values()) {
+      latchesMap.put(inputType, new CountDownLatch(1));
+    }
+  }
+
+  /**
+   * Initialize
+   *
+   * @param masterClient Master client
+   * @param workers List of workers
+   */
+  public void initialize(MasterClient masterClient, List<WorkerInfo> workers) {
+    this.masterClient = masterClient;
+    this.workers = workers;
+  }
+
+  /**
+   * Add splits of one type and make them available to
+   * {@link #sendSplitTo(InputType, int)}.
+   *
+   * @param splitsType Type of splits
+   * @param inputSplits Splits
+   * @param inputFormat Format used to serialize the splits
+   * @throws IllegalStateException if serialization fails, or if workers
+   *         are needed and {@link #initialize} was not called first
+   */
+  public void addSplits(InputType splitsType, List<InputSplit> inputSplits,
+      GiraphInputFormat inputFormat) {
+    List<byte[]> serializedSplits = new ArrayList<>(inputSplits.size());
+    for (InputSplit inputSplit : inputSplits) {
+      try {
+        ByteArrayOutputStream byteArrayOutputStream =
+            new ByteArrayOutputStream();
+        DataOutput outputStream =
+            new DataOutputStream(byteArrayOutputStream);
+        inputFormat.writeInputSplit(inputSplit, outputStream);
+        serializedSplits.add(byteArrayOutputStream.toByteArray());
+      } catch (IOException e) {
+        throw new IllegalStateException("IOException occurred", e);
+      }
+    }
+    InputSplitsMasterOrganizer inputSplitsOrganizer;
+    if (splitsType == InputType.MAPPING) {
+      // Every worker needs every mapping split
+      inputSplitsOrganizer = new MappingInputSplitsMasterOrganizer(
+          serializedSplits, getInitializedWorkers());
+    } else if (useLocality) {
+      inputSplitsOrganizer = new LocalityAwareInputSplitsMasterOrganizer(
+          serializedSplits, inputSplits, getInitializedWorkers());
+    } else {
+      inputSplitsOrganizer =
+          new BasicInputSplitsMasterOrganizer(serializedSplits);
+    }
+    splitsMap.put(splitsType, inputSplitsOrganizer);
+    // Release waiting senders; the latch also makes the put above visible
+    // to threads returning from await()
+    latchesMap.get(splitsType).countDown();
+  }
+
+  /**
+   * Get the workers passed to {@link #initialize}.
+   *
+   * @return List of workers
+   * @throws IllegalStateException if initialize was not called yet
+   */
+  private List<WorkerInfo> getInitializedWorkers() {
+    if (workers == null) {
+      throw new IllegalStateException(
+          "addSplits: initialize must be called first");
+    }
+    return workers;
+  }
+
+  /**
+   * Called after we receive a split request from some worker. Sends the
+   * next split for that worker back to it, or an empty split when none is
+   * left. Blocks until splits of the requested type have been added.
+   *
+   * @param splitType Type of split requested
+   * @param workerTaskId Id of worker who requested split
+   * @throws IllegalStateException if interrupted while waiting for splits
+   */
+  public void sendSplitTo(InputType splitType, int workerTaskId) {
+    try {
+      // Make sure we don't try to retrieve splits before they were added
+      latchesMap.get(splitType).await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted", e);
+    }
+    byte[] serializedInputSplit =
+        splitsMap.get(splitType).getSerializedSplitFor(workerTaskId);
+    // An empty array signals that there are no more splits for the worker
+    masterClient.sendWritableRequest(workerTaskId,
+        new ReplyWithInputSplitRequest(splitType,
+            serializedInputSplit == null ? new byte[0] : serializedInputSplit));
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java b/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java
new file mode 100644
index 0000000..992b6fe
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Input related master classes
+ */
+package org.apache.giraph.master.input;
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index 4600745..6914c3b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -115,7 +115,7 @@ public class PartitionUtils {
workerStatsMap.put(
workerInfo,
new VertexEdgeCount(partitionStats.getVertexCount(),
- partitionStats.getEdgeCount()));
+ partitionStats.getEdgeCount(), 0));
} else {
workerStatsMap.put(
workerInfo,
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 5b754d6..1031bb3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -59,8 +59,6 @@ import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeOutputFormat;
@@ -177,10 +175,8 @@ public class BspServiceWorker<I extends WritableComparable,
/** Time spent waiting on requests to finish */
private GiraphTimer waitRequestsTimer;
- /** InputSplit handlers used in INPUT_SUPERSTEP for vertex splits */
- private InputSplitsHandler vertexSplitsHandler;
- /** InputSplit handlers used in INPUT_SUPERSTEP for edge splits */
- private InputSplitsHandler edgeSplitsHandler;
+ /** InputSplit handlers used in INPUT_SUPERSTEP */
+ private WorkerInputSplitsHandler inputSplitsHandler;
/**
* Constructor for setting up the worker.
@@ -237,8 +233,9 @@ public class BspServiceWorker<I extends WritableComparable,
null;
GiraphMetrics.get().addSuperstepResetObserver(this);
- vertexSplitsHandler = null;
- edgeSplitsHandler = null;
+
+ inputSplitsHandler = new WorkerInputSplitsHandler(
+ workerInfo, masterInfo.getTaskId(), workerClient);
}
@Override
@@ -295,26 +292,20 @@ public class BspServiceWorker<I extends WritableComparable,
*
* Use one or more threads to do the loading.
*
- * @param inputSplitPathList List of input split paths
* @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
* @return Statistics of the vertices and edges loaded
* @throws InterruptedException
* @throws KeeperException
*/
private VertexEdgeCount loadInputSplits(
- List<String> inputSplitPathList,
CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
throws KeeperException, InterruptedException {
VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
- // Determine how many threads to use based on the number of input splits
- int maxInputSplitThreads = (inputSplitPathList.size() - 1) /
- getConfiguration().getMaxWorkers() + 1;
- int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
- maxInputSplitThreads);
+ int numThreads = getConfiguration().getNumInputSplitsThreads();
if (LOG.isInfoEnabled()) {
LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
"originally " + getConfiguration().getNumInputSplitsThreads() +
- " threads(s) for " + inputSplitPathList.size() + " total splits.");
+ " threads(s)");
}
List<VertexEdgeCount> results =
@@ -336,46 +327,21 @@ public class BspServiceWorker<I extends WritableComparable,
*/
private long loadMapping() throws KeeperException,
InterruptedException {
- List<String> inputSplitPathList =
- getZkExt().getChildrenExt(mappingInputSplitsPaths.getPath(),
- false, false, true);
-
- InputSplitPathOrganizer splitOrganizer =
- new InputSplitPathOrganizer(getZkExt(),
- inputSplitPathList, getWorkerInfo().getHostname(),
- getConfiguration().useInputSplitLocality());
-
MappingInputSplitsCallableFactory<I, V, E, ? extends Writable>
- mappingInputSplitsCallableFactory =
+ inputSplitsCallableFactory =
new MappingInputSplitsCallableFactory<>(
getConfiguration().createWrappedMappingInputFormat(),
- splitOrganizer,
getContext(),
getConfiguration(),
this,
- getZkExt());
+ inputSplitsHandler);
- long entriesLoaded = 0;
- // Determine how many threads to use based on the number of input splits
- int maxInputSplitThreads = inputSplitPathList.size();
- int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
- maxInputSplitThreads);
- if (LOG.isInfoEnabled()) {
- LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
- "originally " + getConfiguration().getNumInputSplitsThreads() +
- " threads(s) for " + inputSplitPathList.size() + " total splits.");
- }
+ long mappingsLoaded =
+ loadInputSplits(inputSplitsCallableFactory).getMappingCount();
- List<Integer> results =
- ProgressableUtils.getResultsWithNCallables(
- mappingInputSplitsCallableFactory,
- numThreads, "load-mapping-%d", getContext());
- for (Integer result : results) {
- entriesLoaded += result;
- }
// after all threads finish loading - call postFilling
localData.getMappingStore().postFilling();
- return entriesLoaded;
+ return mappingsLoaded;
}
/**
@@ -386,31 +352,15 @@ public class BspServiceWorker<I extends WritableComparable,
*/
private VertexEdgeCount loadVertices() throws KeeperException,
InterruptedException {
- List<String> inputSplitPathList =
- getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(),
- false, false, true);
-
- InputSplitPathOrganizer splitOrganizer =
- new InputSplitPathOrganizer(getZkExt(),
- inputSplitPathList, getWorkerInfo().getHostname(),
- getConfiguration().useInputSplitLocality());
- vertexSplitsHandler = new InputSplitsHandler(
- splitOrganizer,
- getZkExt(),
- getContext(),
- BspService.VERTEX_INPUT_SPLIT_RESERVED_NODE,
- BspService.VERTEX_INPUT_SPLIT_FINISHED_NODE);
-
VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
new VertexInputSplitsCallableFactory<I, V, E>(
getConfiguration().createWrappedVertexInputFormat(),
getContext(),
getConfiguration(),
this,
- vertexSplitsHandler,
- getZkExt());
+ inputSplitsHandler);
- return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory);
+ return loadInputSplits(inputSplitsCallableFactory);
}
/**
@@ -420,32 +370,15 @@ public class BspServiceWorker<I extends WritableComparable,
* @return Number of edges loaded
*/
private long loadEdges() throws KeeperException, InterruptedException {
- List<String> inputSplitPathList =
- getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(),
- false, false, true);
-
- InputSplitPathOrganizer splitOrganizer =
- new InputSplitPathOrganizer(getZkExt(),
- inputSplitPathList, getWorkerInfo().getHostname(),
- getConfiguration().useInputSplitLocality());
- edgeSplitsHandler = new InputSplitsHandler(
- splitOrganizer,
- getZkExt(),
- getContext(),
- BspService.EDGE_INPUT_SPLIT_RESERVED_NODE,
- BspService.EDGE_INPUT_SPLIT_FINISHED_NODE);
-
EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
new EdgeInputSplitsCallableFactory<I, V, E>(
getConfiguration().createWrappedEdgeInputFormat(),
getContext(),
getConfiguration(),
this,
- edgeSplitsHandler,
- getZkExt());
+ inputSplitsHandler);
- return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory).
- getEdgeCount();
+ return loadInputSplits(inputSplitsCallableFactory).getEdgeCount();
}
@Override
@@ -459,46 +392,12 @@ public class BspServiceWorker<I extends WritableComparable,
}
/**
- * Ensure the input splits are ready for processing
- *
- * @param inputSplitPaths Input split paths
- * @param inputSplitEvents Input split events
- */
- private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths,
- InputSplitEvents inputSplitEvents) {
- while (true) {
- Stat inputSplitsReadyStat;
- try {
- inputSplitsReadyStat = getZkExt().exists(
- inputSplitPaths.getAllReadyPath(), true);
- } catch (KeeperException e) {
- throw new IllegalStateException("ensureInputSplitsReady: " +
- "KeeperException waiting on input splits", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException("ensureInputSplitsReady: " +
- "InterruptedException waiting on input splits", e);
- }
- if (inputSplitsReadyStat != null) {
- break;
- }
- inputSplitEvents.getAllReadyChanged().waitForever();
- inputSplitEvents.getAllReadyChanged().reset();
- }
- }
-
- /**
* Mark current worker as done and then wait for all workers
* to finish processing input splits.
- *
- * @param inputSplitPaths Input split paths
- * @param inputSplitEvents Input split events
*/
- private void markCurrentWorkerDoneThenWaitForOthers(
- InputSplitPaths inputSplitPaths,
- InputSplitEvents inputSplitEvents) {
+ private void markCurrentWorkerDoneReadingThenWaitForOthers() {
String workerInputSplitsDonePath =
- inputSplitPaths.getDonePath() + "/" +
- getWorkerInfo().getHostnameId();
+ inputSplitsWorkerDonePath + "/" + getWorkerInfo().getHostnameId();
try {
getZkExt().createExt(workerInputSplitsDonePath,
null,
@@ -508,32 +407,31 @@ public class BspServiceWorker<I extends WritableComparable,
} catch (KeeperException e) {
throw new IllegalStateException(
"markCurrentWorkerDoneThenWaitForOthers: " +
- "KeeperException creating worker done splits", e);
+ "KeeperException creating worker done splits", e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"markCurrentWorkerDoneThenWaitForOthers: " +
- "InterruptedException creating worker done splits", e);
+ "InterruptedException creating worker done splits", e);
}
while (true) {
Stat inputSplitsDoneStat;
try {
inputSplitsDoneStat =
- getZkExt().exists(inputSplitPaths.getAllDonePath(),
- true);
+ getZkExt().exists(inputSplitsAllDonePath, true);
} catch (KeeperException e) {
throw new IllegalStateException(
"markCurrentWorkerDoneThenWaitForOthers: " +
- "KeeperException waiting on worker done splits", e);
+ "KeeperException waiting on worker done splits", e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"markCurrentWorkerDoneThenWaitForOthers: " +
- "InterruptedException waiting on worker done splits", e);
+ "InterruptedException waiting on worker done splits", e);
}
if (inputSplitsDoneStat != null) {
break;
}
- inputSplitEvents.getAllDoneChanged().waitForever();
- inputSplitEvents.getAllDoneChanged().reset();
+ getInputSplitsAllDoneEvent().waitForever();
+ getInputSplitsAllDoneEvent().reset();
}
}
@@ -597,8 +495,6 @@ else[HADOOP_NON_SECURE]*/
long entriesLoaded;
if (getConfiguration().hasMappingInputFormat()) {
- // Ensure the mapping InputSplits are ready for processing
- ensureInputSplitsReady(mappingInputSplitsPaths, mappingInputSplitsEvents);
getContext().progress();
try {
entriesLoaded = loadMapping();
@@ -618,17 +514,12 @@ else[HADOOP_NON_SECURE]*/
entriesLoaded + " entries from inputSplits");
}
- // Workers wait for each other to finish, coordinated by master
- markCurrentWorkerDoneThenWaitForOthers(mappingInputSplitsPaths,
- mappingInputSplitsEvents);
// Print stats for data stored in localData once mapping is fully
// loaded on all the workers
localData.printStats();
}
if (getConfiguration().hasVertexInputFormat()) {
- // Ensure the vertex InputSplits are ready for processing
- ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents);
getContext().progress();
try {
vertexEdgeCount = loadVertices();
@@ -646,8 +537,6 @@ else[HADOOP_NON_SECURE]*/
WorkerProgress.get().finishLoadingVertices();
if (getConfiguration().hasEdgeInputFormat()) {
- // Ensure the edge InputSplits are ready for processing
- ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents);
getContext().progress();
try {
vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
@@ -666,17 +555,7 @@ else[HADOOP_NON_SECURE]*/
LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
}
- if (getConfiguration().hasVertexInputFormat()) {
- // Workers wait for each other to finish, coordinated by master
- markCurrentWorkerDoneThenWaitForOthers(vertexInputSplitsPaths,
- vertexInputSplitsEvents);
- }
-
- if (getConfiguration().hasEdgeInputFormat()) {
- // Workers wait for each other to finish, coordinated by master
- markCurrentWorkerDoneThenWaitForOthers(edgeInputSplitsPaths,
- edgeInputSplitsEvents);
- }
+ markCurrentWorkerDoneReadingThenWaitForOthers();
// Create remaining partitions owned by this worker.
for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
@@ -898,13 +777,6 @@ else[HADOOP_NON_SECURE]*/
if (getSuperstep() != INPUT_SUPERSTEP) {
postSuperstepCallbacks();
- } else {
- if (getConfiguration().hasVertexInputFormat()) {
- vertexSplitsHandler.setDoneReadingGraph(true);
- }
- if (getConfiguration().hasEdgeInputFormat()) {
- edgeSplitsHandler.setDoneReadingGraph(true);
- }
}
globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);
@@ -1692,7 +1564,7 @@ else[HADOOP_NON_SECURE]*/
workerClient.setup(getConfiguration().authenticate());
/*end[HADOOP_NON_SECURE]*/
return new VertexEdgeCount(globalStats.getVertexCount(),
- globalStats.getEdgeCount());
+ globalStats.getEdgeCount(), 0);
} catch (IOException e) {
throw new RuntimeException(
@@ -1963,4 +1835,9 @@ else[HADOOP_NON_SECURE]*/
}
return count;
}
+
+ @Override
+ public WorkerInputSplitsHandler getInputSplitsHandler() {
+ return inputSplitsHandler;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 89f74b3..b7f1eb6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -26,9 +26,9 @@ import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.filters.EdgeInputFilter;
+import org.apache.giraph.io.InputType;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -89,17 +89,14 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
* @param configuration Configuration
* @param bspServiceWorker service worker
* @param splitsHandler Handler for input splits
- * @param zooKeeperExt Handle to ZooKeeperExt
*/
public EdgeInputSplitsCallable(
EdgeInputFormat<I, E> edgeInputFormat,
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
BspServiceWorker<I, V, E> bspServiceWorker,
- InputSplitsHandler splitsHandler,
- ZooKeeperExt zooKeeperExt) {
- super(context, configuration, bspServiceWorker, splitsHandler,
- zooKeeperExt);
+ WorkerInputSplitsHandler splitsHandler) {
+ super(context, configuration, bspServiceWorker, splitsHandler);
this.edgeInputFormat = edgeInputFormat;
this.bspServiceWorker = bspServiceWorker;
@@ -126,6 +123,11 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
return edgeInputFormat;
}
+ @Override
+ public InputType getInputType() {
+ return InputType.EDGE;
+ }
+
/**
* Read edges from input split. If testing, the user may request a
* maximum number of edges to be read from an input split.
@@ -226,6 +228,6 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
WorkerProgress.get().incrementEdgeInputSplitsLoaded();
- return new VertexEdgeCount(0, inputSplitEdgesLoaded);
+ return new VertexEdgeCount(0, inputSplitEdgesLoaded, 0);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index f68ac93..d4bc1fc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -22,7 +22,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -46,9 +45,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
/** {@link BspServiceWorker} we're running on. */
private final BspServiceWorker<I, V, E> bspServiceWorker;
/** Handler for input splits */
- private final InputSplitsHandler splitsHandler;
- /** {@link ZooKeeperExt} for this worker. */
- private final ZooKeeperExt zooKeeperExt;
+ private final WorkerInputSplitsHandler splitsHandler;
/**
* Constructor.
@@ -58,20 +55,17 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
* @param configuration Configuration
* @param bspServiceWorker Calling {@link BspServiceWorker}
* @param splitsHandler Handler for input splits
- * @param zooKeeperExt {@link ZooKeeperExt} for this worker
*/
public EdgeInputSplitsCallableFactory(
EdgeInputFormat<I, E> edgeInputFormat,
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
BspServiceWorker<I, V, E> bspServiceWorker,
- InputSplitsHandler splitsHandler,
- ZooKeeperExt zooKeeperExt) {
+ WorkerInputSplitsHandler splitsHandler) {
this.edgeInputFormat = edgeInputFormat;
this.context = context;
this.configuration = configuration;
this.bspServiceWorker = bspServiceWorker;
- this.zooKeeperExt = zooKeeperExt;
this.splitsHandler = splitsHandler;
}
@@ -82,7 +76,6 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
context,
configuration,
bspServiceWorker,
- splitsHandler,
- zooKeeperExt);
+ splitsHandler);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
deleted file mode 100644
index 4e93ce0..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.io.GiraphInputFormat;
-import org.apache.giraph.time.SystemTime;
-import org.apache.giraph.time.Time;
-import org.apache.giraph.time.Times;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * InputSplitCallable to read all the splits
- *
- * @param <I> vertexId type
- * @param <V> vertexValue type
- * @param <E> edgeValue type
- */
-public abstract class FullInputSplitCallable<I extends WritableComparable,
- V extends Writable, E extends Writable>
- implements Callable<Integer> {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(
- FullInputSplitCallable.class);
- /** Class time object */
- private static final Time TIME = SystemTime.get();
- /** Configuration */
- protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
- /** Context */
- protected final Mapper<?, ?, ?, ?>.Context context;
-
- /** The List of InputSplit znode paths */
- private final List<String> pathList;
- /** Current position in the path list */
- private final AtomicInteger currentIndex;
- /** ZooKeeperExt handle */
- private final ZooKeeperExt zooKeeperExt;
- /** Get the start time in nanos */
- private final long startNanos = TIME.getNanoseconds();
-
- // CHECKSTYLE: stop ParameterNumberCheck
- /**
- * Constructor.
-
- * @param splitOrganizer Input splits organizer
- * @param context Context
- * @param configuration Configuration
- * @param zooKeeperExt Handle to ZooKeeperExt
- * @param currentIndex Atomic Integer to get splitPath from list
- */
- public FullInputSplitCallable(InputSplitPathOrganizer splitOrganizer,
- Mapper<?, ?, ?, ?>.Context context,
- ImmutableClassesGiraphConfiguration<I, V, E> configuration,
- ZooKeeperExt zooKeeperExt,
- AtomicInteger currentIndex) {
- this.pathList = Lists.newArrayList(splitOrganizer.getPathList());
- this.currentIndex = currentIndex;
- this.zooKeeperExt = zooKeeperExt;
- this.context = context;
- this.configuration = configuration;
- }
- // CHECKSTYLE: resume ParameterNumberCheck
-
- /**
- * Get input format
- *
- * @return Input format
- */
- public abstract GiraphInputFormat getInputFormat();
-
- /**
- * Load mapping entries from all the given input splits
- *
- * @param inputSplit Input split to load
- * @return Count of vertices and edges loaded
- * @throws java.io.IOException
- * @throws InterruptedException
- */
- protected abstract Integer readInputSplit(InputSplit inputSplit)
- throws IOException, InterruptedException;
-
- @Override
- public Integer call() {
- int entries = 0;
- String inputSplitPath;
- int inputSplitsProcessed = 0;
- try {
- while (true) {
- int pos = currentIndex.getAndIncrement();
- if (pos >= pathList.size()) {
- break;
- }
- inputSplitPath = pathList.get(pos);
- entries += loadInputSplit(inputSplitPath);
- context.progress();
- ++inputSplitsProcessed;
- }
- } catch (InterruptedException e) {
- throw new IllegalStateException("call: InterruptedException", e);
- } catch (IOException e) {
- throw new IllegalStateException("call: IOException", e);
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException("call: ClassNotFoundException", e);
- } catch (InstantiationException e) {
- throw new IllegalStateException("call: InstantiationException", e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException("call: IllegalAccessException", e);
- }
-
- if (LOG.isInfoEnabled()) {
- float seconds = Times.getNanosSince(TIME, startNanos) /
- Time.NS_PER_SECOND_AS_FLOAT;
- float entriesPerSecond = entries / seconds;
- LOG.info("call: Loaded " + inputSplitsProcessed + " " +
- "input splits in " + seconds + " secs, " + entries +
- " " + entriesPerSecond + " entries/sec");
- }
- return entries;
- }
-
- /**
- * Extract entries from input split, saving them into mapping store.
- * Mark the input split finished when done.
- *
- * @param inputSplitPath ZK location of input split
- * @return Number of entries read in this input split
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- * @throws InstantiationException
- * @throws IllegalAccessException
- */
- private Integer loadInputSplit(
- String inputSplitPath)
- throws IOException, ClassNotFoundException, InterruptedException,
- InstantiationException, IllegalAccessException {
- InputSplit inputSplit = getInputSplit(inputSplitPath);
- Integer entriesRead = readInputSplit(inputSplit);
- if (LOG.isInfoEnabled()) {
- LOG.info("loadFromInputSplit: Finished loading " +
- inputSplitPath + " " + entriesRead);
- }
- return entriesRead;
- }
-
- /**
- * Talk to ZooKeeper to convert the input split path to the actual
- * InputSplit.
- *
- * @param inputSplitPath Location in ZK of input split
- * @return instance of InputSplit
- * @throws IOException
- * @throws ClassNotFoundException
- */
- protected InputSplit getInputSplit(String inputSplitPath)
- throws IOException, ClassNotFoundException {
- byte[] splitList;
- try {
- splitList = zooKeeperExt.getData(inputSplitPath, false, null);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "getInputSplit: KeeperException on " + inputSplitPath, e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "getInputSplit: IllegalStateException on " + inputSplitPath, e);
- }
- context.progress();
-
- DataInputStream inputStream =
- new DataInputStream(new ByteArrayInputStream(splitList));
- InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
-
- if (LOG.isInfoEnabled()) {
- LOG.info("getInputSplit: Processing " + inputSplitPath +
- " from ZooKeeper and got input split '" +
- inputSplit.toString() + "'");
- }
- return inputSplit;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
deleted file mode 100644
index 463601c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.worker;
-
-import com.google.common.collect.Lists;
-
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Utility class to extract the list of InputSplits from the
- * ZooKeeper tree of "claimable splits" the master created,
- * and to sort the list to favor local data blocks.
- *
- * This class provides an Iterator for the list the worker will
- * claim splits from, making all sorting and data-code locality
- * processing done here invisible to callers. The aim is to cut
- * down on the number of ZK reads workers perform before locating
- * an unclaimed InputSplit.
- */
-public class InputSplitPathOrganizer {
- /** The worker's local ZooKeeperExt ref */
- private final ZooKeeperExt zooKeeper;
- /** The List of InputSplit znode paths */
- private final List<String> pathList;
- /** The worker's hostname */
- private final String hostName;
-
- /**
- * Constructor
- *
- * @param zooKeeper the worker's ZkExt
- * @param zkPathList the path to read from
- * @param hostName the worker's host name (for matching)
- * @param useLocality whether to prioritize local input splits
- */
- public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
- final String zkPathList, final String hostName,
- final boolean useLocality) throws KeeperException, InterruptedException {
- this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
- hostName, useLocality);
- }
-
- /**
- * Constructor
- *
- * @param zooKeeper the worker's ZkExt
- * @param inputSplitPathList path of input splits to read from
- * @param hostName the worker's host name (for matching)
- * @param useLocality whether to prioritize local input splits
- */
- public InputSplitPathOrganizer(
- final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
- final String hostName, final boolean useLocality) {
- this.zooKeeper = zooKeeper;
- this.pathList = Lists.newArrayList(inputSplitPathList);
- this.hostName = hostName;
- // Shuffle input splits in case several workers exist on this host
- Collections.shuffle(pathList);
- if (useLocality) {
- prioritizeLocalInputSplits();
- }
- }
-
- /**
- * Re-order list of InputSplits so files local to this worker node's
- * disk are the first it will iterate over when attempting to claim
- * a split to read. This will increase locality of data reads with greater
- * probability as the % of total nodes in the cluster hosting data and workers
- * BOTH increase towards 100%. Replication increases our chances of a "hit."
- */
- private void prioritizeLocalInputSplits() {
- List<String> sortedList = new ArrayList<String>();
- String hosts;
- for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
- final String path = iterator.next();
- try {
- hosts = getLocationsFromZkInputSplitData(path);
- } catch (IOException ioe) {
- hosts = null; // no problem, just don't sort this entry
- } catch (KeeperException ke) {
- hosts = null;
- } catch (InterruptedException ie) {
- hosts = null;
- }
- if (hosts != null && hosts.contains(hostName)) {
- sortedList.add(path); // collect the local block
- iterator.remove(); // remove local block from list
- }
- }
- pathList.addAll(0, sortedList);
- }
-
- /**
- * Utility for extracting locality data from an InputSplit ZNode.
- *
- * @param zkSplitPath the input split path to attempt to read
- * ZNode locality data from for this InputSplit.
- * @return a String of hostnames from ZNode data, or throws
- */
- private String getLocationsFromZkInputSplitData(String zkSplitPath)
- throws IOException, KeeperException, InterruptedException {
- byte[] locationData = zooKeeper.getData(zkSplitPath, false, null);
- DataInputStream inputStream =
- new DataInputStream(new ByteArrayInputStream(locationData));
- // only read the "first" entry in the znode data, the locations
- return Text.readString(inputStream);
- }
-
- /**
- * Get the ordered input splits paths.
- *
- * @return Ordered input splits paths
- */
- public Iterable<String> getPathList() {
- return pathList;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index 7b2fc0f..92b23bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -28,6 +28,7 @@ import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.io.InputType;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphMetricsRegistry;
import org.apache.giraph.metrics.MeterDesc;
@@ -35,14 +36,11 @@ import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Meter;
@@ -75,9 +73,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
* Stores and processes the list of InputSplits advertised
* in a tree of child znodes by the master.
*/
- private final InputSplitsHandler splitsHandler;
- /** ZooKeeperExt handle */
- private final ZooKeeperExt zooKeeperExt;
+ private final WorkerInputSplitsHandler splitsHandler;
/** Get the start time in nanos */
private final long startNanos = TIME.getNanoseconds();
/** Whether to prioritize local input splits. */
@@ -91,15 +87,12 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
* @param configuration Configuration
* @param bspServiceWorker service worker
* @param splitsHandler Handler for input splits
- * @param zooKeeperExt Handle to ZooKeeperExt
*/
public InputSplitsCallable(
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
BspServiceWorker<I, V, E> bspServiceWorker,
- InputSplitsHandler splitsHandler,
- ZooKeeperExt zooKeeperExt) {
- this.zooKeeperExt = zooKeeperExt;
+ WorkerInputSplitsHandler splitsHandler) {
this.context = context;
this.workerClientRequestProcessor =
new NettyWorkerClientRequestProcessor<I, V, E>(
@@ -119,6 +112,13 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
public abstract GiraphInputFormat getInputFormat();
/**
+ * Get input type
+ *
+ * @return Input type
+ */
+ public abstract InputType getInputType();
+
+ /**
* Get Meter tracking edges loaded
*
* @return Meter tracking edges loaded
@@ -205,27 +205,22 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
@Override
public VertexEdgeCount call() {
VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
- String inputSplitPath;
+ byte[] serializedInputSplit;
int inputSplitsProcessed = 0;
try {
- while ((inputSplitPath = splitsHandler.reserveInputSplit()) != null) {
- vertexEdgeCount =
- vertexEdgeCount.incrVertexEdgeCount(loadInputSplit(inputSplitPath));
+ while ((serializedInputSplit =
+ splitsHandler.reserveInputSplit(getInputType())) != null) {
+ vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
+ loadInputSplit(serializedInputSplit));
context.progress();
++inputSplitsProcessed;
}
- } catch (KeeperException e) {
- throw new IllegalStateException("call: KeeperException", e);
} catch (InterruptedException e) {
throw new IllegalStateException("call: InterruptedException", e);
} catch (IOException e) {
throw new IllegalStateException("call: IOException", e);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("call: ClassNotFoundException", e);
- } catch (InstantiationException e) {
- throw new IllegalStateException("call: InstantiationException", e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException("call: IllegalAccessException", e);
}
if (LOG.isInfoEnabled()) {
@@ -252,25 +247,19 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
* reached in readVerticeFromInputSplit.
* Mark the input split finished when done.
*
- * @param inputSplitPath ZK location of input split
+ * @param serializedInputSplit Serialized input split
* @return Mapping of vertex indices and statistics, or null if no data read
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
- * @throws InstantiationException
- * @throws IllegalAccessException
*/
- private VertexEdgeCount loadInputSplit(
- String inputSplitPath)
- throws IOException, ClassNotFoundException, InterruptedException,
- InstantiationException, IllegalAccessException {
- InputSplit inputSplit = getInputSplit(inputSplitPath);
+ private VertexEdgeCount loadInputSplit(byte[] serializedInputSplit)
+ throws IOException, ClassNotFoundException, InterruptedException {
+ InputSplit inputSplit = getInputSplit(serializedInputSplit);
VertexEdgeCount vertexEdgeCount = readInputSplit(inputSplit);
if (LOG.isInfoEnabled()) {
- LOG.info("loadFromInputSplit: Finished loading " +
- inputSplitPath + " " + vertexEdgeCount);
+ LOG.info("loadFromInputSplit: Finished loading " + vertexEdgeCount);
}
- splitsHandler.markInputSplitPathFinished(inputSplitPath);
return vertexEdgeCount;
}
@@ -278,35 +267,19 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
* Talk to ZooKeeper to convert the input split path to the actual
* InputSplit.
*
- * @param inputSplitPath Location in ZK of input split
+ * @param serializedInputSplit Serialized input split
* @return instance of InputSplit
* @throws IOException
* @throws ClassNotFoundException
*/
- protected InputSplit getInputSplit(String inputSplitPath)
- throws IOException, ClassNotFoundException {
- byte[] splitList;
- try {
- splitList = zooKeeperExt.getData(inputSplitPath, false, null);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "getInputSplit: KeeperException on " + inputSplitPath, e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "getInputSplit: IllegalStateException on " + inputSplitPath, e);
- }
- context.progress();
-
+ protected InputSplit getInputSplit(byte[] serializedInputSplit)
+ throws IOException, ClassNotFoundException {
DataInputStream inputStream =
- new DataInputStream(new ByteArrayInputStream(splitList));
- if (useLocality) {
- Text.readString(inputStream); // location data unused here, skip
- }
+ new DataInputStream(new ByteArrayInputStream(serializedInputSplit));
InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
if (LOG.isInfoEnabled()) {
- LOG.info("getInputSplit: Reserved " + inputSplitPath +
- " from ZooKeeper and got input split '" +
+ LOG.info("getInputSplit: Reserved input split '" +
inputSplit.toString() + "'");
}
return inputSplit;
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
deleted file mode 100644
index e2099eb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
-
-import com.google.common.collect.Lists;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Stores the list of input split paths, and provides thread-safe way for
- * reserving input splits.
- */
-public class InputSplitsHandler implements Watcher {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(InputSplitsHandler.class);
-
- /** The List of InputSplit znode paths */
- private final List<String> pathList;
- /** Current position in the path list */
- private final AtomicInteger currentIndex;
- /** The worker's local ZooKeeperExt ref */
- private final ZooKeeperExt zooKeeper;
- /** Context for reporting progress */
- private final Mapper<?, ?, ?, ?>.Context context;
- /** ZooKeeper input split reserved node. */
- private final String inputSplitReservedNode;
- /** ZooKeeper input split finished node. */
- private final String inputSplitFinishedNode;
- /** Specifies if we finished execution of INPUT_SUPERSTEP. The variable may
- * be accessed via different threads. */
- private volatile boolean doneReadingGraph;
-
- /**
- * Constructor
- *
- * @param splitOrganizer Input splits organizer
- * @param zooKeeper The worker's local ZooKeeperExt ref
- * @param context Context for reporting progress
- * @param inputSplitReservedNode ZooKeeper input split reserved node
- * @param inputSplitFinishedNode ZooKeeper input split finished node
- */
- public InputSplitsHandler(InputSplitPathOrganizer splitOrganizer,
- ZooKeeperExt zooKeeper, Mapper<?, ?, ?, ?>.Context context,
- String inputSplitReservedNode, String inputSplitFinishedNode) {
- this.pathList = Lists.newArrayList(splitOrganizer.getPathList());
- this.currentIndex = new AtomicInteger(0);
- this.zooKeeper = zooKeeper;
- this.context = context;
- this.inputSplitReservedNode = inputSplitReservedNode;
- this.inputSplitFinishedNode = inputSplitFinishedNode;
- this.doneReadingGraph = false;
- }
-
- public void setDoneReadingGraph(boolean doneReadingGraph) {
- this.doneReadingGraph = doneReadingGraph;
- }
-
- /**
- * Try to reserve an InputSplit for loading. While InputSplits exists that
- * are not finished, wait until they are.
- *
- * NOTE: iterations on the InputSplit list only halt for each worker when it
- * has scanned the entire list once and found every split marked RESERVED.
- * When a worker fails, its Ephemeral RESERVED znodes will disappear,
- * allowing other iterating workers to claim it's previously read splits.
- * Only when the last worker left iterating on the list fails can a danger
- * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
- * causes job failure, this is OK. As the failure model evolves, this
- * behavior might need to change. We could add watches on
- * inputSplitFinishedNodes and stop iterating only when all these nodes
- * have been created.
- *
- * @return reserved InputSplit or null if no unfinished InputSplits exist
- * @throws KeeperException
- * @throws InterruptedException
- */
- public String reserveInputSplit() throws KeeperException,
- InterruptedException {
- String reservedInputSplitPath;
- Stat reservedStat;
- while (true) {
- int splitToTry = currentIndex.getAndIncrement();
- if (splitToTry >= pathList.size()) {
- return null;
- }
- String nextSplitToClaim = pathList.get(splitToTry);
- context.progress();
- String tmpInputSplitReservedPath =
- nextSplitToClaim + inputSplitReservedNode;
- reservedStat =
- zooKeeper.exists(tmpInputSplitReservedPath, this);
- if (reservedStat == null) {
- try {
- // Attempt to reserve this InputSplit
- zooKeeper.createExt(tmpInputSplitReservedPath,
- null,
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL,
- false);
- reservedInputSplitPath = nextSplitToClaim;
- if (LOG.isInfoEnabled()) {
- float percentFinished =
- splitToTry * 100.0f / pathList.size();
- LOG.info("reserveInputSplit: Reserved input " +
- "split path " + reservedInputSplitPath +
- ", overall roughly " +
- +percentFinished +
- "% input splits reserved");
- }
- return reservedInputSplitPath;
- } catch (KeeperException.NodeExistsException e) {
- LOG.info("reserveInputSplit: Couldn't reserve " +
- "(already reserved) inputSplit" +
- " at " + tmpInputSplitReservedPath);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "reserveInputSplit: KeeperException on reserve", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "reserveInputSplit: InterruptedException " +
- "on reserve", e);
- }
- }
- }
- }
-
- /**
- * Mark an input split path as completed by this worker. This notifies
- * the master and the other workers that this input split has not only
- * been reserved, but also marked processed.
- *
- * @param inputSplitPath Path to the input split.
- */
- public void markInputSplitPathFinished(String inputSplitPath) {
- String inputSplitFinishedPath =
- inputSplitPath + inputSplitFinishedNode;
- try {
- zooKeeper.createExt(inputSplitFinishedPath,
- null,
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException.NodeExistsException e) {
- LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath +
- " already exists!");
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "markInputSplitPathFinished: KeeperException on " +
- inputSplitFinishedPath, e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "markInputSplitPathFinished: InterruptedException on " +
- inputSplitFinishedPath, e);
- }
- }
-
- @Override
- public void process(WatchedEvent event) {
- if (event.getPath() == null) {
- LOG.warn("process: Problem with zookeeper, got event with path null, " +
- "state " + event.getState() + ", event type " + event.getType());
- return;
- }
- // Check if the reservation for the input split was lost in INPUT_SUPERSTEP
- // (some worker died). If INPUT_SUPERSTEP has already completed, we ignore
- // this event.
- if (event.getPath().endsWith(inputSplitReservedNode) &&
- event.getType() == Watcher.Event.EventType.NodeDeleted &&
- !doneReadingGraph) {
- synchronized (pathList) {
- String split = event.getPath();
- split = split.substring(0, split.indexOf(inputSplitReservedNode));
- pathList.add(split);
- if (LOG.isInfoEnabled()) {
- LOG.info("process: Input split " + split + " lost reservation");
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
index f6dca25..5ab3ba9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
@@ -19,15 +19,15 @@
package org.apache.giraph.worker;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.io.MappingReader;
import org.apache.giraph.mapping.MappingEntry;
import org.apache.giraph.mapping.MappingStore;
-import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.io.InputType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -46,7 +46,7 @@ import org.apache.hadoop.mapreduce.Mapper;
@SuppressWarnings("unchecked")
public class MappingInputSplitsCallable<I extends WritableComparable,
V extends Writable, E extends Writable, B extends Writable>
- extends FullInputSplitCallable<I, V, E> {
+ extends InputSplitsCallable<I, V, E> {
/** User supplied mappingInputFormat */
private final MappingInputFormat<I, V, E, B> mappingInputFormat;
/** Link to bspServiceWorker */
@@ -56,23 +56,18 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
* Constructor
*
* @param mappingInputFormat mappingInputFormat
- * @param splitOrganizer Input splits organizer
* @param context Context
* @param configuration Configuration
- * @param zooKeeperExt Handle to ZooKeeperExt
- * @param currentIndex Atomic Integer to get splitPath from list
* @param bspServiceWorker bsp service worker
+ * @param splitsHandler Splits handler
*/
public MappingInputSplitsCallable(
MappingInputFormat<I, V, E, B> mappingInputFormat,
- InputSplitPathOrganizer splitOrganizer,
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
- ZooKeeperExt zooKeeperExt,
- AtomicInteger currentIndex,
- BspServiceWorker<I, V, E> bspServiceWorker) {
- super(splitOrganizer, context,
- configuration, zooKeeperExt, currentIndex);
+ BspServiceWorker<I, V, E> bspServiceWorker,
+ WorkerInputSplitsHandler splitsHandler) {
+ super(context, configuration, bspServiceWorker, splitsHandler);
this.mappingInputFormat = mappingInputFormat;
this.bspServiceWorker = bspServiceWorker;
}
@@ -83,7 +78,12 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
}
@Override
- protected Integer readInputSplit(InputSplit inputSplit)
+ public InputType getInputType() {
+ return InputType.MAPPING;
+ }
+
+ @Override
+ protected VertexEdgeCount readInputSplit(InputSplit inputSplit)
throws IOException, InterruptedException {
MappingReader<I, V, E, B> mappingReader =
mappingInputFormat.createMappingReader(inputSplit, context);
@@ -104,6 +104,6 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
entriesLoaded += 1;
mappingStore.addEntry(entry.getVertexId(), entry.getMappingTarget());
}
- return entriesLoaded;
+ return new VertexEdgeCount(0, 0, entriesLoaded);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
index 21a981e..6cf702a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
@@ -19,15 +19,13 @@
package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* Factory for {@link org.apache.giraph.worker.MappingInputSplitsCallable}s.
*
@@ -38,59 +36,47 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class MappingInputSplitsCallableFactory<I extends WritableComparable,
V extends Writable, E extends Writable, B extends Writable>
- implements CallableFactory<Integer> {
+ implements CallableFactory<VertexEdgeCount> {
/** Mapping input format */
private final MappingInputFormat<I, V, E, B> mappingInputFormat;
- /** Input split organizer */
- private final InputSplitPathOrganizer splitOrganizer;
/** Mapper context. */
private final Mapper<?, ?, ?, ?>.Context context;
/** Configuration. */
private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
/** {@link BspServiceWorker} we're running on. */
private final BspServiceWorker<I, V, E> bspServiceWorker;
- /** {@link ZooKeeperExt} for this worker. */
- private final ZooKeeperExt zooKeeperExt;
- /** Current position in the path list */
- private final AtomicInteger currentIndex;
-
+ /** Handler for input splits */
+ private final WorkerInputSplitsHandler splitsHandler;
/**
* Constructor.
*
* @param mappingInputFormat Mapping input format
- * @param splitOrganizer Input split organizer
* @param context Mapper context
* @param configuration Configuration
* @param bspServiceWorker Calling {@link BspServiceWorker}
- * @param zooKeeperExt {@link org.apache.giraph.zk.ZooKeeperExt}
- * for this worker
+ * @param splitsHandler Splits handler
*/
public MappingInputSplitsCallableFactory(
MappingInputFormat<I, V, E, B> mappingInputFormat,
- InputSplitPathOrganizer splitOrganizer,
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
BspServiceWorker<I, V, E> bspServiceWorker,
- ZooKeeperExt zooKeeperExt) {
+ WorkerInputSplitsHandler splitsHandler) {
this.mappingInputFormat = mappingInputFormat;
- this.splitOrganizer = splitOrganizer;
this.context = context;
this.configuration = configuration;
this.bspServiceWorker = bspServiceWorker;
- this.zooKeeperExt = zooKeeperExt;
- this.currentIndex = new AtomicInteger(0);
+ this.splitsHandler = splitsHandler;
}
@Override
- public FullInputSplitCallable<I, V, E> newCallable(int threadId) {
+ public InputSplitsCallable<I, V, E> newCallable(int threadId) {
return new MappingInputSplitsCallable<>(
mappingInputFormat,
- splitOrganizer,
context,
configuration,
- zooKeeperExt,
- currentIndex,
- bspServiceWorker);
+ bspServiceWorker,
+ splitsHandler);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 00a2781..540a6b4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -30,10 +30,10 @@ import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
import org.apache.giraph.io.filters.VertexInputFilter;
import org.apache.giraph.mapping.translate.TranslateEdge;
+import org.apache.giraph.io.InputType;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -99,17 +99,14 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
* @param configuration Configuration
* @param bspServiceWorker service worker
* @param splitsHandler Handler for input splits
- * @param zooKeeperExt Handle to ZooKeeperExt
*/
public VertexInputSplitsCallable(
VertexInputFormat<I, V, E> vertexInputFormat,
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
BspServiceWorker<I, V, E> bspServiceWorker,
- InputSplitsHandler splitsHandler,
- ZooKeeperExt zooKeeperExt) {
- super(context, configuration, bspServiceWorker, splitsHandler,
- zooKeeperExt);
+ WorkerInputSplitsHandler splitsHandler) {
+ super(context, configuration, bspServiceWorker, splitsHandler);
this.vertexInputFormat = vertexInputFormat;
inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
@@ -136,6 +133,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
return vertexInputFormat;
}
+ @Override
+ public InputType getInputType() {
+ return InputType.VERTEX;
+ }
+
/**
* Read vertices from input split. If testing, the user may request a
* maximum number of vertices to be read from an input split.
@@ -274,7 +276,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
WorkerProgress.get().incrementVertexInputSplitsLoaded();
return new VertexEdgeCount(inputSplitVerticesLoaded,
- inputSplitEdgesLoaded + edgesSinceLastUpdate);
+ inputSplitEdgesLoaded + edgesSinceLastUpdate, 0);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index c9893d2..7aef3a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -22,7 +22,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -46,9 +45,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
/** {@link BspServiceWorker} we're running on. */
private final BspServiceWorker<I, V, E> bspServiceWorker;
/** Handler for input splits */
- private final InputSplitsHandler splitsHandler;
- /** {@link ZooKeeperExt} for this worker. */
- private final ZooKeeperExt zooKeeperExt;
+ private final WorkerInputSplitsHandler splitsHandler;
/**
* Constructor.
@@ -58,20 +55,17 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
* @param configuration Configuration
* @param bspServiceWorker Calling {@link BspServiceWorker}
* @param splitsHandler Handler for input splits
- * @param zooKeeperExt {@link ZooKeeperExt} for this worker
*/
public VertexInputSplitsCallableFactory(
VertexInputFormat<I, V, E> vertexInputFormat,
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
BspServiceWorker<I, V, E> bspServiceWorker,
- InputSplitsHandler splitsHandler,
- ZooKeeperExt zooKeeperExt) {
+ WorkerInputSplitsHandler splitsHandler) {
this.vertexInputFormat = vertexInputFormat;
this.context = context;
this.configuration = configuration;
this.bspServiceWorker = bspServiceWorker;
- this.zooKeeperExt = zooKeeperExt;
this.splitsHandler = splitsHandler;
}
@@ -82,7 +76,6 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
context,
configuration,
bspServiceWorker,
- splitsHandler,
- zooKeeperExt);
+ splitsHandler);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
new file mode 100644
index 0000000..0dc42b3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.requests.AskForInputSplitRequest;
+import org.apache.giraph.io.InputType;
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Requests splits from master and keeps track of them
+ */
+public class WorkerInputSplitsHandler {
+ /** Worker info of this worker */
+ private final WorkerInfo workerInfo;
+ /** Task id of master */
+ private final int masterTaskId;
+ /** Worker client, used for communication */
+ private final WorkerClient workerClient;
+ /** Map with currently available splits received from master */
+ private final Map<InputType, BlockingQueue<byte[]>> availableInputSplits;
+
+ /**
+ * Constructor
+ *
+ * @param workerInfo Worker info of this worker
+ * @param masterTaskId Task id of master
+ * @param workerClient Worker client, used for communication
+ */
+ public WorkerInputSplitsHandler(WorkerInfo workerInfo, int masterTaskId,
+ WorkerClient workerClient) {
+ this.workerInfo = workerInfo;
+ this.masterTaskId = masterTaskId;
+ this.workerClient = workerClient;
+ availableInputSplits = new EnumMap<>(InputType.class);
+ for (InputType inputType : InputType.values()) {
+ availableInputSplits.put(
+ inputType, new LinkedBlockingQueue<byte[]>());
+ }
+ }
+
+ /**
+ * Called when an input split has been received from master, adding it to
+ * the map
+ *
+ * @param splitType Type of split
+ * @param serializedInputSplit Split
+ */
+ public void receivedInputSplit(InputType splitType,
+ byte[] serializedInputSplit) {
+ try {
+ availableInputSplits.get(splitType).put(serializedInputSplit);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Interrupted", e);
+ }
+ }
+
+ /**
+ * Reserve an input split of the given type for loading. Sends an
+ * AskForInputSplitRequest carrying this worker's task id to the master,
+ * then blocks until a reply is delivered through receivedInputSplit().
+ * The master coordinates split assignment, so no ZooKeeper reservation
+ * znodes are involved.
+ *
+ * NOTE: an empty serialized split is treated as the signal that no more
+ * splits of this type are available, and null is returned in that case.
+ * This call blocks until a reply arrives; if the waiting thread is
+ * interrupted, an IllegalStateException is thrown instead.
+ *
+ * Presumably the master answers every request (possibly with an empty
+ * split) - confirm against MasterInputSplitsHandler.
+ *
+ * @param splitType Type of split
+ * @return reserved serialized InputSplit, or null if none remain
+ */
+ public byte[] reserveInputSplit(InputType splitType) {
+ // Send request
+ workerClient.sendWritableRequest(masterTaskId,
+ new AskForInputSplitRequest(splitType, workerInfo.getTaskId()));
+ try {
+ // Wait for some split to become available
+ byte[] serializedInputSplit = availableInputSplits.get(splitType).take();
+ return serializedInputSplit.length == 0 ? null : serializedInputSplit;
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Interrupted", e);
+ }
+ }
+}
[3/3] git commit: updated refs/heads/trunk to 5b0cd0e
Posted by ma...@apache.org.
GIRAPH-1033: Remove zookeeper from input splits handling
Summary: Currently we use ZooKeeper for handling input splits, by having each worker check each split, and when a lot of splits are used this becomes very slow. We should have the master coordinate input split allocation instead, making the complexity proportional to #splits instead of #workers*#splits. The master holds all the splits, and workers send requests to it asking for splits when they need them.
Test Plan: Run a job with 200 machines and 200k small splits - without this change the input superstep takes 30 minutes, and with it less than 2 minutes. Also verified correctness on a sample job. mvn clean verify passes.
Differential Revision: https://reviews.facebook.net/D48531
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5b0cd0e0
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5b0cd0e0
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5b0cd0e0
Branch: refs/heads/trunk
Commit: 5b0cd0e0a2ddbf722b6140d28474295c8376e561
Parents: 47da751
Author: Maja Kabiljo <ma...@fb.com>
Authored: Mon Oct 12 10:56:39 2015 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Oct 19 10:13:43 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/giraph/bsp/BspService.java | 334 +++----------------
.../giraph/bsp/CentralizedServiceMaster.java | 4 +-
.../giraph/bsp/CentralizedServiceWorker.java | 8 +
.../org/apache/giraph/comm/MasterClient.java | 9 +
.../giraph/comm/netty/NettyMasterClient.java | 6 +
.../handler/MasterRequestServerHandler.java | 22 +-
.../comm/requests/AskForInputSplitRequest.java | 76 +++++
.../giraph/comm/requests/MasterRequest.java | 6 +-
.../requests/ReplyWithInputSplitRequest.java | 81 +++++
.../giraph/comm/requests/RequestType.java | 6 +-
.../requests/SendReducedToMasterRequest.java | 6 +-
.../giraph/graph/FinishedSuperstepStats.java | 2 +-
.../apache/giraph/graph/InputSplitEvents.java | 85 -----
.../apache/giraph/graph/InputSplitPaths.java | 88 -----
.../apache/giraph/graph/VertexEdgeCount.java | 20 +-
.../java/org/apache/giraph/io/InputType.java | 31 ++
.../apache/giraph/master/BspServiceMaster.java | 281 +++-------------
.../giraph/master/MasterAggregatorHandler.java | 2 +-
.../giraph/master/MasterGlobalCommHandler.java | 76 +++++
.../giraph/master/MasterGlobalCommUsage.java | 49 +--
.../MasterGlobalCommUsageAggregators.java | 69 ++++
.../input/BasicInputSplitsMasterOrganizer.java | 46 +++
.../input/InputSplitsMasterOrganizer.java | 32 ++
...LocalityAwareInputSplitsMasterOrganizer.java | 125 +++++++
.../MappingInputSplitsMasterOrganizer.java | 64 ++++
.../master/input/MasterInputSplitsHandler.java | 140 ++++++++
.../giraph/master/input/package-info.java | 21 ++
.../apache/giraph/partition/PartitionUtils.java | 2 +-
.../apache/giraph/worker/BspServiceWorker.java | 187 ++---------
.../giraph/worker/EdgeInputSplitsCallable.java | 16 +-
.../worker/EdgeInputSplitsCallableFactory.java | 13 +-
.../giraph/worker/FullInputSplitCallable.java | 210 ------------
.../giraph/worker/InputSplitPathOrganizer.java | 142 --------
.../giraph/worker/InputSplitsCallable.java | 77 ++---
.../giraph/worker/InputSplitsHandler.java | 208 ------------
.../worker/MappingInputSplitsCallable.java | 28 +-
.../MappingInputSplitsCallableFactory.java | 34 +-
.../worker/VertexInputSplitsCallable.java | 16 +-
.../VertexInputSplitsCallableFactory.java | 13 +-
.../giraph/worker/WorkerInputSplitsHandler.java | 108 ++++++
.../java/org/apache/giraph/TestBspBasic.java | 69 ++--
41 files changed, 1164 insertions(+), 1648 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 0a5a7ba..15e4dbe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -21,8 +21,6 @@ package org.apache.giraph.bsp;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
import org.apache.giraph.job.JobProgressTracker;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.utils.CheckpointingUtils;
@@ -77,59 +75,13 @@ public abstract class BspService<I extends WritableComparable,
/** Master job state znode above base dir */
public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
- /** Mapping input split directory about base dir */
- public static final String MAPPING_INPUT_SPLIT_DIR = "/_mappingInputSplitDir";
- /** Mapping input split done directory about base dir */
- public static final String MAPPING_INPUT_SPLIT_DONE_DIR =
- "/_mappingInputSplitDoneDir";
- /** Denotes a reserved mapping input split */
- public static final String MAPPING_INPUT_SPLIT_RESERVED_NODE =
- "/_mappingInputSplitReserved";
- /** Denotes a finished mapping input split */
- public static final String MAPPING_INPUT_SPLIT_FINISHED_NODE =
- "/_mappingInputSplitFinished";
- /** Denotes that all the mapping input splits are are ready for consumption */
- public static final String MAPPING_INPUT_SPLITS_ALL_READY_NODE =
- "/_mappingInputSplitsAllReady";
- /** Denotes that all the mapping input splits are done. */
- public static final String MAPPING_INPUT_SPLITS_ALL_DONE_NODE =
- "/_mappingInputSplitsAllDone";
-
- /** Vertex input split directory about base dir */
- public static final String VERTEX_INPUT_SPLIT_DIR = "/_vertexInputSplitDir";
- /** Vertex input split done directory about base dir */
- public static final String VERTEX_INPUT_SPLIT_DONE_DIR =
- "/_vertexInputSplitDoneDir";
- /** Denotes a reserved vertex input split */
- public static final String VERTEX_INPUT_SPLIT_RESERVED_NODE =
- "/_vertexInputSplitReserved";
- /** Denotes a finished vertex input split */
- public static final String VERTEX_INPUT_SPLIT_FINISHED_NODE =
- "/_vertexInputSplitFinished";
- /** Denotes that all the vertex input splits are are ready for consumption */
- public static final String VERTEX_INPUT_SPLITS_ALL_READY_NODE =
- "/_vertexInputSplitsAllReady";
- /** Denotes that all the vertex input splits are done. */
- public static final String VERTEX_INPUT_SPLITS_ALL_DONE_NODE =
- "/_vertexInputSplitsAllDone";
-
- /** Edge input split directory about base dir */
- public static final String EDGE_INPUT_SPLIT_DIR = "/_edgeInputSplitDir";
- /** Edge input split done directory about base dir */
- public static final String EDGE_INPUT_SPLIT_DONE_DIR =
- "/_edgeInputSplitDoneDir";
- /** Denotes a reserved edge input split */
- public static final String EDGE_INPUT_SPLIT_RESERVED_NODE =
- "/_edgeInputSplitReserved";
- /** Denotes a finished edge input split */
- public static final String EDGE_INPUT_SPLIT_FINISHED_NODE =
- "/_edgeInputSplitFinished";
- /** Denotes that all the edge input splits are are ready for consumption */
- public static final String EDGE_INPUT_SPLITS_ALL_READY_NODE =
- "/_edgeInputSplitsAllReady";
- /** Denotes that all the edge input splits are done. */
- public static final String EDGE_INPUT_SPLITS_ALL_DONE_NODE =
- "/_edgeInputSplitsAllDone";
+ /** Input splits worker done directory */
+ public static final String INPUT_SPLITS_WORKER_DONE_DIR =
+ "/_inputSplitsWorkerDoneDir";
+ /** Input splits all done node */
+ public static final String INPUT_SPLITS_ALL_DONE_NODE =
+ "/_inputSplitsAllDone";
+
/** Directory of attempts of this application */
public static final String APPLICATION_ATTEMPTS_DIR =
"/_applicationAttemptsDir";
@@ -192,18 +144,10 @@ public abstract class BspService<I extends WritableComparable,
protected final String basePath;
/** Path to the job state determined by the master (informative only) */
protected final String masterJobStatePath;
- /** ZooKeeper paths for mapping input splits. */
- protected final InputSplitPaths mappingInputSplitsPaths;
- /** ZooKeeper paths for vertex input splits. */
- protected final InputSplitPaths vertexInputSplitsPaths;
- /** ZooKeeper paths for edge input splits. */
- protected final InputSplitPaths edgeInputSplitsPaths;
- /** Mapping input splits events */
- protected final InputSplitEvents mappingInputSplitsEvents;
- /** Vertex input split events. */
- protected final InputSplitEvents vertexInputSplitsEvents;
- /** Edge input split events. */
- protected final InputSplitEvents edgeInputSplitsEvents;
+ /** Input splits worker done directory */
+ protected final String inputSplitsWorkerDonePath;
+ /** Input splits all done node */
+ protected final String inputSplitsAllDonePath;
/** Path to the application attempts) */
protected final String applicationAttemptsPath;
/** Path to the cleaned up notifications */
@@ -226,6 +170,10 @@ public abstract class BspService<I extends WritableComparable,
private final BspEvent addressesAndPartitionsReadyChanged;
/** Application attempt changed */
private final BspEvent applicationAttemptChanged;
+ /** Input splits worker done */
+ private final BspEvent inputSplitsWorkerDoneEvent;
+ /** Input splits all done */
+ private final BspEvent inputSplitsAllDoneEvent;
/** Superstep finished synchronization */
private final BspEvent superstepFinished;
/** Master election changed for any waited on attempt */
@@ -269,23 +217,20 @@ public abstract class BspService<I extends WritableComparable,
public BspService(
Mapper<?, ?, ?, ?>.Context context,
GraphTaskManager<I, V, E> graphTaskManager) {
- this.mappingInputSplitsEvents = new InputSplitEvents(context);
- this.vertexInputSplitsEvents = new InputSplitEvents(context);
- this.edgeInputSplitsEvents = new InputSplitEvents(context);
this.connectedEvent = new PredicateLock(context);
this.workerHealthRegistrationChanged = new PredicateLock(context);
this.addressesAndPartitionsReadyChanged = new PredicateLock(context);
this.applicationAttemptChanged = new PredicateLock(context);
+ this.inputSplitsWorkerDoneEvent = new PredicateLock(context);
+ this.inputSplitsAllDoneEvent = new PredicateLock(context);
this.superstepFinished = new PredicateLock(context);
this.masterElectionChildrenChanged = new PredicateLock(context);
this.cleanedUpChildrenChanged = new PredicateLock(context);
registerBspEvent(connectedEvent);
registerBspEvent(workerHealthRegistrationChanged);
- registerBspEvent(vertexInputSplitsEvents.getAllReadyChanged());
- registerBspEvent(vertexInputSplitsEvents.getStateChanged());
- registerBspEvent(edgeInputSplitsEvents.getAllReadyChanged());
- registerBspEvent(edgeInputSplitsEvents.getStateChanged());
+ registerBspEvent(inputSplitsWorkerDoneEvent);
+ registerBspEvent(inputSplitsAllDoneEvent);
registerBspEvent(addressesAndPartitionsReadyChanged);
registerBspEvent(applicationAttemptChanged);
registerBspEvent(superstepFinished);
@@ -311,16 +256,8 @@ public abstract class BspService<I extends WritableComparable,
getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
basePath);
masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
- mappingInputSplitsPaths = new InputSplitPaths(basePath,
- MAPPING_INPUT_SPLIT_DIR, MAPPING_INPUT_SPLIT_DONE_DIR,
- MAPPING_INPUT_SPLITS_ALL_READY_NODE,
- MAPPING_INPUT_SPLITS_ALL_DONE_NODE);
- vertexInputSplitsPaths = new InputSplitPaths(basePath,
- VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR,
- VERTEX_INPUT_SPLITS_ALL_READY_NODE, VERTEX_INPUT_SPLITS_ALL_DONE_NODE);
- edgeInputSplitsPaths = new InputSplitPaths(basePath,
- EDGE_INPUT_SPLIT_DIR, EDGE_INPUT_SPLIT_DONE_DIR,
- EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE);
+ inputSplitsWorkerDonePath = basePath + INPUT_SPLITS_WORKER_DONE_DIR;
+ inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
cleanedUpPath = basePath + CLEANED_UP_DIR;
@@ -433,24 +370,6 @@ public abstract class BspService<I extends WritableComparable,
}
/**
- * Get the input split events for edge input.
- *
- * @return InputSplitEvents for edge input.
- */
- public InputSplitEvents getEdgeInputSplitsEvents() {
- return edgeInputSplitsEvents;
- }
-
- /**
- * Get the input split events for vertex input.
- *
- * @return InputSplitEvents for vertex input.
- */
- public InputSplitEvents getVertexInputSplitsEvents() {
- return vertexInputSplitsEvents;
- }
-
- /**
* Generate the worker information "healthy" directory path for a
* superstep
*
@@ -655,6 +574,14 @@ public abstract class BspService<I extends WritableComparable,
return applicationAttemptChanged;
}
+ public final BspEvent getInputSplitsWorkerDoneEvent() {
+ return inputSplitsWorkerDoneEvent;
+ }
+
+ public final BspEvent getInputSplitsAllDoneEvent() {
+ return inputSplitsAllDoneEvent;
+ }
+
public final BspEvent getSuperstepFinishedEvent() {
return superstepFinished;
}
@@ -952,9 +879,20 @@ public abstract class BspService<I extends WritableComparable,
}
workerHealthRegistrationChanged.signal();
eventProcessed = true;
- } else if (processMappingEvent(event) || processVertexEvent(event) ||
- processEdgeEvent(event)) {
- return;
+ } else if (event.getPath().contains(INPUT_SPLITS_ALL_DONE_NODE) &&
+ event.getType() == EventType.NodeCreated) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("process: all input splits done");
+ }
+ inputSplitsAllDoneEvent.signal();
+ eventProcessed = true;
+ } else if (event.getPath().contains(INPUT_SPLITS_WORKER_DONE_DIR) &&
+ event.getType() == EventType.NodeChildrenChanged) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("process: worker done reading input splits");
+ }
+ inputSplitsWorkerDoneEvent.signal();
+ eventProcessed = true;
} else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
event.getType() == EventType.NodeCreated) {
if (LOG.isInfoEnabled()) {
@@ -1001,192 +939,6 @@ public abstract class BspService<I extends WritableComparable,
}
/**
- * Process WatchedEvent for Mapping Inputsplits
- *
- * @param event watched event
- * @return true if event processed
- */
- public final boolean processMappingEvent(WatchedEvent event) {
- boolean eventProcessed = false;
- if (event.getPath().equals(
- mappingInputSplitsPaths.getAllReadyPath()) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: inputSplitsReadyChanged " +
- "(input splits ready)");
- }
- mappingInputSplitsEvents.getAllReadyChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: mappingInputSplitsStateChanged " +
- "(made a reservation)");
- }
- mappingInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
- (event.getType() == EventType.NodeDeleted)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: mappingInputSplitsStateChanged " +
- "(lost a reservation)");
- }
- mappingInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_FINISHED_NODE) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: mappingInputSplitsStateChanged " +
- "(finished inputsplit)");
- }
- mappingInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_DONE_DIR) &&
- (event.getType() == EventType.NodeChildrenChanged)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: mappingInputSplitsDoneStateChanged " +
- "(worker finished sending)");
- }
- mappingInputSplitsEvents.getDoneStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().equals(
- mappingInputSplitsPaths.getAllDonePath()) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: mappingInputSplitsAllDoneChanged " +
- "(all entries sent from input splits)");
- }
- mappingInputSplitsEvents.getAllDoneChanged().signal();
- eventProcessed = true;
- }
- return eventProcessed;
- }
-
- /**
- * Process WatchedEvent for Vertex Inputsplits
- *
- * @param event watched event
- * @return true if event processed
- */
- public final boolean processVertexEvent(WatchedEvent event) {
- boolean eventProcessed = false;
- if (event.getPath().equals(
- vertexInputSplitsPaths.getAllReadyPath()) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: inputSplitsReadyChanged " +
- "(input splits ready)");
- }
- vertexInputSplitsEvents.getAllReadyChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: vertexInputSplitsStateChanged " +
- "(made a reservation)");
- }
- vertexInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) &&
- (event.getType() == EventType.NodeDeleted)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: vertexInputSplitsStateChanged " +
- "(lost a reservation)");
- }
- vertexInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_FINISHED_NODE) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: vertexInputSplitsStateChanged " +
- "(finished inputsplit)");
- }
- vertexInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_DONE_DIR) &&
- (event.getType() == EventType.NodeChildrenChanged)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: vertexInputSplitsDoneStateChanged " +
- "(worker finished sending)");
- }
- vertexInputSplitsEvents.getDoneStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().equals(
- vertexInputSplitsPaths.getAllDonePath()) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: vertexInputSplitsAllDoneChanged " +
- "(all vertices sent from input splits)");
- }
- vertexInputSplitsEvents.getAllDoneChanged().signal();
- eventProcessed = true;
- }
- return eventProcessed;
- }
-
- /**
- * Process WatchedEvent for Edge Inputsplits
- *
- * @param event watched event
- * @return true if event processed
- */
- public final boolean processEdgeEvent(WatchedEvent event) {
- boolean eventProcessed = false;
- if (event.getPath().equals(
- edgeInputSplitsPaths.getAllReadyPath()) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: edgeInputSplitsReadyChanged " +
- "(input splits ready)");
- }
- edgeInputSplitsEvents.getAllReadyChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: edgeInputSplitsStateChanged " +
- "(made a reservation)");
- }
- edgeInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) &&
- (event.getType() == EventType.NodeDeleted)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: edgeInputSplitsStateChanged " +
- "(lost a reservation)");
- }
- edgeInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_FINISHED_NODE) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: edgeInputSplitsStateChanged " +
- "(finished inputsplit)");
- }
- edgeInputSplitsEvents.getStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_DONE_DIR) &&
- (event.getType() == EventType.NodeChildrenChanged)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("process: edgeInputSplitsDoneStateChanged " +
- "(worker finished sending)");
- }
- edgeInputSplitsEvents.getDoneStateChanged().signal();
- eventProcessed = true;
- } else if (event.getPath().equals(
- edgeInputSplitsPaths.getAllDonePath()) &&
- (event.getType() == EventType.NodeCreated)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: edgeInputSplitsAllDoneChanged " +
- "(all edges sent from input splits)");
- }
- edgeInputSplitsEvents.getAllDoneChanged().signal();
- eventProcessed = true;
- }
- return eventProcessed;
- }
-
- /**
* Get the last saved superstep.
*
* @return Last good superstep number
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index 1e8d519..f05a79d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -22,8 +22,8 @@ import java.io.IOException;
import java.util.List;
import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
-import org.apache.giraph.master.MasterAggregatorHandler;
import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.master.MasterGlobalCommHandler;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
@@ -144,7 +144,7 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
*
* @return Global communication handler
*/
- MasterAggregatorHandler getGlobalCommHandler();
+ MasterGlobalCommHandler getGlobalCommHandler();
/**
* Handler for aggregators to reduce/broadcast translation
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index f6d77d0..94cd265 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -30,6 +30,7 @@ import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.worker.WorkerInputSplitsHandler;
import org.apache.giraph.worker.WorkerAggregatorHandler;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerInfo;
@@ -252,4 +253,11 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
* @return number of partitions owned
*/
int getNumPartitionsOwned();
+
+ /**
+ * Get input splits handler used during input
+ *
+ * @return Input splits handler
+ */
+ WorkerInputSplitsHandler getInputSplitsHandler();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
index aea93fd..244dd74 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
@@ -20,6 +20,7 @@ package org.apache.giraph.comm;
import java.io.IOException;
+import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.hadoop.io.Writable;
/**
@@ -54,6 +55,14 @@ public interface MasterClient {
void flush();
/**
+ * Send a request to a remote server (should be already connected)
+ *
+ * @param destTaskId Destination worker id
+ * @param request Request to send
+ */
+ void sendWritableRequest(int destTaskId, WritableRequest request);
+
+ /**
* Closes all connections.
*/
void closeConnections();
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index e110782..9b348e8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -26,6 +26,7 @@ import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
@@ -117,6 +118,11 @@ public class NettyMasterClient implements MasterClient {
}
@Override
+ public void sendWritableRequest(int destTaskId, WritableRequest request) {
+ nettyClient.sendWritableRequest(destTaskId, request);
+ }
+
+ @Override
public void closeConnections() {
nettyClient.stop();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
index 02c72f7..9aa88ae 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
@@ -21,13 +21,13 @@ package org.apache.giraph.comm.netty.handler;
import org.apache.giraph.comm.requests.MasterRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.master.MasterGlobalCommHandler;
/** Handler for requests on master */
public class MasterRequestServerHandler extends
RequestServerHandler<MasterRequest> {
/** Aggregator handler */
- private final MasterAggregatorHandler aggregatorHandler;
+ private final MasterGlobalCommHandler commHandler;
/**
* Constructor
@@ -35,22 +35,22 @@ public class MasterRequestServerHandler extends
* @param workerRequestReservedMap Worker request reservation map
* @param conf Configuration
* @param myTaskInfo Current task info
- * @param aggregatorHandler Master aggregator handler
+ * @param commHandler Master communication handler
* @param exceptionHandler Handles uncaught exceptions
*/
public MasterRequestServerHandler(
WorkerRequestReservedMap workerRequestReservedMap,
ImmutableClassesGiraphConfiguration conf,
TaskInfo myTaskInfo,
- MasterAggregatorHandler aggregatorHandler,
+ MasterGlobalCommHandler commHandler,
Thread.UncaughtExceptionHandler exceptionHandler) {
super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
- this.aggregatorHandler = aggregatorHandler;
+ this.commHandler = commHandler;
}
@Override
public void processRequest(MasterRequest request) {
- request.doRequest(aggregatorHandler);
+ request.doRequest(commHandler);
}
/**
@@ -58,15 +58,15 @@ public class MasterRequestServerHandler extends
*/
public static class Factory implements RequestServerHandler.Factory {
/** Master aggregator handler */
- private final MasterAggregatorHandler aggregatorHandler;
+ private final MasterGlobalCommHandler commHandler;
/**
* Constructor
*
- * @param aggregatorHandler Master aggregator handler
+ * @param commHandler Master global communication handler
*/
- public Factory(MasterAggregatorHandler aggregatorHandler) {
- this.aggregatorHandler = aggregatorHandler;
+ public Factory(MasterGlobalCommHandler commHandler) {
+ this.commHandler = commHandler;
}
@Override
@@ -76,7 +76,7 @@ public class MasterRequestServerHandler extends
TaskInfo myTaskInfo,
Thread.UncaughtExceptionHandler exceptionHandler) {
return new MasterRequestServerHandler(workerRequestReservedMap, conf,
- myTaskInfo, aggregatorHandler, exceptionHandler);
+ myTaskInfo, commHandler, exceptionHandler);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
new file mode 100644
index 0000000..5d9e4e6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.master.MasterGlobalCommHandler;
+import org.apache.giraph.io.InputType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A request which workers will send to master to ask it to give them splits
+ */
+public class AskForInputSplitRequest extends WritableRequest
+ implements MasterRequest {
+ /** Type of split we are requesting */
+ private InputType splitType;
+ /** Task id of worker which requested the split */
+ private int workerTaskId;
+
+ /**
+ * Constructor
+ *
+ * @param splitType Type of split we are requesting
+ * @param workerTaskId Task id of worker which requested the split
+ */
+ public AskForInputSplitRequest(InputType splitType, int workerTaskId) {
+ this.splitType = splitType;
+ this.workerTaskId = workerTaskId;
+ }
+
+ /**
+ * Constructor used for reflection only
+ */
+ public AskForInputSplitRequest() {
+ }
+
+ @Override
+ public void doRequest(MasterGlobalCommHandler commHandler) {
+ commHandler.getInputSplitsHandler().sendSplitTo(splitType, workerTaskId);
+ }
+
+ @Override
+ void readFieldsRequest(DataInput in) throws IOException {
+ splitType = InputType.values()[in.readInt()];
+ workerTaskId = in.readInt();
+ }
+
+ @Override
+ void writeRequest(DataOutput out) throws IOException {
+ out.writeInt(splitType.ordinal());
+ out.writeInt(workerTaskId);
+ }
+
+ @Override
+ public RequestType getType() {
+ return RequestType.ASK_FOR_INPUT_SPLIT_REQUEST;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
index 7fedcc5..43632b0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
@@ -18,7 +18,7 @@
package org.apache.giraph.comm.requests;
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.master.MasterGlobalCommHandler;
/**
* Interface for requests sent to master to extend
@@ -27,7 +27,7 @@ public interface MasterRequest {
/**
* Execute the request
*
- * @param aggregatorHandler Master aggregator handler
+ * @param commHandler Master communication handler
*/
- void doRequest(MasterAggregatorHandler aggregatorHandler);
+ void doRequest(MasterGlobalCommHandler commHandler);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java
new file mode 100644
index 0000000..6b50562
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.io.InputType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A request which master will send to workers to give them splits
+ */
+public class ReplyWithInputSplitRequest extends WritableRequest
+ implements WorkerRequest {
+ /** Type of input split */
+ private InputType splitType;
+ /** Serialized input split */
+ private byte[] serializedInputSplit;
+
+ /**
+ * Constructor
+ *
+ * @param splitType Type of input split
+ * @param serializedInputSplit Serialized input split
+ */
+ public ReplyWithInputSplitRequest(InputType splitType,
+ byte[] serializedInputSplit) {
+ this.splitType = splitType;
+ this.serializedInputSplit = serializedInputSplit;
+ }
+
+ /**
+ * Constructor used for reflection only
+ */
+ public ReplyWithInputSplitRequest() {
+ }
+
+ @Override
+ void readFieldsRequest(DataInput in) throws IOException {
+ splitType = InputType.values()[in.readInt()];
+ int size = in.readInt();
+ serializedInputSplit = new byte[size];
+ in.readFully(serializedInputSplit);
+ }
+
+ @Override
+ void writeRequest(DataOutput out) throws IOException {
+ out.writeInt(splitType.ordinal());
+ out.writeInt(serializedInputSplit.length);
+ out.write(serializedInputSplit);
+ }
+
+ @Override
+ public void doRequest(ServerData serverData) {
+ serverData.getServiceWorker().getInputSplitsHandler().receivedInputSplit(
+ splitType, serializedInputSplit);
+ }
+
+ @Override
+ public RequestType getType() {
+ return RequestType.REPLY_WITH_INPUT_SPLIT_REQUEST;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index 343a2de..bebac28 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -60,7 +60,11 @@ else[HADOOP_NON_SECURE]*/
/** Send aggregators from worker owner to other workers */
SEND_AGGREGATORS_TO_WORKER_REQUEST(SendAggregatorsToWorkerRequest.class),
/** Send message from worker to worker */
- SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class);
+ SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class),
+ /** Send request for input split from worker to master */
+ ASK_FOR_INPUT_SPLIT_REQUEST(AskForInputSplitRequest.class),
+ /** Send request with granted input split from master to workers */
+ REPLY_WITH_INPUT_SPLIT_REQUEST(ReplyWithInputSplitRequest.class);
/** Class of request which this type corresponds to */
private final Class<? extends WritableRequest> requestClass;
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
index 7171f04..3a1bd64 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
@@ -20,7 +20,7 @@ package org.apache.giraph.comm.requests;
import java.io.IOException;
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.master.MasterGlobalCommHandler;
/**
* Request to send final aggregated values from worker which owns
@@ -45,9 +45,9 @@ public class SendReducedToMasterRequest extends ByteArrayRequest
}
@Override
- public void doRequest(MasterAggregatorHandler aggregatorHandler) {
+ public void doRequest(MasterGlobalCommHandler commHandler) {
try {
- aggregatorHandler.acceptReducedValues(getDataInput());
+ commHandler.getAggregatorHandler().acceptReducedValues(getDataInput());
} catch (IOException e) {
throw new IllegalStateException("doRequest: " +
"IOException occurred while processing request", e);
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
index cfb9799..c53b34f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
@@ -51,7 +51,7 @@ public class FinishedSuperstepStats extends VertexEdgeCount {
long numEdges,
boolean mustLoadCheckpoint,
CheckpointStatus checkpointStatus) {
- super(numVertices, numEdges);
+ super(numVertices, numEdges, 0);
this.localVertexCount = numLocalVertices;
this.allVerticesHalted = allVerticesHalted;
this.mustLoadCheckpoint = mustLoadCheckpoint;
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
deleted file mode 100644
index 23be1c4..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
-import org.apache.hadoop.util.Progressable;
-
-/**
- * Simple container of input split events.
- */
-public class InputSplitEvents {
- /** Input splits are ready for consumption by workers */
- private final BspEvent allReadyChanged;
- /** Input split reservation or finished notification and synchronization */
- private final BspEvent stateChanged;
- /** Input splits are done being processed by workers */
- private final BspEvent allDoneChanged;
- /** Input split done by a worker finished notification and synchronization */
- private final BspEvent doneStateChanged;
-
- /**
- * Constructor.
- *
- * @param progressable {@link Progressable} to report progress
- */
- public InputSplitEvents(Progressable progressable) {
- allReadyChanged = new PredicateLock(progressable);
- stateChanged = new PredicateLock(progressable);
- allDoneChanged = new PredicateLock(progressable);
- doneStateChanged = new PredicateLock(progressable);
- }
-
- /**
- * Get event for input splits all ready
- *
- * @return {@link BspEvent} for input splits all ready
- */
- public BspEvent getAllReadyChanged() {
- return allReadyChanged;
- }
-
- /**
- * Get event for input splits state
- *
- * @return {@link BspEvent} for input splits state
- */
- public BspEvent getStateChanged() {
- return stateChanged;
- }
-
- /**
- * Get event for input splits all done
- *
- * @return {@link BspEvent} for input splits all done
- */
- public BspEvent getAllDoneChanged() {
- return allDoneChanged;
- }
-
- /**
- * Get event for input split done
- *
- * @return {@link BspEvent} for input split done
- */
- public BspEvent getDoneStateChanged() {
- return doneStateChanged;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
deleted file mode 100644
index 4cf005e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-/**
- * Simple container of input split paths for coordination via ZooKeeper.
- */
-public class InputSplitPaths {
- /** Path to the input splits written by the master */
- private final String path;
- /** Path to the input splits all ready to be processed by workers */
- private final String allReadyPath;
- /** Path to the input splits done */
- private final String donePath;
- /** Path to the input splits all done to notify the workers to proceed */
- private final String allDonePath;
-
- /**
- * Constructor.
- *
- * @param basePath Base path
- * @param dir Input splits path
- * @param doneDir Input split done path
- * @param allReadyNode Input splits all ready path
- * @param allDoneNode Input splits all done path
- */
- public InputSplitPaths(String basePath,
- String dir,
- String doneDir,
- String allReadyNode,
- String allDoneNode) {
- path = basePath + dir;
- allReadyPath = basePath + allReadyNode;
- donePath = basePath + doneDir;
- allDonePath = basePath + allDoneNode;
- }
-
- /**
- * Get path to the input splits.
- *
- * @return Path to input splits
- */
- public String getPath() {
- return path;
- }
-
- /**
- * Get path to the input splits all ready.
- *
- * @return Path to input splits all ready
- */
- public String getAllReadyPath() {
- return allReadyPath;
- }
-
- /** Get path to the input splits done.
- *
- * @return Path to input splits done
- */
- public String getDonePath() {
- return donePath;
- }
-
- /**
- * Get path to the input splits all done.
- *
- * @return Path to input splits all done
- */
- public String getAllDonePath() {
- return allDonePath;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
index c2d13cc..1c871f0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
@@ -26,6 +26,8 @@ public class VertexEdgeCount {
private final long vertexCount;
/** Immutable edges */
private final long edgeCount;
+ /** Immutable mappings */
+ private final long mappingCount;
/**
* Default constructor.
@@ -33,6 +35,7 @@ public class VertexEdgeCount {
public VertexEdgeCount() {
vertexCount = 0;
edgeCount = 0;
+ mappingCount = 0;
}
/**
@@ -40,10 +43,12 @@ public class VertexEdgeCount {
*
* @param vertexCount Final number of vertices.
* @param edgeCount Final number of edges.
+ * @param mappingCount Final number of mappings.
*/
- public VertexEdgeCount(long vertexCount, long edgeCount) {
+ public VertexEdgeCount(long vertexCount, long edgeCount, long mappingCount) {
this.vertexCount = vertexCount;
this.edgeCount = edgeCount;
+ this.mappingCount = mappingCount;
}
public long getVertexCount() {
@@ -54,6 +59,10 @@ public class VertexEdgeCount {
return edgeCount;
}
+ public long getMappingCount() {
+ return mappingCount;
+ }
+
/**
* Increment the both the vertex edge count with a {@link VertexEdgeCount}.
*
@@ -64,7 +73,8 @@ public class VertexEdgeCount {
VertexEdgeCount vertexEdgeCount) {
return new VertexEdgeCount(
vertexCount + vertexEdgeCount.getVertexCount(),
- edgeCount + vertexEdgeCount.getEdgeCount());
+ edgeCount + vertexEdgeCount.getEdgeCount(),
+ mappingCount + vertexEdgeCount.getMappingCount());
}
/**
@@ -78,11 +88,13 @@ public class VertexEdgeCount {
long vertexCount, long edgeCount) {
return new VertexEdgeCount(
this.vertexCount + vertexCount,
- this.edgeCount + edgeCount);
+ this.edgeCount + edgeCount,
+ this.mappingCount); // no mapping delta in this overload; keep as-is
}
@Override
public String toString() {
- return "(v=" + getVertexCount() + ", e=" + getEdgeCount() + ")";
+ return "(v=" + getVertexCount() + ", e=" + getEdgeCount() +
+ (mappingCount > 0 ? ", m=" + mappingCount : "") + ")";
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/io/InputType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/InputType.java b/giraph-core/src/main/java/org/apache/giraph/io/InputType.java
new file mode 100644
index 0000000..26ee966
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/InputType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+/**
+ * Type of graph input (vertex, edge or mapping), e.g. to tag input splits
+ */
+public enum InputType {
+ /** Vertex input */
+ VERTEX,
+ /** Edge input */
+ EDGE,
+ /** Mapping input */
+ MAPPING
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 0b56a4f..0e7bb9d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -21,12 +21,8 @@ package org.apache.giraph.master;
import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
-import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.Charset;
@@ -38,9 +34,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import net.iharder.Base64;
@@ -66,12 +59,12 @@ import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.GraphFunctions;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.io.MappingInputFormat;
import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.InputType;
+import org.apache.giraph.master.input.MasterInputSplitsHandler;
import org.apache.giraph.metrics.AggregatedMetrics;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphTimer;
@@ -88,8 +81,6 @@ import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.utils.CheckpointingUtils;
import org.apache.giraph.utils.JMapHistoDumper;
-import org.apache.giraph.utils.LogStacktraceCallable;
-import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReactiveJMapHistoDumper;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.WritableUtils;
@@ -99,7 +90,6 @@ import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobID;
@@ -170,7 +160,7 @@ public class BspServiceMaster<I extends WritableComparable,
private final List<PartitionStats> allPartitionStatsList =
new ArrayList<PartitionStats>();
/** Handler for global communication */
- private MasterAggregatorHandler globalCommHandler;
+ private MasterGlobalCommHandler globalCommHandler;
/** Handler for aggregators to reduce/broadcast translation */
private AggregatorToGlobalCommTranslation aggregatorTranslation;
/** Master class */
@@ -331,7 +321,7 @@ public class BspServiceMaster<I extends WritableComparable,
*/
private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
int minSplitCountHint,
- String inputSplitType) {
+ InputType inputSplitType) {
String logPrefix = "generate" + inputSplitType + "InputSplits";
List<InputSplit> splits;
try {
@@ -604,46 +594,25 @@ public class BspServiceMaster<I extends WritableComparable,
* Common method for creating vertex/edge input splits.
*
* @param inputFormat The vertex/edge input format
- * @param inputSplitPaths ZooKeeper input split paths
* @param inputSplitType Type of input split (for logging purposes)
* @return Number of splits. Returns -1 on failure to create
* valid input splits.
*/
private int createInputSplits(GiraphInputFormat inputFormat,
- InputSplitPaths inputSplitPaths,
- String inputSplitType) {
+ InputType inputSplitType) {
ImmutableClassesGiraphConfiguration conf = getConfiguration();
String logPrefix = "create" + inputSplitType + "InputSplits";
// Only the 'master' should be doing this. Wait until the number of
// processes that have reported health exceeds the minimum percentage.
// If the minimum percentage is not met, fail the job. Otherwise
// generate the input splits
- String inputSplitsPath = inputSplitPaths.getPath();
- try {
- if (getZkExt().exists(inputSplitsPath, false) != null) {
- LOG.info(inputSplitsPath + " already exists, no need to create");
- return Integer.parseInt(
- new String(getZkExt().getData(inputSplitsPath, false, null),
- Charset.defaultCharset()));
- }
- } catch (KeeperException.NoNodeException e) {
- if (LOG.isInfoEnabled()) {
- LOG.info(logPrefix + ": Need to create the input splits at " +
- inputSplitsPath);
- }
- } catch (KeeperException e) {
- throw new IllegalStateException(logPrefix + ": KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(logPrefix + ": InterruptedException", e);
- }
-
- // When creating znodes, in case the master has already run, resume
- // where it left off.
List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
if (healthyWorkerInfoList == null) {
setJobStateFailed("Not enough healthy workers to create input splits");
return -1;
}
+ globalCommHandler.getInputSplitsHandler().initialize(masterClient,
+ healthyWorkerInfoList);
// Create at least as many splits as the total number of input threads.
int minSplitCountHint = healthyWorkerInfoList.size() *
@@ -671,54 +640,8 @@ public class BspServiceMaster<I extends WritableComparable,
"some threads will be not used");
}
- // Write input splits to zookeeper in parallel
- int inputSplitThreadCount = conf.getInt(NUM_MASTER_ZK_INPUT_SPLIT_THREADS,
- DEFAULT_INPUT_SPLIT_THREAD_COUNT);
- if (LOG.isInfoEnabled()) {
- LOG.info(logPrefix + ": Starting to write input split data " +
- "to zookeeper with " + inputSplitThreadCount + " threads");
- }
- try {
- getZkExt().createExt(inputSplitsPath, null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- false);
- } catch (KeeperException e) {
- LOG.info(logPrefix + ": Node " +
- inputSplitsPath + " keeper exception " + e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(logPrefix + ' ' + e.getMessage(), e);
- }
- ExecutorService taskExecutor =
- Executors.newFixedThreadPool(inputSplitThreadCount);
- boolean writeLocations = USE_INPUT_SPLIT_LOCALITY.get(conf);
- for (int i = 0; i < splitList.size(); ++i) {
- InputSplit inputSplit = splitList.get(i);
- taskExecutor.submit(new LogStacktraceCallable<Void>(
- new WriteInputSplit(inputFormat, inputSplit, inputSplitsPath, i,
- writeLocations)));
- }
- taskExecutor.shutdown();
- ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
- if (LOG.isInfoEnabled()) {
- LOG.info(logPrefix + ": Done writing input split data to zookeeper");
- }
-
- // Let workers know they can start trying to load the input splits
- try {
- getZkExt().createExt(inputSplitPaths.getAllReadyPath(),
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- false);
- } catch (KeeperException.NodeExistsException e) {
- LOG.info(logPrefix + ": Node " +
- inputSplitPaths.getAllReadyPath() + " already exists.");
- } catch (KeeperException e) {
- throw new IllegalStateException(logPrefix + ": KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(logPrefix + ": IllegalStateException", e);
- }
+ globalCommHandler.getInputSplitsHandler().addSplits(inputSplitType,
+ splitList, inputFormat);
return splitList.size();
}
@@ -730,8 +653,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat =
getConfiguration().createWrappedMappingInputFormat();
- return createInputSplits(mappingInputFormat, mappingInputSplitsPaths,
- "Mapping");
+ return createInputSplits(mappingInputFormat, InputType.MAPPING);
}
@Override
@@ -742,8 +664,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
VertexInputFormat<I, V, E> vertexInputFormat =
getConfiguration().createWrappedVertexInputFormat();
- return createInputSplits(vertexInputFormat, vertexInputSplitsPaths,
- "Vertex");
+ return createInputSplits(vertexInputFormat, InputType.VERTEX);
}
@Override
@@ -754,8 +675,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
EdgeInputFormat<I, E> edgeInputFormat =
getConfiguration().createWrappedEdgeInputFormat();
- return createInputSplits(edgeInputFormat, edgeInputSplitsPaths,
- "Edge");
+ return createInputSplits(edgeInputFormat, InputType.EDGE);
}
@Override
@@ -764,7 +684,7 @@ public class BspServiceMaster<I extends WritableComparable,
}
@Override
- public MasterAggregatorHandler getGlobalCommHandler() {
+ public MasterGlobalCommHandler getGlobalCommHandler() {
return globalCommHandler;
}
@@ -838,7 +758,7 @@ public class BspServiceMaster<I extends WritableComparable,
});
- globalCommHandler.readFields(finalizedStream);
+ globalCommHandler.getAggregatorHandler().readFields(finalizedStream);
aggregatorTranslation.readFields(finalizedStream);
masterCompute.readFields(finalizedStream);
finalizedStream.close();
@@ -911,12 +831,15 @@ public class BspServiceMaster<I extends WritableComparable,
if (masterChildArr.get(0).equals(myBid)) {
GiraphStats.getInstance().getCurrentMasterTaskPartition().
setValue(getTaskPartition());
- globalCommHandler = new MasterAggregatorHandler(
- getConfiguration(), getContext());
+
+ globalCommHandler = new MasterGlobalCommHandler(
+ new MasterAggregatorHandler(getConfiguration(), getContext()),
+ new MasterInputSplitsHandler(
+ getConfiguration().useInputSplitLocality()));
aggregatorTranslation = new AggregatorToGlobalCommTranslation(
getConfiguration(), globalCommHandler);
- globalCommHandler.initialize(this);
+ globalCommHandler.getAggregatorHandler().initialize(this);
masterCompute = getConfiguration().createMasterCompute();
masterCompute.setMasterService(this);
@@ -1128,7 +1051,7 @@ public class BspServiceMaster<I extends WritableComparable,
for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo));
}
- globalCommHandler.write(finalizedOutputStream);
+ globalCommHandler.getAggregatorHandler().write(finalizedOutputStream);
aggregatorTranslation.write(finalizedOutputStream);
masterCompute.write(finalizedOutputStream);
finalizedOutputStream.close();
@@ -1265,12 +1188,8 @@ public class BspServiceMaster<I extends WritableComparable,
@Override
public void restartFromCheckpoint(long checkpoint) {
// Process:
- // 1. Remove all old input split data
- // 2. Increase the application attempt and set to the correct checkpoint
- // 3. Send command to all workers to restart their tasks
- zkDeleteNode(vertexInputSplitsPaths.getPath());
- zkDeleteNode(edgeInputSplitsPaths.getPath());
-
+ // 1. Increase the application attempt and set to the correct checkpoint
+ // 2. Send command to all workers to restart their tasks
setApplicationAttempt(getApplicationAttempt() + 1);
setCachedSuperstep(checkpoint);
setRestartedSuperstep(checkpoint);
@@ -1493,37 +1412,32 @@ public class BspServiceMaster<I extends WritableComparable,
/**
* Coordinate the exchange of vertex/edge input splits among workers.
- *
- * @param inputSplitPaths Input split paths
- * @param inputSplitEvents Input split events
- * @param inputSplitsType Type of input splits (for logging purposes)
*/
- private void coordinateInputSplits(InputSplitPaths inputSplitPaths,
- InputSplitEvents inputSplitEvents,
- String inputSplitsType) {
+ private void coordinateInputSplits() {
// Coordinate the workers finishing sending their vertices/edges to the
// correct workers and signal when everything is done.
- String logPrefix = "coordinate" + inputSplitsType + "InputSplits";
- if (!barrierOnWorkerList(inputSplitPaths.getDonePath(),
+ if (!barrierOnWorkerList(inputSplitsWorkerDonePath,
chosenWorkerInfoList,
- inputSplitEvents.getDoneStateChanged(),
+ getInputSplitsWorkerDoneEvent(),
false)) {
- throw new IllegalStateException(logPrefix + ": Worker failed during " +
- "input split (currently not supported)");
+ throw new IllegalStateException("coordinateInputSplits: Worker failed " +
+ "during input split (currently not supported)");
}
try {
- getZkExt().createExt(inputSplitPaths.getAllDonePath(),
+ getZkExt().createExt(inputSplitsAllDonePath,
null,
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
false);
} catch (KeeperException.NodeExistsException e) {
LOG.info("coordinateInputSplits: Node " +
- inputSplitPaths.getAllDonePath() + " already exists.");
+ inputSplitsAllDonePath + " already exists.");
} catch (KeeperException e) {
- throw new IllegalStateException(logPrefix + ": KeeperException", e);
+ throw new IllegalStateException(
+ "coordinateInputSplits: KeeperException", e);
} catch (InterruptedException e) {
- throw new IllegalStateException(logPrefix + ": IllegalStateException", e);
+ throw new IllegalStateException(
+ "coordinateInputSplits: InterruptedException", e);
}
}
@@ -1543,7 +1457,7 @@ public class BspServiceMaster<I extends WritableComparable,
*/
private void initializeAggregatorInputSuperstep()
throws InterruptedException {
- globalCommHandler.prepareSuperstep();
+ globalCommHandler.getAggregatorHandler().prepareSuperstep();
prepareMasterCompute(getSuperstep());
try {
@@ -1559,9 +1473,9 @@ public class BspServiceMaster<I extends WritableComparable,
"initializeAggregatorInputSuperstep: Failed in access", e);
}
aggregatorTranslation.postMasterCompute();
- globalCommHandler.finishSuperstep();
+ globalCommHandler.getAggregatorHandler().finishSuperstep();
- globalCommHandler.sendDataToOwners(masterClient);
+ globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
}
/**
@@ -1627,7 +1541,7 @@ public class BspServiceMaster<I extends WritableComparable,
// We need to finalize aggregators from previous superstep
if (getSuperstep() >= 0) {
aggregatorTranslation.postMasterCompute();
- globalCommHandler.finishSuperstep();
+ globalCommHandler.getAggregatorHandler().finishSuperstep();
}
masterClient.openConnections();
@@ -1663,25 +1577,13 @@ public class BspServiceMaster<I extends WritableComparable,
// We need to send aggregators to worker owners after new worker assignments
if (getSuperstep() >= 0) {
- globalCommHandler.sendDataToOwners(masterClient);
+ globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
}
if (getSuperstep() == INPUT_SUPERSTEP) {
// Initialize aggregators before coordinating
initializeAggregatorInputSuperstep();
- if (getConfiguration().hasMappingInputFormat()) {
- coordinateInputSplits(mappingInputSplitsPaths, mappingInputSplitsEvents,
- "Mapping");
- }
- // vertex loading and edge loading
- if (getConfiguration().hasVertexInputFormat()) {
- coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
- "Vertex");
- }
- if (getConfiguration().hasEdgeInputFormat()) {
- coordinateInputSplits(edgeInputSplitsPaths, edgeInputSplitsEvents,
- "Edge");
- }
+ coordinateInputSplits();
}
String finishedWorkerPath =
@@ -1695,7 +1597,7 @@ public class BspServiceMaster<I extends WritableComparable,
// Collect aggregator values, then run the master.compute() and
// finally save the aggregator values
- globalCommHandler.prepareSuperstep();
+ globalCommHandler.getAggregatorHandler().prepareSuperstep();
aggregatorTranslation.prepareSuperstep();
SuperstepClasses superstepClasses =
@@ -1761,7 +1663,8 @@ public class BspServiceMaster<I extends WritableComparable,
} else {
superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
}
- globalCommHandler.writeAggregators(getSuperstep(), superstepState);
+ globalCommHandler.getAggregatorHandler().writeAggregators(
+ getSuperstep(), superstepState);
return superstepState;
}
@@ -2009,7 +1912,7 @@ public class BspServiceMaster<I extends WritableComparable,
failJob(new Exception("Checkpoint and halt requested. " +
"Killing this job."));
}
- globalCommHandler.close();
+ globalCommHandler.getAggregatorHandler().close();
masterClient.closeConnections();
masterServer.close();
}
@@ -2122,100 +2025,4 @@ public class BspServiceMaster<I extends WritableComparable,
gs.getAggregateSentMessageBytes()
.increment(globalStats.getMessageBytesCount());
}
-
- /**
- * Task that writes a given input split to zookeeper.
- * Upon failure call() throws an exception.
- */
- private class WriteInputSplit implements Callable<Void> {
- /** Input format */
- private final GiraphInputFormat inputFormat;
- /** Input split which we are going to write */
- private final InputSplit inputSplit;
- /** Input splits path */
- private final String inputSplitsPath;
- /** Index of the input split */
- private final int index;
- /** Whether to write locality information */
- private final boolean writeLocations;
-
- /**
- * Constructor
- *
- * @param inputFormat Input format
- * @param inputSplit Input split which we are going to write
- * @param inputSplitsPath Input splits path
- * @param index Index of the input split
- * @param writeLocations whether to write the input split's locations (to
- * be used by workers for prioritizing local splits
- * when reading)
- */
- public WriteInputSplit(GiraphInputFormat inputFormat,
- InputSplit inputSplit,
- String inputSplitsPath,
- int index,
- boolean writeLocations) {
- this.inputFormat = inputFormat;
- this.inputSplit = inputSplit;
- this.inputSplitsPath = inputSplitsPath;
- this.index = index;
- this.writeLocations = writeLocations;
- }
-
- @Override
- public Void call() {
- String inputSplitPath = null;
- try {
- ByteArrayOutputStream byteArrayOutputStream =
- new ByteArrayOutputStream();
- DataOutput outputStream =
- new DataOutputStream(byteArrayOutputStream);
-
- if (writeLocations) {
- String[] splitLocations = inputSplit.getLocations();
- StringBuilder locations = null;
- if (splitLocations != null) {
- int splitListLength =
- Math.min(splitLocations.length, localityLimit);
- locations = new StringBuilder();
- for (String location : splitLocations) {
- locations.append(location)
- .append(--splitListLength > 0 ? "\t" : "");
- }
- }
- Text.writeString(outputStream,
- locations == null ? "" : locations.toString());
- }
-
- inputFormat.writeInputSplit(inputSplit, outputStream);
- inputSplitPath = inputSplitsPath + "/" + index;
- getZkExt().createExt(inputSplitPath,
- byteArrayOutputStream.toByteArray(),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("call: Created input split " +
- "with index " + index + " serialized as " +
- byteArrayOutputStream.toString(Charset.defaultCharset().name()));
- }
- } catch (KeeperException.NodeExistsException e) {
- if (LOG.isInfoEnabled()) {
- LOG.info("call: Node " +
- inputSplitPath + " already exists.");
- }
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "call: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "call: IllegalStateException", e);
- } catch (IOException e) {
- throw new IllegalStateException(
- "call: IOException", e);
- }
- return null;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index 5558cee..8ca3d3a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -42,7 +42,7 @@ import com.google.common.collect.Maps;
/** Handler for reduce/broadcast on the master */
public class MasterAggregatorHandler
- implements MasterGlobalCommUsage, Writable {
+ implements MasterGlobalCommUsageAggregators, Writable {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(MasterAggregatorHandler.class);
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
new file mode 100644
index 0000000..717a24d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.master.input.MasterInputSplitsHandler;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Handler for all master communications; reduce/broadcast calls are delegated to the aggregator handler
+ */
+public class MasterGlobalCommHandler implements MasterGlobalCommUsage {
+ /** Aggregator handler */
+ private final MasterAggregatorHandler aggregatorHandler;
+ /** Input splits handler */
+ private final MasterInputSplitsHandler inputSplitsHandler;
+
+ /**
+ * Constructor
+ *
+ * @param aggregatorHandler Aggregator handler
+ * @param inputSplitsHandler Input splits handler
+ */
+ public MasterGlobalCommHandler(
+ MasterAggregatorHandler aggregatorHandler,
+ MasterInputSplitsHandler inputSplitsHandler) {
+ this.aggregatorHandler = aggregatorHandler;
+ this.inputSplitsHandler = inputSplitsHandler;
+ }
+
+ public MasterAggregatorHandler getAggregatorHandler() {
+ return aggregatorHandler;
+ }
+
+ public MasterInputSplitsHandler getInputSplitsHandler() {
+ return inputSplitsHandler;
+ }
+
+ @Override
+ public <S, R extends Writable> void registerReducer(String name,
+ ReduceOperation<S, R> reduceOp) {
+ aggregatorHandler.registerReducer(name, reduceOp);
+ }
+
+ @Override
+ public <S, R extends Writable> void registerReducer(String name,
+ ReduceOperation<S, R> reduceOp, R globalInitialValue) {
+ aggregatorHandler.registerReducer(name, reduceOp, globalInitialValue);
+ }
+
+ @Override
+ public <R extends Writable> R getReduced(String name) {
+ return aggregatorHandler.getReduced(name);
+ }
+
+ @Override
+ public void broadcast(String name, Writable value) {
+ aggregatorHandler.broadcast(name, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
index 7ee9048..60b1809 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
@@ -17,52 +17,9 @@
*/
package org.apache.giraph.master;
-import org.apache.giraph.reducers.ReduceOperation;
-import org.apache.hadoop.io.Writable;
-
/**
- * Master compute can access reduce and broadcast methods
- * through this interface, from masterCompute method.
+ * All global master communication.
+ *
+ * <p>This interface declares no methods of its own; reducer registration,
+ * reduced-value lookup and broadcast are all inherited from
+ * {@link MasterGlobalCommUsageAggregators}.
 */
-public interface MasterGlobalCommUsage {
- /**
- * Register reducer to be reduced in the next worker computation,
- * using given name and operations.
- * @param name Name of the reducer
- * @param reduceOp Reduce operations
- * @param <S> Single value type
- * @param <R> Reduced value type
- */
- <S, R extends Writable> void registerReducer(
- String name, ReduceOperation<S, R> reduceOp);
-
- /**
- * Register reducer to be reduced in the next worker computation, using
- * given name and operations, starting globally from globalInitialValue.
- * (globalInitialValue is reduced only once, each worker will still start
- * from neutral initial value)
- *
- * @param name Name of the reducer
- * @param reduceOp Reduce operations
- * @param globalInitialValue Global initial value
- * @param <S> Single value type
- * @param <R> Reduced value type
- */
- <S, R extends Writable> void registerReducer(
- String name, ReduceOperation<S, R> reduceOp, R globalInitialValue);
-
- /**
- * Get reduced value from previous worker computation.
- * @param name Name of the reducer
- * @return Reduced value
- * @param <R> Reduced value type
- */
- <R extends Writable> R getReduced(String name);
-
- /**
- * Broadcast given value to all workers for next computation.
- * @param name Name of the broadcast object
- * @param value Value
- */
- void broadcast(String name, Writable value);
+public interface MasterGlobalCommUsage
+ extends MasterGlobalCommUsageAggregators {
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java
new file mode 100644
index 0000000..62c1f3f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Reduce and broadcast operations available on the master.
+ *
+ * <p>Master compute can access reduce and broadcast methods through this
+ * interface, from the masterCompute method.
+ */
+public interface MasterGlobalCommUsageAggregators {
+  /**
+   * Register a reducer to be reduced in the next worker computation,
+   * using the given name and operations.
+   *
+   * @param name Name of the reducer
+   * @param reduceOp Reduce operations
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   */
+  <S, R extends Writable> void registerReducer(
+      String name, ReduceOperation<S, R> reduceOp);
+
+  /**
+   * Register a reducer to be reduced in the next worker computation, using
+   * the given name and operations, starting globally from
+   * globalInitialValue. (globalInitialValue is reduced only once; each
+   * worker will still start from the neutral initial value.)
+   *
+   * @param name Name of the reducer
+   * @param reduceOp Reduce operations
+   * @param globalInitialValue Global initial value
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   */
+  <S, R extends Writable> void registerReducer(
+      String name, ReduceOperation<S, R> reduceOp, R globalInitialValue);
+
+  /**
+   * Get the reduced value from the previous worker computation.
+   *
+   * @param name Name of the reducer
+   * @param <R> Reduced value type
+   * @return Reduced value
+   */
+  <R extends Writable> R getReduced(String name);
+
+  /**
+   * Broadcast the given value to all workers for the next computation.
+   *
+   * @param name Name of the broadcast object
+   * @param value Value
+   */
+  void broadcast(String name, Writable value);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..5168e32
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Master-side organizer for vertex and edge input splits that ignores
+ * locality: splits are handed out first-in first-out, in the iteration
+ * order of the list they were built from, no matter which worker asks.
+ */
+public class BasicInputSplitsMasterOrganizer
+    implements InputSplitsMasterOrganizer {
+  /** Serialized splits that have not been handed out yet */
+  private final ConcurrentLinkedQueue<byte[]> remainingSplits;
+
+  /**
+   * Constructor
+   *
+   * @param serializedSplits Splits to distribute. Its elements are copied
+   *                         into an internal queue, so later changes to the
+   *                         list itself are not seen. Throws
+   *                         NullPointerException if the list or any element
+   *                         is null.
+   */
+  public BasicInputSplitsMasterOrganizer(List<byte[]> serializedSplits) {
+    this.remainingSplits =
+        new ConcurrentLinkedQueue<byte[]>(serializedSplits);
+  }
+
+  /**
+   * Take the next unassigned split from the head of the queue.
+   *
+   * @param workerTaskId Id of worker requesting split (unused)
+   * @return Next serialized split, or null once every split has been taken
+   */
+  @Override
+  public byte[] getSerializedSplitFor(int workerTaskId) {
+    // workerTaskId is deliberately ignored: this organizer has no notion of
+    // locality, so every worker draws from the same shared queue.
+    return remainingSplits.poll();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..d5a0131
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+/**
+ * Interface for different input split organizers on master.
+ *
+ * <p>An organizer decides which serialized input split is handed out next
+ * when a worker asks for one.
+ */
+public interface InputSplitsMasterOrganizer {
+  /**
+   * Get the next serialized input split for the requesting worker.
+   * Implementations may use the worker id (for example to prefer splits
+   * local to that worker) or ignore it.
+   *
+   * @param workerTaskId Id of worker requesting split
+   * @return Next serialized split for the worker, or null if all splits
+   *         were taken already
+   */
+  byte[] getSerializedSplitFor(int workerTaskId);
+}