You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/17 20:19:58 UTC
[26/51] [abbrv] flink git commit: [FLINK-2415] [monitoring api] Add
vertex details request handler, unify IDs between vertices and plan
http://git-wip-us.apache.org/repos/asf/flink/blob/b26ce5a3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 999ca1b..2d6af6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -18,12 +18,15 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.io.StrictlyLocalAssignment;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -490,6 +493,45 @@ public class ExecutionJobVertex implements Serializable {
}
}
+ // --------------------------------------------------------------------------------------------
+ // Accumulators / Metrics
+ // --------------------------------------------------------------------------------------------
+
+ public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators() {
+ // some specialized code to speed things up
+ long bytesRead = 0;
+ long bytesWritten = 0;
+ long recordsRead = 0;
+ long recordsWritten = 0;
+
+ for (ExecutionVertex v : getTaskVertices()) {
+ Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = v.getCurrentExecutionAttempt().getFlinkAccumulators();
+
+ if (metrics != null) {
+ LongCounter br = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
+ LongCounter bw = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
+ LongCounter rr = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
+ LongCounter rw = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
+
+ bytesRead += br != null ? br.getLocalValuePrimitive() : 0;
+ bytesWritten += bw != null ? bw.getLocalValuePrimitive() : 0;
+ recordsRead += rr != null ? rr.getLocalValuePrimitive() : 0;
+ recordsWritten += rw != null ? rw.getLocalValuePrimitive() : 0;
+ }
+ }
+
+ HashMap<AccumulatorRegistry.Metric, Accumulator<?, ?>> agg = new HashMap<>();
+ agg.put(AccumulatorRegistry.Metric.NUM_BYTES_IN, new LongCounter(bytesRead));
+ agg.put(AccumulatorRegistry.Metric.NUM_BYTES_OUT, new LongCounter(bytesWritten));
+ agg.put(AccumulatorRegistry.Metric.NUM_RECORDS_IN, new LongCounter(recordsRead));
+ agg.put(AccumulatorRegistry.Metric.NUM_RECORDS_OUT, new LongCounter(recordsWritten));
+ return agg;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Static / pre-assigned input splits
+ // --------------------------------------------------------------------------------------------
+
private List<LocatableInputSplit>[] computeLocalInputSplitsPerTask(InputSplit[] splits) throws JobException {
final int numSubTasks = getParallelism();
@@ -593,10 +635,10 @@ public class ExecutionJobVertex implements Serializable {
return subTaskSplitAssignment;
}
- //---------------------------------------------------------------------------------------------
- // Predetermined InputSplitAssigner
- //---------------------------------------------------------------------------------------------
+ /**
+ * An InputSplitAssigner that assigns to pre-determined hosts.
+ */
public static class PredeterminedInputSplitAssigner implements InputSplitAssigner {
private List<LocatableInputSplit>[] inputSplitsPerSubtask;
@@ -604,7 +646,7 @@ public class ExecutionJobVertex implements Serializable {
@SuppressWarnings("unchecked")
public PredeterminedInputSplitAssigner(List<LocatableInputSplit>[] inputSplitsPerSubtask) {
// copy input split assignment
- this.inputSplitsPerSubtask = (List<LocatableInputSplit>[]) new List[inputSplitsPerSubtask.length];
+ this.inputSplitsPerSubtask = (List<LocatableInputSplit>[]) new List<?>[inputSplitsPerSubtask.length];
for (int i = 0; i < inputSplitsPerSubtask.length; i++) {
List<LocatableInputSplit> next = inputSplitsPerSubtask[i];
http://git-wip-us.apache.org/repos/asf/flink/blob/b26ce5a3/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 2e50ca4..41ad2dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -59,4 +59,26 @@ public class WebMonitorUtils {
started, finished, status, lastChanged,
countsPerStatus, numTotalTasks);
}
+
+ public static void aggregateExecutionStateTimestamps(long[] timestamps, long[] other) {
+ timestamps[CREATED_POS] = Math.min(timestamps[CREATED_POS], other[CREATED_POS]);
+ timestamps[SCHEDULED_POS] = Math.min(timestamps[SCHEDULED_POS], other[SCHEDULED_POS]);
+ timestamps[DEPLOYING_POS] = Math.min(timestamps[DEPLOYING_POS], other[DEPLOYING_POS]);
+ timestamps[RUNNING_POS] = Math.min(timestamps[RUNNING_POS], other[RUNNING_POS]);
+ timestamps[FINISHED_POS] = Math.max(timestamps[FINISHED_POS], other[FINISHED_POS]);
+ timestamps[CANCELING_POS] = Math.min(timestamps[CANCELING_POS], other[CANCELING_POS]);
+ timestamps[CANCELED_POS] = Math.max(timestamps[CANCELED_POS], other[CANCELED_POS]);
+ timestamps[FAILED_POS] = Math.min(timestamps[FAILED_POS], other[FAILED_POS]);
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final int CREATED_POS = ExecutionState.CREATED.ordinal();
+ private static final int SCHEDULED_POS = ExecutionState.SCHEDULED.ordinal();
+ private static final int DEPLOYING_POS = ExecutionState.DEPLOYING.ordinal();
+ private static final int RUNNING_POS = ExecutionState.RUNNING.ordinal();
+ private static final int FINISHED_POS = ExecutionState.FINISHED.ordinal();
+ private static final int CANCELING_POS = ExecutionState.CANCELING.ordinal();
+ private static final int CANCELED_POS = ExecutionState.CANCELED.ordinal();
+ private static final int FAILED_POS = ExecutionState.FAILED.ordinal();
}