You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/06/20 20:39:54 UTC

git commit: updated refs/heads/trunk to d827c97

Repository: giraph
Updated Branches:
  refs/heads/trunk 24664b939 -> d827c97fc


Improve out-of-core metrics

Summary: For the metric showing the percentage of the graph in memory, it makes more sense to report the lowest fraction of the graph that was in memory during a superstep. A user is mainly interested in how bad the out-of-core execution was and how many more machines are needed to run the job entirely in memory.

Test Plan:
mvn clean verify
Visual inspection of the Hadoop metric and the per-worker metric

Reviewers: sergey.edunov, dionysis.logothetis, maja.kabiljo

Reviewed By: dionysis.logothetis, maja.kabiljo

Differential Revision: https://reviews.facebook.net/D59451


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d827c97f
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d827c97f
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d827c97f

Branch: refs/heads/trunk
Commit: d827c97fc244dd89ccee46e686eaf008889550ed
Parents: 24664b9
Author: Hassan Eslami <he...@fb.com>
Authored: Mon Jun 20 12:23:42 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Jun 20 12:23:52 2016 -0700

----------------------------------------------------------------------
 .../org/apache/giraph/counters/GiraphStats.java | 14 ++++++++-
 .../org/apache/giraph/graph/GlobalStats.java    | 17 ++++++++++
 .../giraph/job/CombinedWorkerProgress.java      | 18 +++++++++++
 .../apache/giraph/master/BspServiceMaster.java  | 11 +++++++
 .../giraph/metrics/WorkerSuperstepMetrics.java  |  4 +--
 .../org/apache/giraph/ooc/OutOfCoreEngine.java  | 13 +++-----
 .../giraph/ooc/data/MetaPartitionManager.java   | 33 ++++++++++++++++++++
 .../apache/giraph/worker/WorkerProgress.java    | 22 +++++++++++++
 .../giraph/worker/WorkerProgressStats.java      |  3 ++
 9 files changed, 122 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
