Posted to commits@flink.apache.org by se...@apache.org on 2015/09/17 20:19:58 UTC

[26/51] [abbrv] flink git commit: [FLINK-2415] [monitoring api] Add vertex details request handler, unify IDs between vertices and plan

http://git-wip-us.apache.org/repos/asf/flink/blob/b26ce5a3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 999ca1b..2d6af6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.io.StrictlyLocalAssignment;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -490,6 +493,45 @@ public class ExecutionJobVertex implements Serializable {
 		}
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Accumulators / Metrics
+	// --------------------------------------------------------------------------------------------
+	
+	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators() {
+		// some specialized code to speed things up
+		long bytesRead = 0;
+		long bytesWritten = 0;
+		long recordsRead = 0;
+		long recordsWritten = 0;
+		
+		for (ExecutionVertex v : getTaskVertices()) {
+			Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = v.getCurrentExecutionAttempt().getFlinkAccumulators();
+			
+			if (metrics != null) {
+				LongCounter br = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
+				LongCounter bw = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
+				LongCounter rr = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
+				LongCounter rw = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
+				
+				bytesRead += br != null ? br.getLocalValuePrimitive() : 0;
+				bytesWritten += bw != null ? bw.getLocalValuePrimitive() : 0;
+				recordsRead += rr != null ? rr.getLocalValuePrimitive() : 0;
+				recordsWritten += rw != null ? rw.getLocalValuePrimitive() : 0;
+			}
+		}
+
+		HashMap<AccumulatorRegistry.Metric, Accumulator<?, ?>> agg = new HashMap<>();
+		agg.put(AccumulatorRegistry.Metric.NUM_BYTES_IN, new LongCounter(bytesRead));
+		agg.put(AccumulatorRegistry.Metric.NUM_BYTES_OUT, new LongCounter(bytesWritten));
+		agg.put(AccumulatorRegistry.Metric.NUM_RECORDS_IN, new LongCounter(recordsRead));
+		agg.put(AccumulatorRegistry.Metric.NUM_RECORDS_OUT, new LongCounter(recordsWritten));
+		return agg;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Static / pre-assigned input splits
+	// --------------------------------------------------------------------------------------------
+
 	private List<LocatableInputSplit>[] computeLocalInputSplitsPerTask(InputSplit[] splits) throws JobException {
 		
 		final int numSubTasks = getParallelism();
@@ -593,10 +635,10 @@ public class ExecutionJobVertex implements Serializable {
 		return subTaskSplitAssignment;
 	}
 	
-	//---------------------------------------------------------------------------------------------
-	//  Predetermined InputSplitAssigner
-	//---------------------------------------------------------------------------------------------
 
+	/**
+	 * An InputSplitAssigner that assigns to pre-determined hosts.
+	 */
 	public static class PredeterminedInputSplitAssigner implements InputSplitAssigner {
 
 		private List<LocatableInputSplit>[] inputSplitsPerSubtask;
@@ -604,7 +646,7 @@ public class ExecutionJobVertex implements Serializable {
 		@SuppressWarnings("unchecked")
 		public PredeterminedInputSplitAssigner(List<LocatableInputSplit>[] inputSplitsPerSubtask) {
 			// copy input split assignment
-			this.inputSplitsPerSubtask = (List<LocatableInputSplit>[]) new List[inputSplitsPerSubtask.length];
+			this.inputSplitsPerSubtask = (List<LocatableInputSplit>[]) new List<?>[inputSplitsPerSubtask.length];
 			for (int i = 0; i < inputSplitsPerSubtask.length; i++) {
 				List<LocatableInputSplit> next = inputSplitsPerSubtask[i];
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/b26ce5a3/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 2e50ca4..41ad2dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -59,4 +59,26 @@ public class WebMonitorUtils {
 				started, finished, status, lastChanged,  
 				countsPerStatus, numTotalTasks);
 	}
+	
+	public static void aggregateExecutionStateTimestamps(long[] timestamps, long[] other) {
+		timestamps[CREATED_POS] = Math.min(timestamps[CREATED_POS], other[CREATED_POS]);
+		timestamps[SCHEDULED_POS] = Math.min(timestamps[SCHEDULED_POS], other[SCHEDULED_POS]);
+		timestamps[DEPLOYING_POS] = Math.min(timestamps[DEPLOYING_POS], other[DEPLOYING_POS]);
+		timestamps[RUNNING_POS] = Math.min(timestamps[RUNNING_POS], other[RUNNING_POS]);
+		timestamps[FINISHED_POS] = Math.max(timestamps[FINISHED_POS], other[FINISHED_POS]);
+		timestamps[CANCELING_POS] = Math.min(timestamps[CANCELING_POS], other[CANCELING_POS]);
+		timestamps[CANCELED_POS] = Math.max(timestamps[CANCELED_POS], other[CANCELED_POS]);
+		timestamps[FAILED_POS] = Math.min(timestamps[FAILED_POS], other[FAILED_POS]);
+	}
+	
+	// ------------------------------------------------------------------------
+
+	private static final int CREATED_POS = ExecutionState.CREATED.ordinal();
+	private static final int SCHEDULED_POS = ExecutionState.SCHEDULED.ordinal();
+	private static final int DEPLOYING_POS = ExecutionState.DEPLOYING.ordinal();
+	private static final int RUNNING_POS = ExecutionState.RUNNING.ordinal();
+	private static final int FINISHED_POS = ExecutionState.FINISHED.ordinal();
+	private static final int CANCELING_POS = ExecutionState.CANCELING.ordinal();
+	private static final int CANCELED_POS = ExecutionState.CANCELED.ordinal();
+	private static final int FAILED_POS = ExecutionState.FAILED.ordinal();
 }
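
[Editor's note] As a companion, a minimal sketch (again not part of the commit) of how the new aggregateExecutionStateTimestamps helper might be driven when folding per-subtask timestamps into a single array for a job vertex. The seed values and the getStateTimestamps() accessor on the current execution attempt are assumptions; positions combined with Math.min are seeded with Long.MAX_VALUE, positions combined with Math.max (FINISHED, CANCELED) are seeded with 0.

	// assumed context: an ExecutionJobVertex named jobVertex is in scope
	long[] aggregated = new long[ExecutionState.values().length];
	java.util.Arrays.fill(aggregated, Long.MAX_VALUE);
	aggregated[ExecutionState.FINISHED.ordinal()] = 0L;
	aggregated[ExecutionState.CANCELED.ordinal()] = 0L;

	for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
		// assumed accessor: per-state timestamps of the current execution attempt
		long[] subtaskTimestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
		WebMonitorUtils.aggregateExecutionStateTimestamps(aggregated, subtaskTimestamps);
	}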