You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/17 05:26:11 UTC

[GitHub] asfgit closed pull request #6409: [FLINK-9899][Kinesis Connector] Add comprehensive per-shard metrics to ShardConsumer

asfgit closed pull request #6409: [FLINK-9899][Kinesis Connector] Add comprehensive per-shard metrics to ShardConsumer
URL: https://github.com/apache/flink/pull/6409
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 05427379ada..ce9deab637d 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1393,6 +1393,71 @@ Thus, in order to infer the metric identifier:
       </td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>sleepTimeMillis</td>
+      <td>stream, shardId</td>
+      <td>The number of milliseconds the consumer spends sleeping before fetching records from Kinesis.
+      A particular shard's metric can be specified by stream name and shard id.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>maxNumberOfRecordsPerFetch</td>
+      <td>stream, shardId</td>
+      <td>The maximum number of records requested by the consumer in a single getRecords call to Kinesis. If ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS
+      is set to true, this value is adaptively calculated to maximize the 2 Mbps read limits from Kinesis.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>numberOfAggregatedRecordsPerFetch</td>
+      <td>stream, shardId</td>
+      <td>The number of aggregated Kinesis records fetched by the consumer in a single getRecords call to Kinesis.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>numberOfDeggregatedRecordsPerFetch</td>
+      <td>stream, shardId</td>
+      <td>The number of deaggregated Kinesis records fetched by the consumer in a single getRecords call to Kinesis.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>averageRecordSizeBytes</td>
+      <td>stream, shardId</td>
+      <td>The average size of a Kinesis record in bytes, fetched by the consumer in a single getRecords call.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>runLoopTimeNanos</td>
+      <td>stream, shardId</td>
+      <td>The actual time taken, in nanoseconds, by the consumer in the run loop.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>loopFrequencyHz</td>
+      <td>stream, shardId</td>
+      <td> The number of calls to getRecords in one second. 
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>bytesRequestedPerFetch</td>
+      <td>stream, shardId</td>
+      <td> The bytes requested (2 Mbps / loopFrequencyHz) in a single call to getRecords.
+      <td>Gauge</td>
+    </tr>
   </tbody>
 </table>
 
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 13de0324ccf..0981b76ce89 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -630,7 +630,14 @@ private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup
 				shardState.getStreamShardHandle().getShard().getShardId());
 
 		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE, shardMetrics::getMillisBehindLatest);
-
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH, shardMetrics::getMaxNumberOfRecordsPerFetch);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfAggregatedRecords);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfDeaggregatedRecords);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES, shardMetrics::getAverageRecordSizeBytes);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.BYTES_PER_READ, shardMetrics::getBytesPerRead);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS, shardMetrics::getRunLoopTimeNanos);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ, shardMetrics::getLoopFrequencyHz);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS, shardMetrics::getSleepTimeMillis);
 		return shardMetrics;
 	}
 
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index d698ecf2e9d..c4a3f275bd0 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -212,12 +212,16 @@ public void run() {
 					// we can close this consumer thread once we've reached the end of the subscribed shard
 					break;
 				} else {
-
+					shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
 					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
 
+					List<Record> aggregatedRecords = getRecordsResult.getRecords();
+					int numberOfAggregatedRecords = aggregatedRecords.size();
+					shardMetricsReporter.setNumberOfAggregatedRecords(numberOfAggregatedRecords);
+
 					// each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
 					List<UserRecord> fetchedRecords = deaggregateRecords(
-						getRecordsResult.getRecords(),
+						aggregatedRecords,
 						subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
 						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
@@ -227,11 +231,15 @@ public void run() {
 						deserializeRecordForCollectionAndUpdateState(record);
 					}
 
+					int numberOfDeaggregatedRecords = fetchedRecords.size();
+					shardMetricsReporter.setNumberOfDeaggregatedRecords(numberOfDeaggregatedRecords);
+
 					nextShardItr = getRecordsResult.getNextShardIterator();
 
 					long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
 					long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
 					maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes, maxNumberOfRecordsPerFetch);
+					shardMetricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
 					processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
 				}
 			}
@@ -256,6 +264,7 @@ protected long adjustRunLoopFrequency(long processingStartTimeNanos, long proces
 			if (sleepTimeMillis > 0) {
 				Thread.sleep(sleepTimeMillis);
 				endTimeNanos = System.nanoTime();
+				shardMetricsReporter.setSleepTimeMillis(sleepTimeMillis);
 			}
 		}
 		return endTimeNanos;
@@ -280,6 +289,11 @@ private int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recor
 			maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
 			// Ensure the value is not more than 10000L
 			maxNumberOfRecordsPerFetch = Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+
