You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/12 13:31:02 UTC

[10/19] flink git commit: [FLINK-8162] [kinesis] Move shard metric gauges registration to KinesisDataFetcher

[FLINK-8162] [kinesis] Move shard metric gauges registration to KinesisDataFetcher

This commit moves the registration of shard metric gauges into the
KinesisDataFetcher, instead of having it handled by the ShardConsumer.
Overall, this achieves a better separation of concerns.

This commit also consolidates all metrics-related constant strings into a
separate KinesisConsumerMetricConstants class, with comments noting that
the metric names must not be changed, in order to maintain backwards
compatibility for the consumer's shipped metrics.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/03841fde
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/03841fde
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/03841fde

Branch: refs/heads/master
Commit: 03841fdece53f0b2264c8a46ae860e7689cabb49
Parents: 4d0d7f9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Jan 12 14:11:51 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 12 19:43:28 2018 +0800

----------------------------------------------------------------------
 .../kinesis/internals/KinesisDataFetcher.java   | 38 +++++++++++++++----
 .../kinesis/internals/ShardConsumer.java        | 19 +++++-----
 .../metrics/KinesisConsumerMetricConstants.java | 37 ++++++++++++++++++
 .../kinesis/metrics/ShardMetricsReporter.java   | 40 ++++++++++++++++++++
 .../kinesis/internals/ShardConsumerTest.java    |  8 ++--
 5 files changed, 121 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/03841fde/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index e8a264c..8fee60d 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
@@ -92,6 +94,13 @@ public class KinesisDataFetcher<T> {
 	private final KinesisDeserializationSchema<T> deserializationSchema;
 
 	// ------------------------------------------------------------------------
+	//  Consumer metrics
+	// ------------------------------------------------------------------------
+
+	/** The metric group that all metrics should be registered to. */
+	private final MetricGroup consumerMetricGroup;
+
+	// ------------------------------------------------------------------------
 	//  Subtask-specific settings
 	// ------------------------------------------------------------------------
 
@@ -205,6 +214,9 @@ public class KinesisDataFetcher<T> {
 		this.deserializationSchema = checkNotNull(deserializationSchema);
 		this.kinesis = checkNotNull(kinesis);
 
+		this.consumerMetricGroup = runtimeContext.getMetricGroup()
+			.addGroup(KinesisConsumerMetricConstants.KINESIS_CONSUMER_METRICS_GROUP);
+
 		this.error = checkNotNull(error);
 		this.subscribedShardsState = checkNotNull(subscribedShardsState);
 		this.subscribedStreamsToLastDiscoveredShardIds = checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
@@ -274,7 +286,7 @@ public class KinesisDataFetcher<T> {
 						seededStateIndex,
 						subscribedShardsState.get(seededStateIndex).getStreamShardHandle(),
 						subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum(),
-						registerMetricGroupForShard(subscribedShardsState.get(seededStateIndex))));
+						registerShardMetrics(consumerMetricGroup, subscribedShardsState.get(seededStateIndex))));
 			}
 		}
 
@@ -321,7 +333,7 @@ public class KinesisDataFetcher<T> {
 						newStateIndex,
 						newShardState.getStreamShardHandle(),
 						newShardState.getLastProcessedSequenceNum(),
-						registerMetricGroupForShard(newShardState)));
+						registerShardMetrics(consumerMetricGroup, newShardState)));
 			}
 
 			// we also check if we are running here so that we won't start the discovery sleep
