You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2012/10/06 07:08:50 UTC
svn commit: r1394926 - in /giraph/trunk: ./
src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/
Author: maja
Date: Sat Oct 6 05:08:49 2012
New Revision: 1394926
URL: http://svn.apache.org/viewvc?rev=1394926&view=rev
Log:
GIRAPH-307: Don't recreate InputSplit list
Added:
giraph/trunk/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java
Removed:
giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1394926&r1=1394925&r2=1394926&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Sat Oct 6 05:08:49 2012
@@ -2,6 +2,10 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-307: InputSplit list can be long with many workers
+ (and locality info) and should not be re-created every time a
+ worker calls reserveInputSplit() (ereisman via majakabiljo)
+
GIRAPH-358: Rename package format->io in giraph-formats-contrib for
consistency with main package. (apresta via aching)
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1394926&r1=1394925&r2=1394926&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Sat Oct 6 05:08:49 2012
@@ -92,8 +92,6 @@ public class BspServiceWorker<I extends
implements CentralizedServiceWorker<I, V, E, M> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
- /** Number of input splits */
- private int inputSplitCount = -1;
/** My process health znode */
private String myHealthZnode;
/** Worker info */
@@ -126,6 +124,12 @@ public class BspServiceWorker<I extends
* Partition store for worker (only used by the Hadoop RPC implementation).
*/
private final PartitionStore<I, V, E, M> workerPartitionStore;
+ /**
+ * Stores and processes the list of InputSplits advertised
+ * in a tree of child znodes by the master.
+ */
+ private InputSplitPathOrganizer splitOrganizer = null;
+
/** Handler for aggregators */
private final WorkerAggregatorHandler aggregatorHandler;
@@ -234,20 +238,16 @@ else[HADOOP_NON_SECURE]*/
*/
private String reserveInputSplit()
throws KeeperException, InterruptedException {
- List<String> inputSplitPathList = null;
- inputSplitPathList =
- getZkExt().getChildrenExt(inputSplitsPath, false, false, true);
- if (inputSplitCount == -1) {
- inputSplitCount = inputSplitPathList.size();
+ if (null == splitOrganizer) {
+ splitOrganizer = new InputSplitPathOrganizer(getZkExt(),
+ inputSplitsPath, getHostname(), getWorkerInfo().getPort());
}
- LocalityInfoSorter localitySorter = new LocalityInfoSorter(
- getZkExt(), inputSplitPathList, getHostname(), getWorkerInfo().getPort());
String reservedInputSplitPath = null;
Stat reservedStat = null;
final Mapper<?, ?, ?, ?>.Context context = getContext();
while (true) {
int reservedInputSplits = 0;
- for (String nextSplitToClaim : localitySorter) {
+ for (String nextSplitToClaim : splitOrganizer) {
context.progress();
String tmpInputSplitReservedPath =
nextSplitToClaim + INPUT_SPLIT_RESERVED_NODE;
@@ -265,7 +265,7 @@ else[HADOOP_NON_SECURE]*/
if (LOG.isInfoEnabled()) {
float percentFinished =
reservedInputSplits * 100.0f /
- inputSplitPathList.size();
+ splitOrganizer.getPathListSize();
LOG.info("reserveInputSplit: Reserved input " +
"split path " + reservedInputSplitPath +
", overall roughly " +
@@ -292,10 +292,10 @@ else[HADOOP_NON_SECURE]*/
if (LOG.isInfoEnabled()) {
LOG.info("reserveInputSplit: reservedPath = " +
reservedInputSplitPath + ", " + reservedInputSplits +
- " of " + inputSplitPathList.size() +
+ " of " + splitOrganizer.getPathListSize() +
" InputSplits are finished.");
}
- if (reservedInputSplits == inputSplitPathList.size()) {
+ if (reservedInputSplits == splitOrganizer.getPathListSize()) {
transferRegulator = null; // don't need this anymore
return null;
}
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java?rev=1394926&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java Sat Oct 6 05:08:49 2012
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Iterator;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Utility class to extract the list of InputSplits from the
+ * ZooKeeper tree of "claimable splits" the master created,
+ * and to sort the list to favor local data blocks.
+ *
+ * This class provides an Iterator for the list the worker will
+ * claim splits from, making all sorting and data-code locality
+ * processing done here invisible to callers. The aim is to cut
+ * down on the number of ZK reads workers perform before locating
+ * an unclaimed InputSplit.
+ */
+public class InputSplitPathOrganizer implements Iterable<String> {
+ /** The worker's local ZooKeeperExt ref */
+ private final ZooKeeperExt zooKeeper;
+ /** The List of InputSplit znode paths */
+ private final List<String> pathList;
+ /** The worker's hostname */
+ private final String hostName;
+ /** The adjusted base offset by which to iterate on the path list */
+ private int baseOffset;
+
+ /**
+ * Constructor
+ * @param zooKeeper the worker's ZkExt
+ * @param zkPathList the ZK path whose children form the InputSplit list
+ * @param hostName the worker's host name (for matching)
+ * @param port the port number for this worker
+ */
+ public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
+ final String zkPathList, final String hostName, final int port)
+ throws KeeperException, InterruptedException {
+ this.zooKeeper = zooKeeper;
+ this.pathList = zooKeeper.getChildrenExt(zkPathList, false, false, true);
+ this.hostName = hostName;
+ this.baseOffset = 0; // set later after switching out local paths
+ prioritizeLocalInputSplits(port);
+ }
+
+ /**
+ * Re-order list of InputSplits so files local to this worker node's
+ * disk are the first it will iterate over when attempting to claim
+ * a split to read. This will increase locality of data reads with greater
+ * probability as the % of total nodes in the cluster hosting data and workers
+ * BOTH increase towards 100%. Replication increases our chances of a "hit."
+ * @param port the port number for hashing unique iteration indexes for all
+ * workers, even those sharing the same host node.
+ */
+ private void prioritizeLocalInputSplits(final int port) {
+ List<String> sortedList = new ArrayList<String>();
+ String hosts = null;
+ for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
+ final String path = iterator.next();
+ try {
+ hosts = getLocationsFromZkInputSplitData(path);
+ } catch (IOException ioe) {
+ hosts = null; // no problem, just don't sort this entry
+ } catch (KeeperException ke) {
+ hosts = null;
+ } catch (InterruptedException ie) {
+ hosts = null;
+ }
+ if (hosts != null && hosts.contains(hostName)) {
+ sortedList.add(path); // collect the local block
+ iterator.remove(); // remove local block from list
+ }
+ }
+ // shuffle the local blocks in case several workers exist on this host
+ Collections.shuffle(sortedList);
+ // determine the hash-based offset for this worker to iterate from
+ // and place the local blocks into the list at that index, if any
+ final int temp = hostName.hashCode() + (19 * port);
+ if (pathList.size() != 0) {
+ baseOffset =
+ Math.abs(temp == Integer.MIN_VALUE ? 0 : temp) % pathList.size();
+ }
+ // re-insert local paths at "adjusted index zero" for caller to iterate on
+ pathList.addAll(baseOffset, sortedList);
+ }
+
+ /**
+ * Utility for extracting locality data from an InputSplit ZNode.
+ *
+ * @param zkSplitPath the input split path to attempt to read
+ * ZNode locality data from for this InputSplit.
+ * @return a String of hostnames from ZNode data; throws if the read fails
+ */
+ private String getLocationsFromZkInputSplitData(String zkSplitPath)
+ throws IOException, KeeperException, InterruptedException {
+ byte[] locationData = zooKeeper.getData(zkSplitPath, false, null);
+ DataInputStream inputStream =
+ new DataInputStream(new ByteArrayInputStream(locationData));
+ // only read the "first" entry in the znode data, the locations
+ return Text.readString(inputStream);
+ }
+
+ /**
+ * Utility accessor for Input Split znode path list size
+ * @return the size of <code>this.pathList</code>
+ */
+ public int getPathListSize() {
+ return this.pathList.size();
+ }
+
+ /**
+ * Iterator for the pathList
+ * @return an iterator for our list of input split paths
+ */
+ public Iterator<String> iterator() {
+ return new PathListIterator();
+ }
+
+ /**
+ * Iterator for path list that handles the locality and hash offsetting.
+ */
+ public class PathListIterator implements Iterator<String> {
+ /** the current iterator index */
+ private int currentIndex = 0;
+
+ /**
+ * Do we have more list to iterate upon?
+ * @return true if more path strings are available
+ */
+ @Override
+ public boolean hasNext() {
+ return currentIndex < pathList.size();
+ }
+
+ /** Return the next pathList element, wrapping around from baseOffset
+ * @return the next input split path
+ */
+ @Override
+ public String next() {
+ return pathList.get((baseOffset + currentIndex++) % pathList.size());
+ }
+
+ /** Not supported; always throws UnsupportedOperationException. */
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Remove is not allowed.");
+ }
+ }
+}
Modified: giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1394926&r1=1394925&r2=1394926&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Sat Oct 6 05:08:49 2012
@@ -33,7 +33,7 @@ import org.apache.giraph.examples.Simple
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.LocalityInfoSorter;
+import org.apache.giraph.graph.InputSplitPathOrganizer;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.TextAggregatorWriter;
import org.apache.giraph.graph.Vertex;
@@ -312,20 +312,21 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
}
/**
- * Run a test to see if the LocalityInfoSorter can correctly sort
+ * Run a test to see if the InputSplitPathOrganizer can correctly sort
* locality information from a mocked znode of data.
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Test
- public void testLocalityInfoSorter()
+ public void testInputSplitPathOrganizer()
throws IOException, KeeperException, InterruptedException {
final List<String> goodList = new ArrayList<String>();
Collections.addAll(goodList, "good", "bad", "ugly");
final List<String> testList = new ArrayList<String>();
Collections.addAll(testList, "bad", "good", "ugly");
final String localHost = "node.LOCAL.com";
+ final String testListName = "test_list_parent_znode";
// build output just as we do to store hostlists in ZNODES
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
@@ -343,11 +344,12 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
Text.writeString(dos, first);
byte[] LOCALITY_FIRST = baos.toByteArray();
ZooKeeperExt zk = mock(ZooKeeperExt.class);
+ when(zk.getChildrenExt(testListName, false, false, true)).thenReturn(testList);
when(zk.getData("ugly", false, null)).thenReturn(LOCALITY_LAST);
when(zk.getData("bad", false, null)).thenReturn(LOCALITY_MIDDLE);
when(zk.getData("good", false, null)).thenReturn(LOCALITY_FIRST);
- LocalityInfoSorter lis =
- new LocalityInfoSorter(zk, testList, localHost, 0);
+ InputSplitPathOrganizer lis =
+ new InputSplitPathOrganizer(zk, testListName, localHost, 0);
final List<String> resultList = new ArrayList<String>();
for (String next : lis) {
resultList.add(next);