You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by jg...@apache.org on 2012/08/13 22:07:09 UTC

svn commit: r1372575 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/

Author: jghoman
Date: Mon Aug 13 20:07:09 2012
New Revision: 1372575

URL: http://svn.apache.org/viewvc?rev=1372575&view=rev
Log:
GIRAPH-275: Restore data locality to workers reading InputSplits where possible without querying NameNode, ZooKeeper. Contributed by Eli Reisman.

Added:
    giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1372575&r1=1372574&r2=1372575&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Aug 13 20:07:09 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-275: Restore data locality to workers reading InputSplits where possible
+  without querying NameNode, ZooKeeper. (Eli Reisman via jghoman)
+  
   GIRAPH-258: Check type compatibility before submitting job. 
   (Eli Reisman via jghoman)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1372575&r1=1372574&r2=1372575&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Mon Aug 13 20:07:09 2012
@@ -134,6 +134,8 @@ public class BspServiceMaster<I extends 
   private AggregatorWriter aggregatorWriter;
   /** Master class */
   private MasterCompute masterCompute;
+  /** Limit locality information added to each InputSplit znode */
+  private final int localityLimit = 5;
 
   /**
    * Constructor for setting up the master.
@@ -517,13 +519,29 @@ public class BspServiceMaster<I extends 
           "some workers will be not used");
     }
     String inputSplitPath = null;
+    String[] splitLocations;
+    InputSplit inputSplit;
+    StringBuilder locations;
     for (int i = 0; i < splitList.size(); ++i) {
       try {
         ByteArrayOutputStream byteArrayOutputStream =
             new ByteArrayOutputStream();
         DataOutput outputStream =
             new DataOutputStream(byteArrayOutputStream);
-        InputSplit inputSplit = splitList.get(i);
+        inputSplit = splitList.get(i);
+        splitLocations = inputSplit.getLocations();
+        locations = null;
+        if (splitLocations != null) {
+          // Keep at most localityLimit hosts, tab-separated (no trailing tab).
+          int splitListLength =
+            Math.min(splitLocations.length, localityLimit);
+          locations = new StringBuilder();
+          for (int j = 0; j < splitListLength; ++j) {
+            locations.append(j > 0 ? "\t" : "").append(splitLocations[j]);
+          }
+        }
+        Text.writeString(outputStream,
+            locations == null ? "" : locations.toString());
         Text.writeString(outputStream,
             inputSplit.getClass().getName());
         ((Writable) inputSplit).write(outputStream);

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1372575&r1=1372574&r2=1372575&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Mon Aug 13 20:07:09 2012
@@ -205,7 +205,10 @@ public class BspServiceWorker<I extends 
     if (inputSplitCount == -1) {
       inputSplitCount = inputSplitPathList.size();
     }
-
+    LocalityInfoSorter localitySorter = new LocalityInfoSorter(
+      getZkExt(), inputSplitPathList, getHostname());
+    inputSplitPathList =
+      localitySorter.getPrioritizedLocalInputSplits();
     String reservedInputSplitPath = null;
     Stat reservedStat = null;
     while (true) {
@@ -400,6 +403,7 @@ public class BspServiceWorker<I extends 
 
     DataInputStream inputStream =
         new DataInputStream(new ByteArrayInputStream(splitList));
+    Text.readString(inputStream); // location data unused here, skip
     String inputSplitClass = Text.readString(inputStream);
     InputSplit inputSplit = (InputSplit)
         ReflectionUtils.newInstance(

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java?rev=1372575&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java Mon Aug 13 20:07:09 2012
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Utility class to extract InputSplit locality information
+ * from znodes and to re-order the InputSplit path list so the worker
+ * owning this object tries the splits stored on its own host first.
+ */
+public class LocalityInfoSorter {
+  /** The worker's local ZooKeeperExt ref */
+  private final ZooKeeperExt zooKeeper;
+  /** The List of InputSplit znode paths */
+  private final List<String> pathList;
+  /** The worker's hostname */
+  private final String hostName;
+
+  /**
+   * Constructor
+   * @param zooKeeper the worker's ZkExt
+   * @param pathList the InputSplit znode paths to re-order
+   * @param hostName the worker's host name (for matching)
+   */
+  public LocalityInfoSorter(ZooKeeperExt zooKeeper, List<String> pathList,
+    String hostName) {
+    this.zooKeeper = zooKeeper;
+    this.pathList = pathList;
+    this.hostName = hostName;
+  }
+
+  /**
+   * Re-order list of InputSplits so files local to this worker node's
+   * disk are the first it will iterate over when attempting to claim
+   * a split to read. A split counts as local when the location string in
+   * its znode contains this worker's host name (a substring match). Splits
+   * whose znode cannot be read are treated as non-local and left in place.
+   *
+   * @return a new list of the paths, with host-local splits at the front.
+   */
+  public List<String> getPrioritizedLocalInputSplits() {
+    List<String> sortedList = new ArrayList<String>(pathList.size());
+    for (String path : pathList) {
+      String hosts = null; // stays null when the znode cannot be read
+      try {
+        hosts = getLocationsFromZkInputSplitData(path);
+      } catch (IOException ioe) {
+        hosts = null; // no problem, just don't sort this entry
+      } catch (KeeperException ke) {
+        hosts = null; // likewise, leave this entry unsorted
+      } catch (InterruptedException ie) {
+        // do not lose the interrupt: restore the flag for the caller
+        Thread.currentThread().interrupt();
+      }
+      boolean local = hosts != null && hosts.contains(hostName);
+      // each local split goes to the head, everything else to the tail
+      sortedList.add(local ? 0 : sortedList.size(), path);
+    }
+    return sortedList;
+  }
+
+  /**
+   * Utility for extracting locality data from an InputSplit ZNode.
+   *
+   * @param zkSplitPath the input split path to attempt to read
+   * ZNode locality data from for this InputSplit.
+   * @return the location string stored first in the ZNode data; failures
+   * to read or decode it are propagated to the caller as exceptions
+   */
+  private String getLocationsFromZkInputSplitData(String zkSplitPath)
+    throws IOException, KeeperException, InterruptedException {
+    byte[] locationData = zooKeeper.getData(zkSplitPath, false, null);
+    DataInputStream inputStream =
+      new DataInputStream(new ByteArrayInputStream(locationData));
+    // only read the "first" entry in the znode data, the locations
+    return Text.readString(inputStream);
+  }
+}

