You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/06/20 20:39:54 UTC
git commit: updated refs/heads/trunk to d827c97
Repository: giraph
Updated Branches:
refs/heads/trunk 24664b939 -> d827c97fc
Improve out-of-core metrics
Summary: For the metric showing the percentage of the graph in memory, it makes more sense to show the lowest fraction of the graph that was in memory during a superstep. Basically, a user is more interested in seeing how bad the out-of-core execution was, and how many more machines he/she needs to use to run the job entirely in memory.
Test Plan:
mvn clean verify
visual, looking at Hadoop metric and per-worker metric
Reviewers: sergey.edunov, dionysis.logothetis, maja.kabiljo
Reviewed By: dionysis.logothetis, maja.kabiljo
Differential Revision: https://reviews.facebook.net/D59451
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d827c97f
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d827c97f
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d827c97f
Branch: refs/heads/trunk
Commit: d827c97fc244dd89ccee46e686eaf008889550ed
Parents: 24664b9
Author: Hassan Eslami <he...@fb.com>
Authored: Mon Jun 20 12:23:42 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Jun 20 12:23:52 2016 -0700
----------------------------------------------------------------------
.../org/apache/giraph/counters/GiraphStats.java | 14 ++++++++-
.../org/apache/giraph/graph/GlobalStats.java | 17 ++++++++++
.../giraph/job/CombinedWorkerProgress.java | 18 +++++++++++
.../apache/giraph/master/BspServiceMaster.java | 11 +++++++
.../giraph/metrics/WorkerSuperstepMetrics.java | 4 +--
.../org/apache/giraph/ooc/OutOfCoreEngine.java | 13 +++-----
.../giraph/ooc/data/MetaPartitionManager.java | 33 ++++++++++++++++++++
.../apache/giraph/worker/WorkerProgress.java | 22 +++++++++++++
.../giraph/worker/WorkerProgressStats.java | 3 ++
9 files changed, 122 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
index 0cb8486..7e22d48 100644
--- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
@@ -62,6 +62,9 @@ public class GiraphStats extends HadoopCountersBase {
/** aggregate bytes stored to local disks in out-of-core */
public static final String OOC_BYTES_STORED_NAME =
"Aggregate bytes stored to local disks (out-of-core)";
+ /** lowest percentage of graph in memory throughout the execution */
+ public static final String LOWEST_GRAPH_PERCENTAGE_IN_MEMORY_NAME =
+ "Lowest percentage of graph in memory so far (out-of-core)";
/** Singleton instance for everyone to use */
private static GiraphStats INSTANCE;
@@ -92,8 +95,10 @@ public class GiraphStats extends HadoopCountersBase {
private static final int OOC_BYTES_LOADED = 11;
/** Aggregate OOC stored bytes counter */
private static final int OOC_BYTES_STORED = 12;
+ /** Lowest percentage of graph in memory over time */
+ private static final int LOWEST_GRAPH_PERCENTAGE_IN_MEMORY = 13;
/** Number of counters in this class */
- private static final int NUM_COUNTERS = 13;
+ private static final int NUM_COUNTERS = 14;
/** All the counters stored */
private final GiraphHadoopCounter[] counters;
@@ -123,6 +128,9 @@ public class GiraphStats extends HadoopCountersBase {
getCounter(AGGREGATE_SENT_MESSAGE_BYTES_NAME);
counters[OOC_BYTES_LOADED] = getCounter(OOC_BYTES_LOADED_NAME);
counters[OOC_BYTES_STORED] = getCounter(OOC_BYTES_STORED_NAME);
+ counters[LOWEST_GRAPH_PERCENTAGE_IN_MEMORY] =
+ getCounter(LOWEST_GRAPH_PERCENTAGE_IN_MEMORY_NAME);
+ counters[LOWEST_GRAPH_PERCENTAGE_IN_MEMORY].setValue(100);
}
/**
@@ -260,6 +268,10 @@ public class GiraphStats extends HadoopCountersBase {
return counters[OOC_BYTES_STORED];
}
+ public GiraphHadoopCounter getLowestGraphPercentageInMemory() {
+ return counters[LOWEST_GRAPH_PERCENTAGE_IN_MEMORY];
+ }
+
@Override
public Iterator<GiraphHadoopCounter> iterator() {
return Arrays.asList(counters).iterator();
http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
index dab3c2f..5636260 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
@@ -46,6 +46,8 @@ public class GlobalStats implements Writable {
private long oocStoreBytesCount = 0;
/** Bytes of data loaded to disk in the last superstep */
private long oocLoadBytesCount = 0;
+ /** Lowest percentage of graph in memory throughout the execution */
+ private int lowestGraphPercentageInMemory = 100;
/**
* Master's decision on whether we should checkpoint and
* what to do next.
@@ -108,6 +110,15 @@ public class GlobalStats implements Writable {
this.checkpointStatus = checkpointStatus;
}
+ public int getLowestGraphPercentageInMemory() {
+ return lowestGraphPercentageInMemory;
+ }
+
+ public void setLowestGraphPercentageInMemory(
+ int lowestGraphPercentageInMemory) {
+ this.lowestGraphPercentageInMemory = lowestGraphPercentageInMemory;
+ }
+
/**
* Add bytes loaded to the global stats.
*
@@ -151,6 +162,9 @@ public class GlobalStats implements Writable {
edgeCount = input.readLong();
messageCount = input.readLong();
messageBytesCount = input.readLong();
+ oocLoadBytesCount = input.readLong();
+ oocStoreBytesCount = input.readLong();
+ lowestGraphPercentageInMemory = input.readInt();
haltComputation = input.readBoolean();
if (input.readBoolean()) {
checkpointStatus = CheckpointStatus.values()[input.readInt()];
@@ -166,6 +180,9 @@ public class GlobalStats implements Writable {
output.writeLong(edgeCount);
output.writeLong(messageCount);
output.writeLong(messageBytesCount);
+ output.writeLong(oocLoadBytesCount);
+ output.writeLong(oocStoreBytesCount);
+ output.writeInt(lowestGraphPercentageInMemory);
output.writeBoolean(haltComputation);
output.writeBoolean(checkpointStatus != null);
if (checkpointStatus != null) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
index e931a99..e265163 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
@@ -64,6 +64,13 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
private int workerWithMinFreeMemory;
/** Minimum fraction of free memory on a worker */
private double minFreeMemoryFraction = Double.MAX_VALUE;
+ /**
+ * Minimum percentage of graph in memory in any worker so far in the
+ * computation
+ */
+ private int minGraphPercentageInMemory = 100;
+ /** Id of the worker with min percentage of graph in memory */
+ private int workerWithMinGraphPercentageInMemory = -1;
/**
* Constructor
@@ -116,6 +123,11 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
minFreeMemoryFraction = Math.min(minFreeMemoryFraction,
workerProgress.getFreeMemoryFraction());
freeMemoryMB += workerProgress.getFreeMemoryMB();
+ int percentage = workerProgress.getLowestGraphPercentageInMemory();
+ if (percentage < minGraphPercentageInMemory) {
+ minGraphPercentageInMemory = percentage;
+ workerWithMinGraphPercentageInMemory = workerProgress.getTaskId();
+ }
}
if (!Iterables.isEmpty(workerProgresses)) {
freeMemoryMB /= Iterables.size(workerProgresses);
@@ -164,6 +176,12 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
if (minFreeMemoryFraction < normalFreeMemoryFraction) {
sb.append(", ******* YOUR JOB IS RUNNING LOW ON MEMORY *******");
}
+ if (minGraphPercentageInMemory < 100) {
+ sb.append(" Spilling ")
+ .append(100 - minGraphPercentageInMemory)
+ .append("% of data to external storage on worker ")
+ .append(workerWithMinGraphPercentageInMemory);
+ }
return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 8372bd3..605e818 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -946,6 +946,12 @@ public class BspServiceMaster<I extends WritableComparable,
workerMetrics.getBytesLoadedFromDisk());
globalStats.addOocStoreBytesCount(
workerMetrics.getBytesStoredOnDisk());
+ // Find the lowest percentage of graph in memory across all workers
+ // for one superstep
+ globalStats.setLowestGraphPercentageInMemory(
+ Math.min(globalStats.getLowestGraphPercentageInMemory(),
+ (int) Math.round(
+ workerMetrics.getGraphPercentageInMemory())));
aggregatedMetrics.add(workerMetrics, hostnamePartitionId);
}
} catch (JSONException e) {
@@ -2058,5 +2064,10 @@ public class BspServiceMaster<I extends WritableComparable,
.increment(globalStats.getOocLoadBytesCount());
gs.getAggregateOOCBytesStored()
.increment(globalStats.getOocStoreBytesCount());
+ // Updating the lowest percentage of graph in memory throughout the
+ // execution across all the supersteps
+ int percentage = (int) gs.getLowestGraphPercentageInMemory().getValue();
+ gs.getLowestGraphPercentageInMemory().setValue(
+ Math.min(percentage, globalStats.getLowestGraphPercentageInMemory()));
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
index 219bcbd..e4281d9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
@@ -68,7 +68,7 @@ public class WorkerSuperstepMetrics implements Writable {
superstepGCTimer.setTimeUnit(TimeUnit.MILLISECONDS);
bytesLoadedFromDisk = 0;
bytesStoredOnDisk = 0;
- graphPercentageInMemory = 0;
+ graphPercentageInMemory = 100;
}
/**
@@ -93,8 +93,6 @@ public class WorkerSuperstepMetrics implements Writable {
registry.getExistingGauge(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY);
if (gauge != null) {
graphPercentageInMemory = gauge.value();
- } else {
- graphPercentageInMemory = 100;
}
return this;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
index d4c2de5..2037abe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
@@ -19,7 +19,7 @@
package org.apache.giraph.ooc;
import com.sun.management.GarbageCollectionNotificationInfo;
-import com.yammer.metrics.util.PercentGauge;
+import com.yammer.metrics.core.Gauge;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
@@ -485,15 +485,10 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
@Override
public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
- superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new PercentGauge() {
+ superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() {
@Override
- protected double getNumerator() {
- return metaPartitionManager.getNumInMemoryPartitions();
- }
-
- @Override
- protected double getDenominator() {
- return metaPartitionManager.getNumPartitions();
+ public Double value() {
+ return metaPartitionManager.getLowestGraphFractionInMemory() * 100;
}
});
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
index 784d578..1332a3a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -20,9 +20,11 @@ package org.apache.giraph.ooc.data;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AtomicDouble;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.worker.BspServiceWorker;
+import org.apache.giraph.worker.WorkerProgress;
import org.apache.log4j.Logger;
import java.util.ArrayList;
@@ -87,6 +89,16 @@ public class MetaPartitionManager {
* processing
*/
private final Random randomGenerator;
+ /**
+ * What is the lowest fraction of partitions in memory, relative to the total
+ * number of available partitions? This is an indirect estimation of the
+ * amount of graph in memory, which can be used to estimate how many more
+ * machines needed to avoid out-of-core execution. At the beginning all the
+ * graph is in memory, so the fraction is 1. This fraction is calculated per
+ * superstep.
+ */
+ private final AtomicDouble lowestGraphFractionInMemory =
+ new AtomicDouble(1);
/**
* Constructor
@@ -125,6 +137,24 @@ public class MetaPartitionManager {
return partitions.size();
}
+ public double getLowestGraphFractionInMemory() {
+ return lowestGraphFractionInMemory.get();
+ }
+
+ /**
+ * Update the lowest fraction of graph in memory so to have a more accurate
+ * information in one of the counters.
+ */
+ private synchronized void updateGraphFractionInMemory() {
+ double graphInMemory =
+ (double) getNumInMemoryPartitions() / getNumPartitions();
+ if (graphInMemory < lowestGraphFractionInMemory.get()) {
+ lowestGraphFractionInMemory.set(graphInMemory);
+ WorkerProgress.get().updateLowestGraphPercentageInMemory(
+ (int) (graphInMemory * 100));
+ }
+ }
+
/**
* Whether a given partition is available
*
@@ -592,6 +622,7 @@ public class MetaPartitionManager {
*/
public void doneOffloadingPartition(int partitionId) {
numInMemoryPartitions.getAndDecrement();
+ updateGraphFractionInMemory();
MetaPartition meta = partitions.get(partitionId);
int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
synchronized (meta) {
@@ -618,6 +649,8 @@ public class MetaPartitionManager {
dictionary.reset();
}
numPartitionsProcessed.set(0);
+ lowestGraphFractionInMemory.set((double) getNumInMemoryPartitions() /
+ getNumPartitions());
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
index eb543cd..4065869 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
@@ -181,6 +181,17 @@ public final class WorkerProgress extends WorkerProgressStats {
freeMemoryFraction = MemoryUtils.freeMemoryFraction();
}
+ /**
+ * Update lowest percentage of graph which stayed in memory so far in the
+ * execution
+ *
+ * @param fraction the fraction of graph in memory so far in this superstep
+ */
+ public synchronized void updateLowestGraphPercentageInMemory(int fraction) {
+ lowestGraphPercentageInMemory =
+ Math.min(lowestGraphPercentageInMemory, fraction);
+ }
+
@ThriftField(1)
public synchronized long getCurrentSuperstep() {
return currentSuperstep;
@@ -281,6 +292,11 @@ public final class WorkerProgress extends WorkerProgressStats {
return freeMemoryFraction;
}
+ @ThriftField(21)
+ public synchronized int getLowestGraphPercentageInMemory() {
+ return lowestGraphPercentageInMemory;
+ }
+
public synchronized boolean isInputSuperstep() {
return currentSuperstep == -1;
}
@@ -392,4 +408,10 @@ public final class WorkerProgress extends WorkerProgressStats {
public synchronized void setTaskId(int taskId) {
this.taskId = taskId;
}
+
+ @ThriftField
+ public synchronized void setLowestGraphPercentageInMemory(
+ int lowestGraphPercentageInMemory) {
+ this.lowestGraphPercentageInMemory = lowestGraphPercentageInMemory;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java
index 04ed2ea..583b073 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java
@@ -72,6 +72,9 @@ public class WorkerProgressStats {
/** Fraction of memory that's free */
protected double freeMemoryFraction;
+ /** Lowest percentage of graph in memory throughout the execution so far */
+ protected int lowestGraphPercentageInMemory = 100;
+
public boolean isInputSuperstep() {
return currentSuperstep == -1;
}