You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2012/10/01 22:35:39 UTC

svn commit: r1392574 - in /giraph/trunk: CHANGELOG src/main/java/org/apache/giraph/graph/BspServiceMaster.java src/main/java/org/apache/giraph/utils/ProgressableUtils.java

Author: maja
Date: Mon Oct  1 20:35:39 2012
New Revision: 1392574

URL: http://svn.apache.org/viewvc?rev=1392574&view=rev
Log:
GIRAPH-326: Writing input splits to ZooKeeper in parallel

Added:
    giraph/trunk/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1392574&r1=1392573&r2=1392574&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct  1 20:35:39 2012
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-326: Writing input splits to ZooKeeper in parallel (maja)
+
   GIRAPH-335: Add committer information for Maja Kabiljo to pom.xml
   (maja)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1392574&r1=1392573&r2=1392574&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Mon Oct  1 20:35:39 2012
@@ -31,6 +31,7 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.graph.partition.PartitionUtils;
+import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
@@ -74,6 +75,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
@@ -95,6 +99,11 @@ public class BspServiceMaster<I extends 
   public static final String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats";
   /** Print worker names only if there are 10 workers left */
   public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
+  /**
+   * Configuration key for the number of threads used to write input splits
+   * to ZooKeeper. The value is passed to Executors.newFixedThreadPool(), so
+   * it must be at least 1.
+   */
+  public static final String INPUT_SPLIT_THREAD_COUNT =
+      "giraph.inputSplitThreadCount";
+  /** Default number of threads used to write input splits to ZooKeeper */
+  public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1;
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
   /** Superstep counter */
@@ -531,59 +540,25 @@ public class BspServiceMaster<I extends 
           "=number of healthy processes, " +
           "some workers will be not used");
     }
-    String inputSplitPath = null;
-    String[] splitLocations;
-    InputSplit inputSplit;
-    StringBuilder locations;
+
+    // Write input splits to zookeeper in parallel
+    int inputSplitThreadCount = getConfiguration().getInt(
+        INPUT_SPLIT_THREAD_COUNT,
+        DEFAULT_INPUT_SPLIT_THREAD_COUNT);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("createInputSplits: Starting to write input split data to " +
+          "zookeeper with " + inputSplitThreadCount + " threads");
+    }
+    ExecutorService taskExecutor =
+        Executors.newFixedThreadPool(inputSplitThreadCount);
     for (int i = 0; i < splitList.size(); ++i) {
-      try {
-        ByteArrayOutputStream byteArrayOutputStream =
-            new ByteArrayOutputStream();
-        DataOutput outputStream =
-            new DataOutputStream(byteArrayOutputStream);
-        inputSplit = splitList.get(i);
-        splitLocations = inputSplit.getLocations();
-        locations = null;
-        if (splitLocations != null) {
-          int splitListLength =
-            Math.min(splitLocations.length, localityLimit);
-          locations = new StringBuilder();
-          for (String location : splitLocations) {
-            locations.append(location)
-              .append(--splitListLength > 0 ? "\t" : "");
-          }
-        }
-        Text.writeString(outputStream,
-            locations == null ? "" : locations.toString());
-        Text.writeString(outputStream,
-            inputSplit.getClass().getName());
-        ((Writable) inputSplit).write(outputStream);
-        inputSplitPath = inputSplitsPath + "/" + i;
-        getZkExt().createExt(inputSplitPath,
-            byteArrayOutputStream.toByteArray(),
-            Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT,
-            true);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("createInputSplits: Created input split " +
-              "with index " + i + " serialized as " +
-              byteArrayOutputStream.toString());
-        }
-      } catch (KeeperException.NodeExistsException e) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("createInputSplits: Node " +
-              inputSplitPath + " already exists.");
-        }
-      } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "createInputSplits: KeeperException", e);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException(
-            "createInputSplits: IllegalStateException", e);
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "createInputSplits: IOException", e);
-      }
+      InputSplit inputSplit = splitList.get(i);
+      taskExecutor.submit(new WriteInputSplit(inputSplit, i));
+    }
+    taskExecutor.shutdown();
+    ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
+    if (LOG.isInfoEnabled()) {
+      LOG.info("createInputSplits: Done writing input split data to zookeeper");
     }
 
     // Let workers know they can start trying to load the input splits
@@ -1905,4 +1880,81 @@ public class BspServiceMaster<I extends 
         globalStats.getMessageCount() -
             sentMessagesCounter.getValue());
   }