@@ -547,13 +559,23 @@ public class KinesisDataFetcher<T> {
 
 	/**
 	 * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
+	 *
+	 * @return a {@link ShardMetricsReporter} that can be used to update metric values
 	 */
-	private MetricGroup registerMetricGroupForShard(KinesisStreamShardState shardState) {
-		return runtimeContext
-			.getMetricGroup()
-			.addGroup("Kinesis")
-			.addGroup("stream", shardState.getStreamShardHandle().getStreamName())
-			.addGroup("shardId", shardState.getStreamShardHandle().getShard().getShardId());
+	private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup, KinesisStreamShardState shardState) {
+		ShardMetricsReporter shardMetrics = new ShardMetricsReporter();
+
+		MetricGroup streamShardMetricGroup = metricGroup
+			.addGroup(
+				KinesisConsumerMetricConstants.STREAM_METRICS_GROUP,
+				shardState.getStreamShardHandle().getStreamName())
+			.addGroup(
+				KinesisConsumerMetricConstants.SHARD_METRICS_GROUP,
+				shardState.getStreamShardHandle().getShard().getShardId());
+
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE, shardMetrics::getMillisBehindLatest);
+
+		return shardMetrics;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/03841fde/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 9f6a4cd..a18466c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
@@ -67,12 +67,12 @@ public class ShardConsumer<T> implements Runnable {
 	private final int maxNumberOfRecordsPerFetch;
 	private final long fetchIntervalMillis;
 
+	private final ShardMetricsReporter shardMetricsReporter;
+
 	private SequenceNumber lastSequenceNum;
 
 	private Date initTimestamp;
 
-	private long millisBehindLatest;
-
 	/**
 	 * Creates a shard consumer.
 	 *
@@ -80,19 +80,19 @@ public class ShardConsumer<T> implements Runnable {
 	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
 	 * @param subscribedShard the shard this consumer is subscribed to
 	 * @param lastSequenceNum the sequence number in the shard to start consuming
-	 * @param kinesisMetricGroup the metric group to report to
+	 * @param shardMetricsReporter the reporter to report metrics to
 	 */
 	public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
 						Integer subscribedShardStateIndex,
 						StreamShardHandle subscribedShard,
 						SequenceNumber lastSequenceNum,
-						MetricGroup kinesisMetricGroup) {
+						ShardMetricsReporter shardMetricsReporter) {
 		this(fetcherRef,
 			subscribedShardStateIndex,
 			subscribedShard,
 			lastSequenceNum,
 			KinesisProxy.create(fetcherRef.getConsumerConfiguration()),
-			kinesisMetricGroup);
+			shardMetricsReporter);
 	}
 
 	/** This constructor is exposed for testing purposes. */
@@ -101,14 +101,13 @@ public class ShardConsumer<T> implements Runnable {
 							StreamShardHandle subscribedShard,
 							SequenceNumber lastSequenceNum,
 							KinesisProxyInterface kinesis,
-							MetricGroup kinesisMetricGroup) {
+							ShardMetricsReporter shardMetricsReporter) {
 		this.fetcherRef = checkNotNull(fetcherRef);
 		this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
 		this.subscribedShard = checkNotNull(subscribedShard);
 		this.lastSequenceNum = checkNotNull(lastSequenceNum);
 
-		checkNotNull(kinesisMetricGroup);
-		kinesisMetricGroup.gauge("millisBehindLatest", () -> millisBehindLatest);
+		this.shardMetricsReporter = checkNotNull(shardMetricsReporter);
 
 		checkArgument(
 			!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
@@ -304,7 +303,7 @@ public class ShardConsumer<T> implements Runnable {
 				getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
 
 				// Update millis behind latest so it gets reported by the millisBehindLatest gauge
-				millisBehindLatest = getRecordsResult.getMillisBehindLatest();
+				shardMetricsReporter.setMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
 			} catch (ExpiredIteratorException eiEx) {
 				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
 					" refreshing the iterator ...", shardItr, subscribedShard);

http://git-wip-us.apache.org/repos/asf/flink/blob/03841fde/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java
new file mode 100644
index 0000000..1b83f16
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.metrics;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A collection of consumer metric related constant names.
+ *
+ * <p>The names must not be changed, as that would break backwards compatibility for the consumer metrics.
+ */
+@Internal
+public class KinesisConsumerMetricConstants {
+
+	public static final String KINESIS_CONSUMER_METRICS_GROUP = "KinesisConsumer";
+
+	public static final String STREAM_METRICS_GROUP = "stream";
+	public static final String SHARD_METRICS_GROUP = "shardId";
+
+	public static final String MILLIS_BEHIND_LATEST_GAUGE = "millisBehindLatest";
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03841fde/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
new file mode 100644
index 0000000..2b6a491
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
+
+/**
+ * A container for {@link ShardConsumer}s to report metric values.
+ */
+@Internal
+public class ShardMetricsReporter {
+
+	private volatile long millisBehindLatest = -1;
+
+	public long getMillisBehindLatest() {
+		return millisBehindLatest;
+	}
+
+	public void setMillisBehindLatest(long millisBehindLatest) {
+		this.millisBehindLatest = millisBehindLatest;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03841fde/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 6900183..e8c5902 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
@@ -78,7 +78,8 @@ public class ShardConsumerTest {
 			0,
 			subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
 			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
-			FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9), new UnregisteredMetricsGroup()).run();
+			FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9),
+			new ShardMetricsReporter()).run();
 
 		assertTrue(fetcher.getNumOfElementsCollected() == 1000);
 		assertTrue(subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum().equals(
@@ -119,7 +120,8 @@ public class ShardConsumerTest {
 			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
 			// Get a total of 1000 records with 9 getRecords() calls,
 			// and the 7th getRecords() call will encounter an unexpected expired shard iterator
-			FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7), new UnregisteredMetricsGroup()).run();
+			FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7),
+			new ShardMetricsReporter()).run();
 
 		assertTrue(fetcher.getNumOfElementsCollected() == 1000);
 		assertTrue(subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum().equals(