You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by di...@apache.org on 2019/12/09 19:27:22 UTC
[giraph] branch trunk updated: GIRAPH-799
This is an automated email from the ASF dual-hosted git repository.
dionysios pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/giraph.git
The following commit(s) were added to refs/heads/trunk by this push:
new 526f561 GIRAPH-799
526f561 is described below
commit 526f5619e6b115ad8db1af245fd4736125dd5c37
Author: Aanchal Dalmia <aa...@gmail.com>
AuthorDate: Mon Dec 9 11:26:54 2019 -0800
GIRAPH-799
closes #117
---
.../org/apache/giraph/bsp/CentralizedServiceMaster.java | 5 ++++-
.../java/org/apache/giraph/counters/CustomCounters.java | 10 ++++++++--
.../java/org/apache/giraph/counters/GiraphTimers.java | 17 ++++++++++-------
.../java/org/apache/giraph/master/BspServiceMaster.java | 8 ++++----
.../java/org/apache/giraph/master/MasterThread.java | 4 +++-
.../java/org/apache/giraph/worker/BspServiceWorker.java | 2 +-
6 files changed, 30 insertions(+), 16 deletions(-)
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index 6f5d459..cbf44b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -183,7 +183,10 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
/**
* Add the Giraph Timers to thrift counter struct, and send to the job client
+ * Counters include the Giraph Timers for setup, initialise, shutdown, total,
+ * and time for the given superstep
+ * @param superstep superstep for which the GiraphTimer will be sent
*
*/
- void addGiraphTimersAndSendCounters();
+ void addGiraphTimersAndSendCounters(long superstep);
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/CustomCounters.java b/giraph-core/src/main/java/org/apache/giraph/counters/CustomCounters.java
index 482cbbc..4a4fb79 100644
--- a/giraph-core/src/main/java/org/apache/giraph/counters/CustomCounters.java
+++ b/giraph-core/src/main/java/org/apache/giraph/counters/CustomCounters.java
@@ -19,6 +19,7 @@
package org.apache.giraph.counters;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -73,11 +74,16 @@ public class CustomCounters {
/**
* Get the unique counter group and names
+ * This will also clear the counters list, to avoid duplicate
+ * counters from the previous superstep from being sent to the
+ * zookeeper again
*
* @return Set of unique counter names
*/
- public static Set<CustomCounter> getCustomCounters() {
- return COUNTER_NAMES;
+ public static Set<CustomCounter> getAndClearCustomCounters() {
+ Set<CustomCounter> counterNamesCopy = new HashSet<>(COUNTER_NAMES);
+ COUNTER_NAMES.clear();
+ return counterNamesCopy;
}
/**
diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
index f4005c4..33875e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
+++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
@@ -184,11 +184,13 @@ public class GiraphTimers extends HadoopCountersBase {
}
/**
- * Get a map of counter names and values
- *
+ * Get a map of counter names and values for the given superstep
+ * Counters include Setup, Initialise, Shutdown, Total, and time for
+ * the given superstep
+ * @param superstep superstep for which to fetch the GiraphTimer
* @return Map of counter names and values
*/
- public List<CustomCounter> getCounterList() {
+ public List<CustomCounter> getCounterList(long superstep) {
List<CustomCounter> countersList = new ArrayList<>();
for (GiraphHadoopCounter counter: jobCounters) {
CustomCounter customCounter = new CustomCounter(
@@ -196,11 +198,12 @@ public class GiraphTimers extends HadoopCountersBase {
CustomCounter.Aggregation.SUM, counter.getValue());
countersList.add(customCounter);
}
- for (Map.Entry<Long, GiraphHadoopCounter> entry :
- superstepMsec.entrySet()) {
+ GiraphHadoopCounter giraphHadoopCounter = superstepMsec.get(superstep);
+ if (giraphHadoopCounter != null) {
CustomCounter customCounter = new CustomCounter(
- GROUP_NAME, entry.getValue().getName(),
- CustomCounter.Aggregation.SUM, entry.getValue().getValue());
+ GROUP_NAME, giraphHadoopCounter.getName(),
+ CustomCounter.Aggregation.SUM,
+ giraphHadoopCounter.getValue());
countersList.add(customCounter);
}
return countersList;
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 153d4cc..87a5b0c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -1678,7 +1678,6 @@ public class BspServiceMaster<I extends WritableComparable,
// are no more messages in the system, stop the computation
GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
aggregateCountersFromWorkersAndMaster();
- addGiraphTimersAndSendCounters();
if (masterCompute.isHalted() ||
(globalStats.getFinishedVertexCount() ==
globalStats.getVertexCount() &&
@@ -1927,7 +1926,7 @@ public class BspServiceMaster<I extends WritableComparable,
// we should not add them again here.
Counter counter;
Set<CustomCounter> masterCounterNames =
- CustomCounters.getCustomCounters();
+ CustomCounters.getAndClearCustomCounters();
for (CustomCounter customCounter : masterCounterNames) {
String groupName = customCounter.getGroupName();
String counterName = customCounter.getCounterName();
@@ -1961,12 +1960,13 @@ public class BspServiceMaster<I extends WritableComparable,
* the time required for shutdown and cleanup
* This will fetch the final Giraph Timers, and send all the counters
* to the job client
+ * @param superstep superstep for which the GiraphTimer will be sent
*
*/
- public void addGiraphTimersAndSendCounters() {
+ public void addGiraphTimersAndSendCounters(long superstep) {
List<CustomCounter> giraphCounters =
giraphCountersThriftStruct.getCounters();
- giraphCounters.addAll(GiraphTimers.getInstance().getCounterList());
+ giraphCounters.addAll(GiraphTimers.getInstance().getCounterList(superstep));
giraphCountersThriftStruct.setCounters(giraphCounters);
getJobProgressTracker().sendMasterCounters(giraphCountersThriftStruct);
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
index dc86ca3..7fccc34 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -147,6 +147,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep,
computationName).increment(superstepMillis);
}
+ bspServiceMaster.addGiraphTimersAndSendCounters(cachedSuperstep);
bspServiceMaster.postSuperstep();
@@ -191,7 +192,8 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
GiraphTimers.getInstance().getTotalMs().
increment(System.currentTimeMillis() - initializeMillis);
}
- bspServiceMaster.addGiraphTimersAndSendCounters();
+ bspServiceMaster.addGiraphTimersAndSendCounters(
+ bspServiceMaster.getSuperstep());
bspServiceMaster.postApplication();
// CHECKSTYLE: stop IllegalCatchCheck
} catch (Exception e) {
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 5cbc4e7..b6756c9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -1251,7 +1251,7 @@ else[HADOOP_NON_SECURE]*/
*/
public void storeCountersInZooKeeper(boolean allSuperstepsDone) {
Set<CustomCounter> additionalCounters =
- CustomCounters.getCustomCounters();
+ CustomCounters.getAndClearCustomCounters();
JSONArray jsonCounters = new JSONArray();
Mapper.Context context = getContext();