You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/16 19:36:02 UTC

[GitHub] zentol closed pull request #2753: [FLINK-4840] [metrics] Measure latency of record processing and expose it as a metric

zentol closed pull request #2753: [FLINK-4840] [metrics] Measure latency of record processing and expose it as a metric
URL: https://github.com/apache/flink/pull/2753
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise show it once the pull request is merged):

diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 0a294fa65a1..8d86158065f 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -134,6 +134,12 @@ under the License.
 			<version>${jackson.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.zookeeper</groupId>
 			<artifactId>zookeeper</artifactId>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index c5296fbd063..5d06e2c2251 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -18,16 +18,21 @@
 
 package org.apache.flink.runtime.metrics.groups;
 
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
+import com.codahale.metrics.Reservoir;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Snapshot;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -50,6 +55,8 @@
 	private final Meter numRecordsInRate;
 	private final Meter numRecordsOutRate;
 
+	private final LatencyHistogram recordProcessLatency;
+
 	public TaskIOMetricGroup(TaskMetricGroup parent) {
 		super(parent);
 
@@ -63,6 +70,10 @@ public TaskIOMetricGroup(TaskMetricGroup parent) {
 		this.numRecordsOut = counter("numRecordsOut", new SumCounter());
 		this.numRecordsInRate = meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
 		this.numRecordsOutRate = meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
+		this.recordProcessLatency = histogram("recordProcessLatency", new LatencyHistogram(true));
+		if (recordProcessLatency.getLatencyAccumulateCounter() != null) {
+			meter("recordProcTimeProportion", new MeterView(recordProcessLatency.getLatencyAccumulateCounter(), 60));
+		}
 	}
 
 	public IOMetrics createSnapshot() {
@@ -104,6 +115,10 @@ public Meter getNumBytesOutRateMeter() {
 		return numBytesOutRate;
 	}
 
+	public Histogram getRecordProcessLatency() {
+		return recordProcessLatency;
+	}
+
 	// ============================================================================================
 	// Buffer metrics
 	// ============================================================================================
@@ -257,4 +272,134 @@ public long getCount() {
 			return sum;
 		}
 	}
+
+	// ============================================================================================
+	// Latency metrics
+	// ============================================================================================
+
+	/**
+	 * Histogram measuring the record processing latency of a task.
+	 * For regular tasks this is the time spent processing an element; for source tasks it is the time spent emitting an element.
+	 * It can be constructed with either a history size or a custom Reservoir.
+	 * If accumulation is enabled, a counter additionally accumulates the measured latencies (in milliseconds).
+	 */
+	private static class LatencyHistogram implements Histogram {
+
+		private static final int DEFAULT_HISTORY_SIZE = 128;
+
+		// number of nanoseconds per millisecond, used to convert nanoseconds to milliseconds
+		private static final double NANOSECONDS_PER_MILLISECOND = 1000000.0D;
+
+		// a reservoir for history data
+		private final Reservoir latencyReservoir;
+
+		// accumulated latency (in milliseconds), used to measure processing time per second; null if accumulation is disabled
+		private final SimpleCounter latencyAccumulateCounter;
+
+		public LatencyHistogram() {
+			this(DEFAULT_HISTORY_SIZE);
+		}
+
+		public LatencyHistogram(int historySize) {
+			// accumulation is disabled by default
+			this(historySize, false);
+		}
+
+		public LatencyHistogram(Reservoir latencyReservoir) {
+			// accumulation is disabled by default
+			this(latencyReservoir, false);
+		}
+
+		public LatencyHistogram(boolean enableAccumulate) {
+			this(DEFAULT_HISTORY_SIZE, enableAccumulate);
+		}
+
+		public LatencyHistogram(int historySize, boolean enableAccumulate) {
+			// use a SlidingWindowReservoir by default
+			this(new SlidingWindowReservoir(historySize), enableAccumulate);
+		}
+
+		public LatencyHistogram(Reservoir latencyReservoir, boolean enableAccumulate) {
+			this.latencyReservoir = latencyReservoir;
+			if (enableAccumulate) {
+				latencyAccumulateCounter = new SimpleCounter();
+			} else {
+				latencyAccumulateCounter = null;
+			}
+		}
+
+		@Override
+		public void update(long nanosecond) {
+			latencyReservoir.update(nanosecond);
+			if (latencyAccumulateCounter != null) {
+				latencyAccumulateCounter.inc((long)(nanosecond / NANOSECONDS_PER_MILLISECOND));
+			}
+		}
+
+		@Override
+		public long getCount() {
+			return latencyReservoir.size();
+		}
+
+		@Override
+		public HistogramStatistics getStatistics() {
+			return new LatencyHistogramStatistics(latencyReservoir.getSnapshot());
+		}
+
+		public Counter getLatencyAccumulateCounter() {
+			return latencyAccumulateCounter;
+		}
+
+
+		private static class LatencyHistogramStatistics extends HistogramStatistics {
+
+			private final Snapshot latencySnapshot;
+
+			public LatencyHistogramStatistics(Snapshot latencySnapshot) {
+				this.latencySnapshot = latencySnapshot;
+			}
+
+			@Override
+			public double getQuantile(double quantile) {
+				return latencySnapshot.getValue(quantile) / NANOSECONDS_PER_MILLISECOND;
+			}
+
+			@Override
+			public long[] getValues() {
+				long [] nanos = latencySnapshot.getValues();
+				long [] millis = new long[nanos.length];
+
+				for (int i = 0; i < nanos.length; ++i) {
+					millis[i] = (long)(nanos[i] / NANOSECONDS_PER_MILLISECOND);
+				}
+
+				return millis;
+			}
+
+			@Override
+			public int size() {
+				return latencySnapshot.size();
+			}
+
+			@Override
+			public double getMean() {
+				return latencySnapshot.getMean() / NANOSECONDS_PER_MILLISECOND;
+			}
+
+			@Override
+			public double getStdDev() {
+				return latencySnapshot.getStdDev() / NANOSECONDS_PER_MILLISECOND;
+			}
+
+			@Override
+			public long getMax() {
+				return (long)(latencySnapshot.getMax() / NANOSECONDS_PER_MILLISECOND);
+			}
+
+			@Override
+			public long getMin() {
+				return (long)(latencySnapshot.getMin() / NANOSECONDS_PER_MILLISECOND);
+			}
+		}
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a659866bbfd..66b67aaa9b1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -188,6 +188,11 @@ public MetricGroup getMetricGroup() {
 		return metrics;
 	}
 
+	@Override
+	public Output<StreamRecord<OUT>> getOutput() {
+		return output;
+	}
+
 	@Override
 	public final void initializeState(OperatorStateHandles stateHandles) throws Exception {
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index f6e547290e0..59086a94e7e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -136,4 +136,6 @@ OperatorSnapshotResult snapshotState(
 	void setChainingStrategy(ChainingStrategy strategy);
 	
 	MetricGroup getMetricGroup();
+
+	Output<StreamRecord<OUT>> getOutput();
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 714317df9b7..60972aa6f76 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
@@ -81,6 +82,8 @@
 
 	private Counter numRecordsIn;
 
+	private Histogram recordProcessLatency;
+
 	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(
 			InputGate[] inputGates,
@@ -168,12 +171,15 @@ public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final
 						continue;
 					} else {
 						// now we can do the actual processing
+						long start=System.nanoTime();
 						StreamRecord<IN> record = recordOrMark.asRecord();
 						synchronized (lock) {
 							numRecordsIn.inc();
 							streamOperator.setKeyContextElement1(record);
 							streamOperator.processElement(record);
 						}
+						long end=System.nanoTime();
+						recordProcessLatency.update(end - start);
 						return true;
 					}
 				}
@@ -210,6 +216,8 @@ public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final
 	 * @param metrics metric group
 	 */
 	public void setMetricGroup(TaskIOMetricGroup metrics) {
+		recordProcessLatency = metrics.getRecordProcessLatency();
+
 		metrics.gauge("currentLowWatermark", new Gauge<Long>() {
 			@Override
 			public Long getValue() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 5f7ffe45a34..85963ad448b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,8 +20,11 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -85,6 +88,10 @@
 	private final DeserializationDelegate<StreamElement> deserializationDelegate1;
 	private final DeserializationDelegate<StreamElement> deserializationDelegate2;
 
+	private Counter numRecordsIn;
+
+	private Histogram recordProcessLatency;
+
 	@SuppressWarnings({"unchecked", "rawtypes"})
 	public StreamTwoInputProcessor(
 			Collection<InputGate> inputGates1,
@@ -149,6 +156,10 @@ public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
 			return false;
 		}
 
+		if (numRecordsIn == null) {
+			numRecordsIn = ((OperatorMetricGroup)streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+		}
+
 		while (true) {
 			if (currentRecordDeserializer != null) {
 				DeserializationResult result;
@@ -177,10 +188,14 @@ else if (recordOrWatermark.isLatencyMarker()) {
 							continue;
 						}
 						else {
+							long start=System.nanoTime();
 							synchronized (lock) {
+								numRecordsIn.inc();
 								streamOperator.setKeyContextElement1(recordOrWatermark.<IN1>asRecord());
 								streamOperator.processElement1(recordOrWatermark.<IN1>asRecord());
 							}
+							long end=System.nanoTime();
+							recordProcessLatency.update(end - start);
 							return true;
 
 						}
@@ -198,10 +213,14 @@ else if (recordOrWatermark.isLatencyMarker()) {
 							continue;
 						}
 						else {
+							long start=System.nanoTime();
 							synchronized (lock) {
+								numRecordsIn.inc();
 								streamOperator.setKeyContextElement2(recordOrWatermark.<IN2>asRecord());
 								streamOperator.processElement2(recordOrWatermark.<IN2>asRecord());
 							}
+							long end=System.nanoTime();
+							recordProcessLatency.update(end - start);
 							return true;
 						}
 					}
@@ -275,6 +294,8 @@ private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Water
 	 * @param metrics metric group
 	 */
 	public void setMetricGroup(TaskIOMetricGroup metrics) {
+		recordProcessLatency = metrics.getRecordProcessLatency();
+
 		metrics.gauge("currentLowWatermark", new Gauge<Long>() {
 			@Override
 			public Long getValue() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 7ae99f6c036..ab5b866829d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -19,8 +19,14 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
  * Task for executing streaming sources.
@@ -40,24 +46,65 @@
 public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
 	extends StreamTask<OUT, OP> {
 
+	private Output<StreamRecord<OUT>> output;
+
 	@Override
 	protected void init() {
-		// does not hold any resources, so no initialization needed
+		output = new SourceOutput<>(getHeadOutput(), getEnvironment().getMetricGroup().getIOMetricGroup());
 	}
 
 	@Override
 	protected void cleanup() {
-		// does not hold any resources, so no cleanup needed
 	}
 	
 
 	@Override
 	protected void run() throws Exception {
-		headOperator.run(getCheckpointLock());
+		headOperator.run(getCheckpointLock(), output);
 	}
 	
 	@Override
 	protected void cancelTask() throws Exception {
 		headOperator.cancel();
 	}
+
+	/**
+	 * Special output for sources, e.g. for measuring the latency of emitting elements as a metric.
+	 *
+	 *  @param <OUT> Type of the output elements of this source.
+	 */
+	private static class SourceOutput<OUT> implements Output<StreamRecord<OUT>> {
+
+		private final Output<StreamRecord<OUT>> output;
+
+		private final Histogram recordProcessLatency;
+
+		public SourceOutput(Output<StreamRecord<OUT>> output, TaskIOMetricGroup ioMetricGroup) {
+			this.output = output;
+			this.recordProcessLatency = ioMetricGroup.getRecordProcessLatency();
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			output.emitWatermark(mark);
+		}
+
+		@Override
+		public void emitLatencyMarker(LatencyMarker latencyMarker) {
+			output.emitLatencyMarker(latencyMarker);
+		}
+
+		@Override
+		public void collect(StreamRecord<OUT> record) {
+			long start=System.nanoTime();
+			output.collect(record);
+			long end=System.nanoTime();
+			recordProcessLatency.update(end - start);
+		}
+
+		@Override
+		public void close() {
+			output.close();
+		}
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index fa7d1b09a50..c52c9d73b53 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -486,7 +486,7 @@ public StreamConfig getConfiguration() {
 	}
 
 	Output<StreamRecord<OUT>> getHeadOutput() {
-		return operatorChain.getChainEntryPoint();
+		return headOperator.getOutput();
 	}
 
 	RecordWriterOutput<?>[] getStreamOutputs() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services