You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2012/10/01 22:35:39 UTC
svn commit: r1392574 - in /giraph/trunk: CHANGELOG
src/main/java/org/apache/giraph/graph/BspServiceMaster.java
src/main/java/org/apache/giraph/utils/ProgressableUtils.java
Author: maja
Date: Mon Oct 1 20:35:39 2012
New Revision: 1392574
URL: http://svn.apache.org/viewvc?rev=1392574&view=rev
Log:
GIRAPH-326: Writing input splits to ZooKeeper in parallel
Added:
giraph/trunk/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1392574&r1=1392573&r2=1392574&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct 1 20:35:39 2012
@@ -2,6 +2,8 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-326: Writing input splits to ZooKeeper in parallel (maja)
+
GIRAPH-335: Add committer information for Maja Kabiljo to pom.xml
(maja)
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1392574&r1=1392573&r2=1392574&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Mon Oct 1 20:35:39 2012
@@ -31,6 +31,7 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.graph.partition.PartitionStats;
import org.apache.giraph.graph.partition.PartitionUtils;
+import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
@@ -74,6 +75,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
@@ -95,6 +99,11 @@ public class BspServiceMaster<I extends
public static final String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats";
/** Print worker names only if there are 10 workers left */
public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
+ /** How many threads to use when writing input splits to zookeeper */
+ public static final String INPUT_SPLIT_THREAD_COUNT =
+ "giraph.inputSplitThreadCount";
+ /** Default number of threads to use when writing input splits to zookeeper */
+ public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1;
/** Class logger */
private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
/** Superstep counter */
@@ -531,59 +540,25 @@ public class BspServiceMaster<I extends
"=number of healthy processes, " +
"some workers will be not used");
}
- String inputSplitPath = null;
- String[] splitLocations;
- InputSplit inputSplit;
- StringBuilder locations;
+
+ // Write input splits to zookeeper in parallel
+ int inputSplitThreadCount = getConfiguration().getInt(
+ INPUT_SPLIT_THREAD_COUNT,
+ DEFAULT_INPUT_SPLIT_THREAD_COUNT);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("createInputSplits: Starting to write input split data to " +
+ "zookeeper with " + inputSplitThreadCount + " threads");
+ }
+ ExecutorService taskExecutor =
+ Executors.newFixedThreadPool(inputSplitThreadCount);
for (int i = 0; i < splitList.size(); ++i) {
- try {
- ByteArrayOutputStream byteArrayOutputStream =
- new ByteArrayOutputStream();
- DataOutput outputStream =
- new DataOutputStream(byteArrayOutputStream);
- inputSplit = splitList.get(i);
- splitLocations = inputSplit.getLocations();
- locations = null;
- if (splitLocations != null) {
- int splitListLength =
- Math.min(splitLocations.length, localityLimit);
- locations = new StringBuilder();
- for (String location : splitLocations) {
- locations.append(location)
- .append(--splitListLength > 0 ? "\t" : "");
- }
- }
- Text.writeString(outputStream,
- locations == null ? "" : locations.toString());
- Text.writeString(outputStream,
- inputSplit.getClass().getName());
- ((Writable) inputSplit).write(outputStream);
- inputSplitPath = inputSplitsPath + "/" + i;
- getZkExt().createExt(inputSplitPath,
- byteArrayOutputStream.toByteArray(),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- if (LOG.isDebugEnabled()) {
- LOG.debug("createInputSplits: Created input split " +
- "with index " + i + " serialized as " +
- byteArrayOutputStream.toString());
- }
- } catch (KeeperException.NodeExistsException e) {
- if (LOG.isInfoEnabled()) {
- LOG.info("createInputSplits: Node " +
- inputSplitPath + " already exists.");
- }
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "createInputSplits: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "createInputSplits: IllegalStateException", e);
- } catch (IOException e) {
- throw new IllegalStateException(
- "createInputSplits: IOException", e);
- }
+ InputSplit inputSplit = splitList.get(i);
+ taskExecutor.submit(new WriteInputSplit(inputSplit, i));
+ }
+ taskExecutor.shutdown();
+ ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
+ if (LOG.isInfoEnabled()) {
+ LOG.info("createInputSplits: Done writing input split data to zookeeper");
}
// Let workers know they can start trying to load the input splits
@@ -1905,4 +1880,81 @@ public class BspServiceMaster<I extends
globalStats.getMessageCount() -
sentMessagesCounter.getValue());
}
+
+  /**
+   * Task that serializes one input split and writes it to zookeeper as the
+   * node <code>inputSplitsPath + "/" + index</code>.
+   *
+   * The node data consists of, in order: the tab-separated split locations
+   * (at most <code>localityLimit</code> of them; an empty string when the
+   * split reports no locations), the class name of the split, and the
+   * split's own Writable serialization.
+   *
+   * Upon failure call() throws an IllegalStateException. When the task is
+   * run through ExecutorService#submit, that exception is captured by the
+   * returned Future, so the submitter has to inspect the Future in order to
+   * notice the failure.
+   */
+  private class WriteInputSplit implements Callable<Void> {
+    /** Input split which we are going to write */
+    private final InputSplit inputSplit;
+    /** Index of the input split */
+    private final int index;
+
+    /**
+     * Constructor
+     *
+     * @param inputSplit Input split which we are going to write
+     * @param index Index of the input split
+     */
+    public WriteInputSplit(InputSplit inputSplit, int index) {
+      this.inputSplit = inputSplit;
+      this.index = index;
+    }
+
+    /**
+     * Serialize the input split and create its zookeeper node. A node that
+     * already exists is only logged and is not treated as an error.
+     *
+     * @return Always null
+     * @throws IllegalStateException If the zookeeper operation fails, the
+     *         thread is interrupted, or the split can not be serialized
+     */
+    @Override
+    public Void call() {
+      String inputSplitPath = null;
+      try {
+        ByteArrayOutputStream byteArrayOutputStream =
+            new ByteArrayOutputStream();
+        DataOutput outputStream =
+            new DataOutputStream(byteArrayOutputStream);
+
+        String[] splitLocations = inputSplit.getLocations();
+        StringBuilder locations = null;
+        if (splitLocations != null) {
+          // Only the first localityLimit locations are written. Iterating
+          // over every location here would append the surplus ones without
+          // a separator and glue several host names into the last field.
+          int splitListLength =
+              Math.min(splitLocations.length, localityLimit);
+          locations = new StringBuilder();
+          for (int i = 0; i < splitListLength; ++i) {
+            if (i > 0) {
+              locations.append('\t');
+            }
+            locations.append(splitLocations[i]);
+          }
+        }
+        Text.writeString(outputStream,
+            locations == null ? "" : locations.toString());
+        Text.writeString(outputStream,
+            inputSplit.getClass().getName());
+        ((Writable) inputSplit).write(outputStream);
+        inputSplitPath = inputSplitsPath + "/" + index;
+        getZkExt().createExt(inputSplitPath,
+            byteArrayOutputStream.toByteArray(),
+            Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT,
+            true);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("call: Created input split " +
+              "with index " + index + " serialized as " +
+              byteArrayOutputStream.toString());
+        }
+      } catch (KeeperException.NodeExistsException e) {
+        // The node is already there, nothing more to write for this split
+        if (LOG.isInfoEnabled()) {
+          LOG.info("call: Node " +
+              inputSplitPath + " already exists.");
+        }
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "call: KeeperException", e);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to whoever owns this thread
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "call: InterruptedException", e);
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "call: IOException", e);
+      }
+      return null;
+    }
+  }
}
Added: giraph/trunk/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/ProgressableUtils.java?rev=1392574&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/ProgressableUtils.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/ProgressableUtils.java Mon Oct 1 20:35:39 2012
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Functions for waiting on some events to happen while reporting progress.
+ * All waits are done in slices of at most MSEC_PERIOD milliseconds, and
+ * Progressable#progress() is called between the slices.
+ */
+public class ProgressableUtils {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(ProgressableUtils.class);
+  /** Msecs to refresh the progress meter */
+  private static final int MSEC_PERIOD = 10000;
+
+  /** Do not instantiate. */
+  private ProgressableUtils() { }
+
+  /**
+   * Wait for executor tasks to terminate, while periodically reporting
+   * progress. Does not return until the executor has terminated.
+   *
+   * @param executor Executor which we are waiting for
+   * @param progressable Progressable for reporting progress (Job context)
+   * @throws IllegalStateException If the waiting thread is interrupted
+   */
+  public static void awaitExecutorTermination(ExecutorService executor,
+      Progressable progressable) {
+    // Each bounded wait lasts about MSEC_PERIOD; report progress after
+    // every one that ends without the executor having terminated
+    while (!awaitExecutorTermination(executor, progressable, MSEC_PERIOD)) {
+      progressable.progress();
+    }
+  }
+
+  /**
+   * Wait maximum given number of milliseconds for executor tasks to terminate,
+   * while periodically reporting progress.
+   *
+   * @param executor Executor which we are waiting for
+   * @param progressable Progressable for reporting progress (Job context)
+   * @param remainingWaitMsecs Number of milliseconds to wait
+   * @return Whether all executor tasks terminated or not
+   * @throws IllegalStateException If the waiting thread is interrupted; the
+   *         interrupt status of the thread is restored before throwing
+   */
+  public static boolean awaitExecutorTermination(ExecutorService executor,
+      Progressable progressable, int remainingWaitMsecs) {
+    long timeoutTimeMsecs = System.currentTimeMillis() + remainingWaitMsecs;
+    int currentWaitMsecs;
+    while (true) {
+      // Never block longer than MSEC_PERIOD without reporting progress
+      currentWaitMsecs = Math.min(remainingWaitMsecs, MSEC_PERIOD);
+      try {
+        if (executor.awaitTermination(currentWaitMsecs,
+            TimeUnit.MILLISECONDS)) {
+          return true;
+        }
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so that code further up the stack
+        // can still see that this thread was asked to stop
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException("awaitExecutorTermination: " +
+            "InterruptedException occurred while waiting for executor's " +
+            "tasks to terminate", e);
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("awaitExecutorTermination: " +
+            "Waiting for executor tasks to terminate " + executor.toString());
+      }
+      if (System.currentTimeMillis() >= timeoutTimeMsecs) {
+        return false;
+      }
+      progressable.progress();
+      remainingWaitMsecs = Math.max(0, remainingWaitMsecs - currentWaitMsecs);
+    }
+  }
+}