You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2012/10/06 07:08:50 UTC

svn commit: r1394926 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/

Author: maja
Date: Sat Oct  6 05:08:49 2012
New Revision: 1394926

URL: http://svn.apache.org/viewvc?rev=1394926&view=rev
Log:
GIRAPH-307: Don't recreate InputSplit list

Added:
    giraph/trunk/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java
Removed:
    giraph/trunk/src/main/java/org/apache/giraph/graph/LocalityInfoSorter.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1394926&r1=1394925&r2=1394926&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Sat Oct  6 05:08:49 2012
@@ -2,6 +2,10 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-307: InputSplit list can be long with many workers 
+  (and locality info) and should not be re-created every time a 
+  worker calls reserveInputSplit() (ereisman via majakabiljo)
+
   GIRAPH-358: Rename package format->io in giraph-formats-contrib for
   consistency with main package. (apresta via aching)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1394926&r1=1394925&r2=1394926&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Sat Oct  6 05:08:49 2012
@@ -92,8 +92,6 @@ public class BspServiceWorker<I extends 
     implements CentralizedServiceWorker<I, V, E, M> {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
-  /** Number of input splits */
-  private int inputSplitCount = -1;
   /** My process health znode */
   private String myHealthZnode;
   /** Worker info */
@@ -126,6 +124,12 @@ public class BspServiceWorker<I extends 
    * Partition store for worker (only used by the Hadoop RPC implementation).
    */
   private final PartitionStore<I, V, E, M> workerPartitionStore;
+  /**
+   * Stores and processes the list of InputSplits advertised
+   * in a tree of child znodes by the master.
+   */
+  private InputSplitPathOrganizer splitOrganizer = null;
+
   /** Handler for aggregators */
   private final WorkerAggregatorHandler aggregatorHandler;
 
@@ -234,20 +238,16 @@ else[HADOOP_NON_SECURE]*/
    */
   private String reserveInputSplit()
     throws KeeperException, InterruptedException {
-    List<String> inputSplitPathList = null;
-    inputSplitPathList =
-        getZkExt().getChildrenExt(inputSplitsPath, false, false, true);
-    if (inputSplitCount == -1) {
-      inputSplitCount = inputSplitPathList.size();
+    if (null == splitOrganizer) {
+      splitOrganizer = new InputSplitPathOrganizer(getZkExt(),
+        inputSplitsPath, getHostname(), getWorkerInfo().getPort());
     }
-    LocalityInfoSorter localitySorter = new LocalityInfoSorter(
-      getZkExt(), inputSplitPathList, getHostname(), getWorkerInfo().getPort());
     String reservedInputSplitPath = null;
     Stat reservedStat = null;
     final Mapper<?, ?, ?, ?>.Context context = getContext();
     while (true) {
       int reservedInputSplits = 0;
-      for (String nextSplitToClaim : localitySorter) {
+      for (String nextSplitToClaim : splitOrganizer) {
         context.progress();
         String tmpInputSplitReservedPath =
             nextSplitToClaim + INPUT_SPLIT_RESERVED_NODE;
@@ -265,7 +265,7 @@ else[HADOOP_NON_SECURE]*/
             if (LOG.isInfoEnabled()) {
               float percentFinished =
                   reservedInputSplits * 100.0f /
-                  inputSplitPathList.size();
+                  splitOrganizer.getPathListSize();
               LOG.info("reserveInputSplit: Reserved input " +
                   "split path " + reservedInputSplitPath +
                   ", overall roughly " +
@@ -292,10 +292,10 @@ else[HADOOP_NON_SECURE]*/
       if (LOG.isInfoEnabled()) {
         LOG.info("reserveInputSplit: reservedPath = " +
             reservedInputSplitPath + ", " + reservedInputSplits +
-            " of " + inputSplitPathList.size() +
+            " of " + splitOrganizer.getPathListSize() +
             " InputSplits are finished.");
       }
-      if (reservedInputSplits == inputSplitPathList.size()) {
+      if (reservedInputSplits == splitOrganizer.getPathListSize()) {
         transferRegulator = null; // don't need this anymore
         return null;
       }

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java?rev=1394926&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java Sat Oct  6 05:08:49 2012
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Iterator;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Utility class to extract the list of InputSplits from the
+ * ZooKeeper tree of "claimable splits" the master created,
+ * and to sort the list to favor local data blocks.
+ *
+ * This class provides an Iterator for the list the worker will
+ * claim splits from, making all sorting and data-code locality
+ * processing done here invisible to callers. The aim is to cut
+ * down on the number of ZK reads workers perform before locating
+ * an unclaimed InputSplit.
+ */
+public class InputSplitPathOrganizer implements Iterable<String> {
+  /** The worker's local ZooKeeperExt ref */
+  private final ZooKeeperExt zooKeeper;
+  /** The List of InputSplit znode paths */
+  private final List<String> pathList;
+  /** The worker's hostname */
+  private final String hostName;
+  /** The adjusted base offset by which to iterate on the path list */
+  private int baseOffset;
+
+  /**
+   * Constructor
+   * @param zooKeeper the worker's ZkExt
+   * @param zkPathList the path to read from
+   * @param hostName the worker's host name (for matching)
+   * @param port the port number for this worker
+   */
+  public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
+    final String zkPathList, final String hostName, final int port)
+    throws KeeperException, InterruptedException {
+    this.zooKeeper = zooKeeper;
+    this.pathList = zooKeeper.getChildrenExt(zkPathList, false, false, true);
+    this.hostName = hostName;
+    this.baseOffset = 0; // set later after switching out local paths
+    prioritizeLocalInputSplits(port);
+  }
+
+ /**
+  * Re-order list of InputSplits so files local to this worker node's
+  * disk are the first it will iterate over when attempting to claim
+  * a split to read. This will increase locality of data reads with greater
+  * probability as the % of total nodes in the cluster hosting data and workers
+  * BOTH increase towards 100%. Replication increases our chances of a "hit."
+  * @param port the port number for hashing unique iteration indexes for all
+  *             workers, even those sharing the same host node.
+  */
+  private void prioritizeLocalInputSplits(final int port) {
+    List<String> sortedList = new ArrayList<String>();
+    String hosts = null;
+    for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
+      final String path = iterator.next();
+      try {
+        hosts = getLocationsFromZkInputSplitData(path);
+      } catch (IOException ioe) {
+        hosts = null; // no problem, just don't sort this entry
+      } catch (KeeperException ke) {
+        hosts = null;
+      } catch (InterruptedException ie) {
+        hosts = null;
+      }
+      if (hosts != null && hosts.contains(hostName)) {
+        sortedList.add(path); // collect the local block
+        iterator.remove(); // remove local block from list
+      }
+    }
+    // shuffle the local blocks in case several workers exist on this host
+    Collections.shuffle(sortedList);
+    // determine the hash-based offset for this worker to iterate from
+    // and place the local blocks into the list at that index, if any
+    final int temp = hostName.hashCode() + (19 * port);
+    if (pathList.size() != 0) {
+      baseOffset =
+        Math.abs(temp == Integer.MIN_VALUE ? 0 : temp) % pathList.size();
+    }
+    // re-insert local paths at "adjusted index zero" for caller to iterate on
+    pathList.addAll(baseOffset, sortedList);
+  }
+
+  /**
+   * Utility for extracting locality data from an InputSplit ZNode.
+   *
+   * @param zkSplitPath the input split path to attempt to read
+   * ZNode locality data from for this InputSplit.
+   * @return a String of hostnames from ZNode data, or throws
+   */
+  private String getLocationsFromZkInputSplitData(String zkSplitPath)
+    throws IOException, KeeperException, InterruptedException {
+    byte[] locationData = zooKeeper.getData(zkSplitPath, false, null);
+    DataInputStream inputStream =
+      new DataInputStream(new ByteArrayInputStream(locationData));
+    // only read the "first" entry in the znode data, the locations
+    return Text.readString(inputStream);
+  }
+
+  /**
+   * Utility accessor for Input Split znode path list size
+   * @return the size of <code>this.pathList</code>
+   */
+  public int getPathListSize() {
+    return this.pathList.size();
+  }
+
+  /**
+   * Iterator for the pathList
+   * @return an iterator for our list of input split paths
+   */
+  public Iterator<String> iterator() {
+    return new PathListIterator();
+  }
+
+  /**
+   * Iterator for path list that handles the locality and hash offsetting.
+   */
+  public class PathListIterator implements Iterator<String> {
+    /** the current iterator index */
+    private int currentIndex = 0;
+
+    /**
+     *  Do we have more list to iterate upon?
+     *  @return true if more path strings are available
+     */
+    @Override
+    public boolean hasNext() {
+      return currentIndex < pathList.size();
+    }
+
+    /** return the next pathList element
+     * @return the next input split path
+     */
+    @Override
+    public String next() {
+      return pathList.get((baseOffset + currentIndex++) % pathList.size());
+    }
+
+    /** Just a placeholder; should not do anything! */
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove is not allowed.");
+    }
+  }
+}

Modified: giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1394926&r1=1394925&r2=1394926&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Sat Oct  6 05:08:49 2012
@@ -33,7 +33,7 @@ import org.apache.giraph.examples.Simple
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.LocalityInfoSorter;
+import org.apache.giraph.graph.InputSplitPathOrganizer;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.TextAggregatorWriter;
 import org.apache.giraph.graph.Vertex;
@@ -312,20 +312,21 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
   }
 
   /**
-   * Run a test to see if the LocalityInfoSorter can correctly sort
+   * Run a test to see if the InputSplitPathOrganizer can correctly sort
    * locality information from a mocked znode of data.
    * @throws IOException
    * @throws KeeperException
    * @throws InterruptedException
    */
   @Test
-  public void testLocalityInfoSorter()
+  public void testInputSplitPathOrganizer()
     throws IOException, KeeperException, InterruptedException {
     final List<String> goodList = new ArrayList<String>();
     Collections.addAll(goodList, "good", "bad", "ugly");
     final List<String> testList = new ArrayList<String>();
     Collections.addAll(testList, "bad", "good", "ugly");
     final String localHost = "node.LOCAL.com";
+    final String testListName = "test_list_parent_znode";
     // build output just as we do to store hostlists in ZNODES
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream dos = new DataOutputStream(baos);
@@ -343,11 +344,12 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
     Text.writeString(dos, first);
     byte[] LOCALITY_FIRST = baos.toByteArray();
     ZooKeeperExt zk = mock(ZooKeeperExt.class);
+    when(zk.getChildrenExt(testListName, false, false, true)).thenReturn(testList);
     when(zk.getData("ugly", false, null)).thenReturn(LOCALITY_LAST);
     when(zk.getData("bad", false, null)).thenReturn(LOCALITY_MIDDLE);
     when(zk.getData("good", false, null)).thenReturn(LOCALITY_FIRST);
-    LocalityInfoSorter lis =
-      new LocalityInfoSorter(zk, testList, localHost, 0);
+    InputSplitPathOrganizer lis =
+      new InputSplitPathOrganizer(zk, testListName, localHost, 0);
     final List<String> resultList = new ArrayList<String>();
     for (String next : lis) {
       resultList.add(next);