+			// Set metrics
+			shardMetricsReporter.setAverageRecordSizeBytes(averageRecordSizeBytes);
+			shardMetricsReporter.setLoopFrequencyHz(loopFrequencyHz);
+			shardMetricsReporter.setBytesPerRead(bytesPerRead);
 		}
 		return maxNumberOfRecordsPerFetch;
 	}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java
index 1b83f161d00..e850d25b6c6 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java
@@ -34,4 +34,13 @@
 	public static final String SHARD_METRICS_GROUP = "shardId";
 
 	public static final String MILLIS_BEHIND_LATEST_GAUGE = "millisBehindLatest";
+	public static final String SLEEP_TIME_MILLIS = "sleepTimeMillis";
+	public static final String MAX_RECORDS_PER_FETCH = "maxNumberOfRecordsPerFetch";
+	public static final String NUM_AGGREGATED_RECORDS_PER_FETCH = "numberOfAggregatedRecordsPerFetch";
+	public static final String NUM_DEAGGREGATED_RECORDS_PER_FETCH = "numberOfDeaggregatedRecordsPerFetch";
+	public static final String AVG_RECORD_SIZE_BYTES = "averageRecordSizeBytes";
+	public static final String RUNTIME_LOOP_NANOS = "runLoopTimeNanos";
+	public static final String LOOP_FREQUENCY_HZ = "loopFrequencyHz";
+	public static final String BYTES_PER_READ = "bytesRequestedPerFetch";
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
index 2b6a491d247..4a27b9cdcb4 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
@@ -28,6 +28,14 @@
 public class ShardMetricsReporter {
 
 	private volatile long millisBehindLatest = -1;
+	private volatile double loopFrequencyHz = 0.0;
+	private volatile double bytesPerRead = 0.0;
+	private volatile long runLoopTimeNanos = 0L;
+	private volatile long averageRecordSizeBytes = 0L;
+	private volatile long sleepTimeMillis = 0L;
+	private volatile int numberOfAggregatedRecords = 0;
+	private volatile int numberOfDeaggregatedRecords = 0;
+	private volatile int maxNumberOfRecordsPerFetch = 0;
 
 	public long getMillisBehindLatest() {
 		return millisBehindLatest;
@@ -37,4 +45,68 @@ public void setMillisBehindLatest(long millisBehindLatest) {
 		this.millisBehindLatest = millisBehindLatest;
 	}
 
+	public double getLoopFrequencyHz() {
+		return loopFrequencyHz;
+	}
+
+	public void setLoopFrequencyHz(double loopFrequencyHz) {
+		this.loopFrequencyHz = loopFrequencyHz;
+	}
+
+	public double getBytesPerRead() {
+		return bytesPerRead;
+	}
+
+	public void setBytesPerRead(double bytesPerRead) {
+		this.bytesPerRead = bytesPerRead;
+	}
+
+	public long getRunLoopTimeNanos() {
+		return runLoopTimeNanos;
+	}
+
+	public void setRunLoopTimeNanos(long runLoopTimeNanos) {
+		this.runLoopTimeNanos = runLoopTimeNanos;
+	}
+
+	public long getAverageRecordSizeBytes() {
+		return averageRecordSizeBytes;
+	}
+
+	public void setAverageRecordSizeBytes(long averageRecordSizeBytes) {
+		this.averageRecordSizeBytes = averageRecordSizeBytes;
+	}
+
+	public long getSleepTimeMillis() {
+		return sleepTimeMillis;
+	}
+
+	public void setSleepTimeMillis(long sleepTimeMillis) {
+		this.sleepTimeMillis = sleepTimeMillis;
+	}
+
+	public int getNumberOfAggregatedRecords() {
+		return numberOfAggregatedRecords;
+	}
+
+	public void setNumberOfAggregatedRecords(int numberOfAggregatedRecords) {
+		this.numberOfAggregatedRecords = numberOfAggregatedRecords;
+	}
+
+	public int getNumberOfDeaggregatedRecords() {
+		return numberOfDeaggregatedRecords;
+	}
+
+	public void setNumberOfDeaggregatedRecords(int numberOfDeaggregatedRecords) {
+		this.numberOfDeaggregatedRecords = numberOfDeaggregatedRecords;
+	}
+
+	public int getMaxNumberOfRecordsPerFetch() {
+		return maxNumberOfRecordsPerFetch;
+	}
+
+	public void setMaxNumberOfRecordsPerFetch(int maxNumberOfRecordsPerFetch) {
+		this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
+	}
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services