You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by jg...@apache.org on 2012/08/13 22:07:09 UTC
svn commit: r1372575 - in /giraph/trunk: ./
src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/
Author: jghoman
Date: Mon Aug 13 20:07:09 2012
New Revision: 1372575
URL: http://svn.apache.org/viewvc?rev=1372575&view=rev
Log:
GIRAPH-275: Restore data locality to workers reading InputSplits where possible without querying NameNode, ZooKeeper. Contributed by Eli Reisman.
Added:
giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1372575&r1=1372574&r2=1372575&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Aug 13 20:07:09 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-275: Restore data locality to workers reading InputSplits where possible
+ without querying NameNode, ZooKeeper. (Eli Reisman via jghoman)
+
GIRAPH-258: Check type compatibility before submitting job.
(Eli Reisman via jghoman)
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1372575&r1=1372574&r2=1372575&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Mon Aug 13 20:07:09 2012
@@ -134,6 +134,8 @@ public class BspServiceMaster<I extends
private AggregatorWriter aggregatorWriter;
/** Master class */
private MasterCompute masterCompute;
+ /** Limit locality information added to each InputSplit znode */
+ private final int localityLimit = 5;
/**
* Constructor for setting up the master.
@@ -517,13 +519,29 @@ public class BspServiceMaster<I extends
"some workers will be not used");
}
String inputSplitPath = null;
+ String[] splitLocations;
+ InputSplit inputSplit;
+ StringBuilder locations;
for (int i = 0; i < splitList.size(); ++i) {
try {
ByteArrayOutputStream byteArrayOutputStream =
new ByteArrayOutputStream();
DataOutput outputStream =
new DataOutputStream(byteArrayOutputStream);
- InputSplit inputSplit = splitList.get(i);
+ inputSplit = splitList.get(i);
+ splitLocations = inputSplit.getLocations();
+ locations = null;
+ if (splitLocations != null) {
+ // keep at most localityLimit hosts, tab-separated, no trailing tab
+ int splitListLength =
+ Math.min(splitLocations.length, localityLimit);
+ locations = new StringBuilder();
+ for (int loc = 0; loc < splitListLength; ++loc) {
+ locations.append(loc > 0 ? "\t" : "").append(splitLocations[loc]);
+ }
+ }
+ Text.writeString(outputStream,
+ locations == null ? "" : locations.toString());
Text.writeString(outputStream,
inputSplit.getClass().getName());
((Writable) inputSplit).write(outputStream);
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1372575&r1=1372574&r2=1372575&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Mon Aug 13 20:07:09 2012
@@ -205,7 +205,10 @@ public class BspServiceWorker<I extends
if (inputSplitCount == -1) {
inputSplitCount = inputSplitPathList.size();
}
-
+ LocalityInfoSorter localitySorter = new LocalityInfoSorter(
+ getZkExt(), inputSplitPathList, getHostname());
+ inputSplitPathList =
+ localitySorter.getPrioritizedLocalInputSplits();
String reservedInputSplitPath = null;
Stat reservedStat = null;
while (true) {
@@ -400,6 +403,7 @@ public class BspServiceWorker<I extends
DataInputStream inputStream =
new DataInputStream(new ByteArrayInputStream(splitList));
+ Text.readString(inputStream); // location data unused here, skip
String inputSplitClass = Text.readString(inputStream);
InputSplit inputSplit = (InputSplit)
ReflectionUtils.newInstance(
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java?rev=1372575&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java Mon Aug 13 20:07:09 2012
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Utility class to extract InputSplit locality information
+ * from znodes and to sort the InputSplit list for the worker
+ * owning this object to select splits from.
+ */
+public class LocalityInfoSorter {
+  /** The worker's local ZooKeeperExt ref */
+  private final ZooKeeperExt zooKeeper;
+  /** The List of InputSplit znode paths */
+  private final List<String> pathList;
+  /** The worker's hostname */
+  private final String hostName;
+
+  /**
+   * Constructor
+   * @param zooKeeper the worker's ZkExt
+   * @param pathList the InputSplit znode paths to re-order
+   * @param hostName the worker's host name (for matching)
+   */
+  public LocalityInfoSorter(ZooKeeperExt zooKeeper, List<String> pathList,
+    String hostName) {
+    this.zooKeeper = zooKeeper;
+    this.pathList = pathList;
+    this.hostName = hostName;
+  }
+
+  /**
+   * Re-order list of InputSplits so splits local to this worker's host are
+   * the first it iterates over when trying to claim a split to read. Hosts
+   * are compared as whole names, so "node1" never matches "node10". Splits
+   * whose znode cannot be read are simply left unprioritized.
+   *
+   * @return the pathList, with host-local splits sorted to the front.
+   */
+  public List<String> getPrioritizedLocalInputSplits() {
+    List<String> sortedList = new ArrayList<String>(pathList.size());
+    for (String path : pathList) {
+      String hosts = null;
+      try {
+        hosts = getLocationsFromZkInputSplitData(path);
+      } catch (IOException ioe) {
+        hosts = null; // no problem, just don't sort this entry
+      } catch (KeeperException ke) {
+        hosts = null;
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt(); // don't lose the interrupt
+      }
+      // wrap both sides in the separator so only whole host names match
+      boolean isLocal = hosts != null && hostName != null &&
+        ("\t" + hosts + "\t").contains("\t" + hostName + "\t");
+      // local splits go to the head, everything else is appended
+      sortedList.add(isLocal ? 0 : sortedList.size(), path);
+    }
+    return sortedList;
+  }
+
+  /**
+   * Utility for extracting locality data from an InputSplit ZNode.
+   *
+   * @param zkSplitPath the InputSplit znode path to read
+   * @return tab-separated host names, or null if the znode holds no data
+   */
+  private String getLocationsFromZkInputSplitData(String zkSplitPath)
+    throws IOException, KeeperException, InterruptedException {
+    byte[] locationData = zooKeeper.getData(zkSplitPath, false, null);
+    if (locationData == null) {
+      return null; // znode without data carries no locality hint
+    }
+    // only read the "first" entry in the znode data, the locations
+    return Text.readString(
+      new DataInputStream(new ByteArrayInputStream(locationData)));
+  }
+}
Modified: giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1372575&r1=1372574&r2=1372575&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Mon Aug 13 20:07:09 2012
@@ -32,16 +32,19 @@ import org.apache.giraph.examples.Simple
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.LocalityInfoSorter;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.TextAggregatorWriter;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.giraph.io.JsonLongDoubleFloatDoubleVertexOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -54,6 +57,7 @@ import org.apache.hadoop.mapreduce.JobCo
else[HADOOP_NON_SASL_RPC]*/
import org.apache.hadoop.mapreduce.task.JobContextImpl;
/*end[HADOOP_NON_SASL_RPC]*/
+import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -61,6 +65,9 @@ import static org.junit.Assert.assertSam
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
@@ -71,6 +78,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -285,6 +294,46 @@ public class TestBspBasic extends BspCas
}
/**
+   * Verify that LocalityInfoSorter moves the split whose mocked znode data
+   * lists the local host to the front of the path list.
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testLocalityInfoSorter()
+    throws IOException, KeeperException, InterruptedException {
+    final String localHost = "node.LOCAL.com";
+    // paths as handed to the sorter, and the order we expect back
+    final List<String> inputOrder = new ArrayList<String>();
+    Collections.addAll(inputOrder, "bad", "good", "ugly");
+    final List<String> expectedOrder = new ArrayList<String>();
+    Collections.addAll(expectedOrder, "good", "bad", "ugly");
+    // serialize each host list the same way it is stored in a znode;
+    // only the "good" split lists the local host
+    ByteArrayOutputStream goodBytes = new ByteArrayOutputStream();
+    Text.writeString(new DataOutputStream(goodBytes),
+      "node.testx.com\tnode.LOCAL.com\tnode.testy.com");
+    ByteArrayOutputStream badBytes = new ByteArrayOutputStream();
+    Text.writeString(new DataOutputStream(badBytes),
+      "node.test1.com\tnode.test2.com\tnode.test3.com");
+    ByteArrayOutputStream uglyBytes = new ByteArrayOutputStream();
+    Text.writeString(new DataOutputStream(uglyBytes),
+      "node.test4.com\tnode.test5.com\tnode.test6.com");
+    // mocked ZooKeeper hands back the matching payload for each path
+    ZooKeeperExt zk = mock(ZooKeeperExt.class);
+    when(zk.getData("good", false, null))
+      .thenReturn(goodBytes.toByteArray());
+    when(zk.getData("bad", false, null))
+      .thenReturn(badBytes.toByteArray());
+    when(zk.getData("ugly", false, null))
+      .thenReturn(uglyBytes.toByteArray());
+    LocalityInfoSorter sorter =
+      new LocalityInfoSorter(zk, inputOrder, localHost);
+    assertEquals(expectedOrder, sorter.getPrioritizedLocalInputSplits());
+  }
+
+ /**
* Run a sample BSP job locally and test PageRank.
*
* @throws IOException