+
+  /**
+   * Task that writes a given input split to zookeeper.
+   * Upon failure call() throws an exception.
+   */
+  private class WriteInputSplit implements Callable<Void> {
+    /** Input split which we are going to write */
+    private final InputSplit inputSplit;
+    /** Index of the input split */
+    private final int index;
+
+    /**
+     * Constructor
+     *
+     * @param inputSplit Input split which we are going to write
+     * @param index Index of the input split
+     */
+    public WriteInputSplit(InputSplit inputSplit, int index) {
+      this.inputSplit = inputSplit;
+      this.index = index;
+    }
+
+    @Override
+    public Void call() {
+      String inputSplitPath = null;
+      try {
+        ByteArrayOutputStream byteArrayOutputStream =
+            new ByteArrayOutputStream();
+        DataOutput outputStream =
+            new DataOutputStream(byteArrayOutputStream);
+
+        String[] splitLocations = inputSplit.getLocations();
+        StringBuilder locations = null;
+        if (splitLocations != null) {
+          int splitListLength =
+              Math.min(splitLocations.length, localityLimit);
+          locations = new StringBuilder();
+          for (String location : splitLocations) {
+            locations.append(location)
+                .append(--splitListLength > 0 ? "\t" : "");
+          }
+        }
+        Text.writeString(outputStream,
+            locations == null ? "" : locations.toString());
+        Text.writeString(outputStream,
+            inputSplit.getClass().getName());
+        ((Writable) inputSplit).write(outputStream);
+        inputSplitPath = inputSplitsPath + "/" + index;
+        getZkExt().createExt(inputSplitPath,
+            byteArrayOutputStream.toByteArray(),
+            Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT,
+            true);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("call: Created input split " +
+              "with index " + index + " serialized as " +
+              byteArrayOutputStream.toString());
+        }
+      } catch (KeeperException.NodeExistsException e) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("call: Node " +
+              inputSplitPath + " already exists.");
+        }
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "call: KeeperException", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "call: IllegalStateException", e);
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "call: IOException", e);
+      }
+      return null;
+    }
+  }
 }

Added: giraph/trunk/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/ProgressableUtils.java?rev=1392574&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/ProgressableUtils.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/ProgressableUtils.java Mon Oct  1 20:35:39 2012
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Static helpers for blocking until some event happens while periodically
+ * reporting progress through a {@link Progressable}, so that a long wait is
+ * not mistaken by the framework for a hung task.
+ */
+public final class ProgressableUtils {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(ProgressableUtils.class);
+  /** Msecs between progress reports (and max length of a single wait) */
+  private static final int MSEC_PERIOD = 10000;
+
+  /** Do not instantiate. */
+  private ProgressableUtils() { }
+
+  /**
+   * Block until all tasks of the executor have terminated, calling
+   * {@link Progressable#progress()} roughly every {@code MSEC_PERIOD}
+   * milliseconds while waiting. There is no overall timeout; the executor
+   * is expected to have been shut down already, otherwise this never
+   * returns.
+   *
+   * @param executor     Executor which we are waiting for
+   * @param progressable Progressable for reporting progress (Job context)
+   * @throws IllegalStateException if the calling thread is interrupted
+   */
+  public static void awaitExecutorTermination(ExecutorService executor,
+      Progressable progressable) {
+    while (!awaitExecutorTermination(executor, progressable, MSEC_PERIOD)) {
+      progressable.progress();
+    }
+  }
+
+  /**
+   * Wait at most the given number of milliseconds for the executor's tasks
+   * to terminate, calling {@link Progressable#progress()} between waits of
+   * at most {@code MSEC_PERIOD} milliseconds each.
+   *
+   * @param executor Executor which we are waiting for
+   * @param progressable Progressable for reporting progress (Job context)
+   * @param remainingWaitMsecs Number of milliseconds to wait; a value
+   *                           &lt;= 0 checks once without blocking
+   * @return Whether all executor tasks terminated or not
+   * @throws IllegalStateException if the calling thread is interrupted
+   *         (the thread's interrupt status is restored first)
+   */
+  public static boolean awaitExecutorTermination(ExecutorService executor,
+      Progressable progressable, int remainingWaitMsecs) {
+    long deadlineMsecs = System.currentTimeMillis() + remainingWaitMsecs;
+    // Always check at least once, even for a non-positive timeout.
+    long waitMsecs = Math.max(0, Math.min(remainingWaitMsecs, MSEC_PERIOD));
+    while (true) {
+      try {
+        if (executor.awaitTermination(waitMsecs, TimeUnit.MILLISECONDS)) {
+          return true;
+        }
+      } catch (InterruptedException e) {
+        // Preserve the interrupt for callers further up the stack.
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException("awaitExecutorTermination: " +
+            "InterruptedException occurred while waiting for executor's " +
+            "tasks to terminate", e);
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("awaitExecutorTermination: " +
+            "Waiting for executor tasks to terminate " + executor.toString());
+      }
+      // Recompute the remaining time from the deadline instead of
+      // subtracting the requested wait, so time spent outside
+      // awaitTermination() is accounted for and we never spin with
+      // zero-length waits before the deadline is reached.
+      long leftMsecs = deadlineMsecs - System.currentTimeMillis();
+      if (leftMsecs <= 0) {
+        return false;
+      }
+      progressable.progress();
+      waitMsecs = Math.min(leftMsecs, MSEC_PERIOD);
+    }
+  }
+}