index 0cb8486..7e22d48 100644
--- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
@@ -62,6 +62,9 @@ public class GiraphStats extends HadoopCountersBase {
   /** aggregate bytes stored to local disks in out-of-core */
   public static final String OOC_BYTES_STORED_NAME =
       "Aggregate bytes stored to local disks (out-of-core)";
+  /** lowest percentage of graph in memory throughout the execution */
+  public static final String LOWEST_GRAPH_PERCENTAGE_IN_MEMORY_NAME =
+      "Lowest percentage of graph in memory so far (out-of-core)";
 
   /** Singleton instance for everyone to use */
   private static GiraphStats INSTANCE;
@@ -92,8 +95,10 @@ public class GiraphStats extends HadoopCountersBase {
   private static final int OOC_BYTES_LOADED = 11;
   /** Aggregate OOC stored bytes counter */
   private static final int OOC_BYTES_STORED = 12;
+  /** Lowest percentage of graph in memory over time */
+  private static final int LOWEST_GRAPH_PERCENTAGE_IN_MEMORY = 13;
   /** Number of counters in this class */
-  private static final int NUM_COUNTERS = 13;
+  private static final int NUM_COUNTERS = 14;
 
   /** All the counters stored */
   private final GiraphHadoopCounter[] counters;
@@ -123,6 +128,9 @@ public class GiraphStats extends HadoopCountersBase {
         getCounter(AGGREGATE_SENT_MESSAGE_BYTES_NAME);
     counters[OOC_BYTES_LOADED] = getCounter(OOC_BYTES_LOADED_NAME);
     counters[OOC_BYTES_STORED] = getCounter(OOC_BYTES_STORED_NAME);
+    counters[LOWEST_GRAPH_PERCENTAGE_IN_MEMORY] =
+        getCounter(LOWEST_GRAPH_PERCENTAGE_IN_MEMORY_NAME);
+    counters[LOWEST_GRAPH_PERCENTAGE_IN_MEMORY].setValue(100);
   }
 
   /**
@@ -260,6 +268,10 @@ public class GiraphStats extends HadoopCountersBase {
     return counters[OOC_BYTES_STORED];
   }
 
+  public GiraphHadoopCounter getLowestGraphPercentageInMemory() {
+    return counters[LOWEST_GRAPH_PERCENTAGE_IN_MEMORY];
+  }
+
   @Override
   public Iterator<GiraphHadoopCounter> iterator() {
     return Arrays.asList(counters).iterator();

http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
index dab3c2f..5636260 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
@@ -46,6 +46,8 @@ public class GlobalStats implements Writable {
   private long oocStoreBytesCount = 0;
   /** Bytes of data loaded to disk in the last superstep */
   private long oocLoadBytesCount = 0;
+  /** Lowest percentage of graph in memory throughout the execution */
+  private int lowestGraphPercentageInMemory = 100;
   /**
    * Master's decision on whether we should checkpoint and
    * what to do next.
@@ -108,6 +110,15 @@ public class GlobalStats implements Writable {
     this.checkpointStatus = checkpointStatus;
   }
 
+  public int getLowestGraphPercentageInMemory() {
+    return lowestGraphPercentageInMemory;
+  }
+
+  public void setLowestGraphPercentageInMemory(
+      int lowestGraphPercentageInMemory) {
+    this.lowestGraphPercentageInMemory = lowestGraphPercentageInMemory;
+  }
+
   /**
    * Add bytes loaded to the global stats.
    *
@@ -151,6 +162,9 @@ public class GlobalStats implements Writable {
     edgeCount = input.readLong();
     messageCount = input.readLong();
     messageBytesCount = input.readLong();
+    oocLoadBytesCount = input.readLong();
+    oocStoreBytesCount = input.readLong();
+    lowestGraphPercentageInMemory = input.readInt();
     haltComputation = input.readBoolean();
     if (input.readBoolean()) {
       checkpointStatus = CheckpointStatus.values()[input.readInt()];
@@ -166,6 +180,9 @@ public class GlobalStats implements Writable {
     output.writeLong(edgeCount);
     output.writeLong(messageCount);
     output.writeLong(messageBytesCount);
+    output.writeLong(oocLoadBytesCount);
+    output.writeLong(oocStoreBytesCount);
+    output.writeInt(lowestGraphPercentageInMemory);
     output.writeBoolean(haltComputation);
     output.writeBoolean(checkpointStatus != null);
     if (checkpointStatus != null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
index e931a99..e265163 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
@@ -64,6 +64,13 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
   private int workerWithMinFreeMemory;
   /** Minimum fraction of free memory on a worker */
   private double minFreeMemoryFraction = Double.MAX_VALUE;
+  /**
+   * Minimum percentage of graph in memory in any worker so far in the
+   * computation
+   */
+  private int minGraphPercentageInMemory = 100;
+  /** Id of the worker with min percentage of graph in memory */
+  private int workerWithMinGraphPercentageInMemory = -1;
 
   /**
    * Constructor
@@ -116,6 +123,11 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
       minFreeMemoryFraction = Math.min(minFreeMemoryFraction,
           workerProgress.getFreeMemoryFraction());
       freeMemoryMB += workerProgress.getFreeMemoryMB();
+      int percentage = workerProgress.getLowestGraphPercentageInMemory();
+      if (percentage < minGraphPercentageInMemory) {
+        minGraphPercentageInMemory = percentage;
+        workerWithMinGraphPercentageInMemory = workerProgress.getTaskId();
+      }
     }
     if (!Iterables.isEmpty(workerProgresses)) {
       freeMemoryMB /= Iterables.size(workerProgresses);
@@ -164,6 +176,12 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
     if (minFreeMemoryFraction < normalFreeMemoryFraction) {
       sb.append(", ******* YOUR JOB IS RUNNING LOW ON MEMORY *******");
     }
+    if (minGraphPercentageInMemory < 100) {
+      sb.append(" Spilling ")
+          .append(100 - minGraphPercentageInMemory)
+          .append("% of data to external storage on worker ")
+          .append(workerWithMinGraphPercentageInMemory);
+    }
     return sb.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 8372bd3..605e818 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -946,6 +946,12 @@ public class BspServiceMaster<I extends WritableComparable,
               workerMetrics.getBytesLoadedFromDisk());
           globalStats.addOocStoreBytesCount(
               workerMetrics.getBytesStoredOnDisk());
+          // Find the lowest percentage of graph in memory across all workers
+          // for one superstep
+          globalStats.setLowestGraphPercentageInMemory(
+              Math.min(globalStats.getLowestGraphPercentageInMemory(),
+                  (int) Math.round(
+                      workerMetrics.getGraphPercentageInMemory())));
           aggregatedMetrics.add(workerMetrics, hostnamePartitionId);
         }
       } catch (JSONException e) {
@@ -2058,5 +2064,10 @@ public class BspServiceMaster<I extends WritableComparable,
       .increment(globalStats.getOocLoadBytesCount());
     gs.getAggregateOOCBytesStored()
       .increment(globalStats.getOocStoreBytesCount());
+    // Updating the lowest percentage of graph in memory throughout the
+    // execution across all the supersteps
+    int percentage = (int) gs.getLowestGraphPercentageInMemory().getValue();
+    gs.getLowestGraphPercentageInMemory().setValue(
+        Math.min(percentage, globalStats.getLowestGraphPercentageInMemory()));
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
index 219bcbd..e4281d9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
@@ -68,7 +68,7 @@ public class WorkerSuperstepMetrics implements Writable {
     superstepGCTimer.setTimeUnit(TimeUnit.MILLISECONDS);
     bytesLoadedFromDisk = 0;
     bytesStoredOnDisk = 0;
-    graphPercentageInMemory = 0;
+    graphPercentageInMemory = 100;
   }
 
   /**
@@ -93,8 +93,6 @@ public class WorkerSuperstepMetrics implements Writable {
         registry.getExistingGauge(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY);
     if (gauge != null) {
       graphPercentageInMemory = gauge.value();
-    } else {
-      graphPercentageInMemory = 100;
     }
     return this;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
index d4c2de5..2037abe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.ooc;
 
 import com.sun.management.GarbageCollectionNotificationInfo;
-import com.yammer.metrics.util.PercentGauge;
+import com.yammer.metrics.core.Gauge;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
@@ -485,15 +485,10 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
 
   @Override
   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
-    superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new PercentGauge() {
+    superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() {
       @Override
-      protected double getNumerator() {
-        return metaPartitionManager.getNumInMemoryPartitions();
-      }
-
-      @Override
-      protected double getDenominator() {
-        return metaPartitionManager.getNumPartitions();
+      public Double value() {
+        return metaPartitionManager.getLowestGraphFractionInMemory() * 100;
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
index 784d578..1332a3a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -20,9 +20,11 @@ package org.apache.giraph.ooc.data;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.worker.BspServiceWorker;
+import org.apache.giraph.worker.WorkerProgress;
 import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
@@ -87,6 +89,16 @@ public class MetaPartitionManager {
    * processing
    */
   private final Random randomGenerator;
+  /**
+   * What is the lowest fraction of partitions in memory, relative to the total
+   * number of available partitions? This is an indirect estimation of the
+   * amount of graph in memory, which can be used to estimate how many more
+   * machines needed to avoid out-of-core execution. At the beginning all the
+   * graph is in memory, so the fraction is 1. This fraction is calculated per
+   * superstep.
+   */
+  private final AtomicDouble lowestGraphFractionInMemory =
+      new AtomicDouble(1);
 
   /**
    * Constructor
@@ -125,6 +137,24 @@ public class MetaPartitionManager {
     return partitions.size();
   }
 
+  public double getLowestGraphFractionInMemory() {
+    return lowestGraphFractionInMemory.get();
+  }
+
+  /**
+   * Update the lowest fraction of graph in memory so to have a more accurate
+   * information in one of the counters.
+   */
+  private synchronized void updateGraphFractionInMemory() {
+    double graphInMemory =
+        (double) getNumInMemoryPartitions() / getNumPartitions();
+    if (graphInMemory < lowestGraphFractionInMemory.get()) {
+      lowestGraphFractionInMemory.set(graphInMemory);
+      WorkerProgress.get().updateLowestGraphPercentageInMemory(
+          (int) (graphInMemory * 100));
+    }
+  }
+
   /**
    * Whether a given partition is available
    *
@@ -592,6 +622,7 @@ public class MetaPartitionManager {
    */
   public void doneOffloadingPartition(int partitionId) {
     numInMemoryPartitions.getAndDecrement();
+    updateGraphFractionInMemory();
     MetaPartition meta = partitions.get(partitionId);
     int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
     synchronized (meta) {
@@ -618,6 +649,8 @@ public class MetaPartitionManager {
       dictionary.reset();
     }
     numPartitionsProcessed.set(0);
+    lowestGraphFractionInMemory.set((double) getNumInMemoryPartitions() /
+        getNumPartitions());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
index eb543cd..4065869 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
@@ -181,6 +181,17 @@ public final class WorkerProgress extends WorkerProgressStats {
     freeMemoryFraction = MemoryUtils.freeMemoryFraction();
   }
 
+  /**
+   * Update lowest percentage of graph which stayed in memory so far in the
+   * execution
+   *
+   * @param fraction the fraction of graph in memory so far in this superstep
+   */
+  public synchronized void updateLowestGraphPercentageInMemory(int fraction) {
+    lowestGraphPercentageInMemory =
+        Math.min(lowestGraphPercentageInMemory, fraction);
+  }
+
   @ThriftField(1)
   public synchronized long getCurrentSuperstep() {
     return currentSuperstep;
@@ -281,6 +292,11 @@ public final class WorkerProgress extends WorkerProgressStats {
     return freeMemoryFraction;
   }
 
+  @ThriftField(21)
+  public synchronized int getLowestGraphPercentageInMemory() {
+    return lowestGraphPercentageInMemory;
+  }
+
   public synchronized boolean isInputSuperstep() {
     return currentSuperstep == -1;
   }
@@ -392,4 +408,10 @@ public final class WorkerProgress extends WorkerProgressStats {
   public synchronized void setTaskId(int taskId) {
     this.taskId = taskId;
   }
+
+  @ThriftField
+  public synchronized void setLowestGraphPercentageInMemory(
+      int lowestGraphPercentageInMemory) {
+    this.lowestGraphPercentageInMemory = lowestGraphPercentageInMemory;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d827c97f/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java
index 04ed2ea..583b073 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressStats.java
@@ -72,6 +72,9 @@ public class WorkerProgressStats {
   /** Fraction of memory that's free */
   protected double freeMemoryFraction;
 
+  /** Lowest percentage of graph in memory throughout the execution so far */
+  protected int lowestGraphPercentageInMemory = 100;
+
   public boolean isInputSuperstep() {
     return currentSuperstep == -1;
   }