Modified: giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1372575&r1=1372574&r2=1372575&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Mon Aug 13 20:07:09 2012
@@ -32,16 +32,19 @@ import org.apache.giraph.examples.Simple
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.LocalityInfoSorter;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.TextAggregatorWriter;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.giraph.io.JsonLongDoubleFloatDoubleVertexOutputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -54,6 +57,7 @@ import org.apache.hadoop.mapreduce.JobCo
 else[HADOOP_NON_SASL_RPC]*/
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 /*end[HADOOP_NON_SASL_RPC]*/
+import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -61,6 +65,9 @@ import static org.junit.Assert.assertSam
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import com.google.common.base.Charsets;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
@@ -71,6 +78,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -285,6 +294,46 @@ public class TestBspBasic extends BspCas
   }
 
   /**
+   * Run a test to see if the LocalityInfoSorter can correctly sort
+   * locality information from a mocked znode of data.
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testLocalityInfoSorter()
+    throws IOException, KeeperException, InterruptedException {
+    final String workerHost = "node.LOCAL.com";
+    // znode paths as handed to the sorter, and the order we expect back
+    final List<String> inputPaths = new ArrayList<String>();
+    Collections.addAll(inputPaths, "bad", "good", "ugly");
+    final List<String> expectedPaths = new ArrayList<String>();
+    Collections.addAll(expectedPaths, "good", "bad", "ugly");
+    // serialize each host list the same way it is stored in a znode
+    ByteArrayOutputStream localBytes = new ByteArrayOutputStream();
+    Text.writeString(new DataOutputStream(localBytes),
+      "node.testx.com\tnode.LOCAL.com\tnode.testy.com");
+    ByteArrayOutputStream remoteBytesA = new ByteArrayOutputStream();
+    Text.writeString(new DataOutputStream(remoteBytesA),
+      "node.test1.com\tnode.test2.com\tnode.test3.com");
+    ByteArrayOutputStream remoteBytesB = new ByteArrayOutputStream();
+    Text.writeString(new DataOutputStream(remoteBytesB),
+      "node.test4.com\tnode.test5.com\tnode.test6.com");
+    // the mocked ZooKeeper serves the serialized host lists by path
+    ZooKeeperExt zk = mock(ZooKeeperExt.class);
+    when(zk.getData("good", false, null))
+      .thenReturn(localBytes.toByteArray());
+    when(zk.getData("bad", false, null))
+      .thenReturn(remoteBytesA.toByteArray());
+    when(zk.getData("ugly", false, null))
+      .thenReturn(remoteBytesB.toByteArray());
+    LocalityInfoSorter sorter =
+      new LocalityInfoSorter(zk, inputPaths, workerHost);
+    // only "good" lists workerHost, so it must be moved to the front
+    assertEquals(expectedPaths, sorter.getPrioritizedLocalInputSplits());
+  }
+
+  /**
    * Run a sample BSP job locally and test PageRank.
    *
    * @throws IOException