You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/11/15 17:44:47 UTC

[flink] 01/02: [hotfix][kinesis] Eliminate compiler warnings and apply simple inspection-based automated cleanups

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3a2efb4472375169f39dc1000dd1ea0fff74fa19
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Nov 7 17:25:17 2019 +0100

    [hotfix][kinesis] Eliminate compiler warnings and apply simple inspection-based automated cleanups
---
 .../kinesis/internals/KinesisDataFetcher.java      | 32 ++++++++++------------
 .../connectors/kinesis/proxy/KinesisProxy.java     | 10 +------
 .../connectors/kinesis/util/KinesisConfigUtil.java |  7 ++---
 3 files changed, 19 insertions(+), 30 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 80b724b..afd6138 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -76,9 +76,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following:
  * <ul>
  *     <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
- *     		  of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
- *     		  subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
- *     		  to the same subset of shards even after restoring)</li>
+ *            of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ *            subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
+ *            to the same subset of shards even after restoring)</li>
  *     <li>2. decide where in each discovered shard should the fetcher start subscribing to</li>
  *     <li>3. subscribe to shards by creating a single thread for each shard</li>
  * </ul>
@@ -87,6 +87,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed
  * by multiple threads, these operations should only be done using the handler methods provided in this class.
  */
+@SuppressWarnings("unchecked")
 @Internal
 public class KinesisDataFetcher<T> {
 
@@ -195,8 +196,8 @@ public class KinesisDataFetcher<T> {
 
 	private final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
 	private final WatermarkTracker watermarkTracker;
-	private final transient RecordEmitter recordEmitter;
-	private transient boolean isIdle;
+	private final RecordEmitter recordEmitter;
+	private boolean isIdle;
 
 	/**
 	 * The watermark related state for each shard consumer. Entries in this map will be created when shards
@@ -280,8 +281,8 @@ public class KinesisDataFetcher<T> {
 
 		@Override
 		public RecordQueue<RecordWrapper<T>> getQueue(int producerIndex) {
-			return queues.computeIfAbsent(producerIndex, (key) -> {
-				return new RecordQueue<RecordWrapper<T>>() {
+			return queues.computeIfAbsent(producerIndex, (key) ->
+				new RecordQueue<RecordWrapper<T>>() {
 					@Override
 					public void put(RecordWrapper<T> record) {
 						emit(record, this);
@@ -296,8 +297,6 @@ public class KinesisDataFetcher<T> {
 					public RecordWrapper<T> peek() {
 						return null;
 					}
-
-				};
 			});
 		}
 	}
@@ -621,6 +620,7 @@ public class KinesisDataFetcher<T> {
 	}
 
 	/** After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown. */
+	@SuppressWarnings("StatementWithEmptyBody")
 	public void awaitTermination() throws InterruptedException {
 		while (!shardConsumersExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
 			// Keep waiting.
@@ -757,8 +757,6 @@ public class KinesisDataFetcher<T> {
 	 *
 	 * <p>Responsible for tracking per shard watermarks and emit timestamps extracted from
 	 * the record, when a watermark assigner was configured.
-	 *
-	 * @param rw
 	 */
 	private void emitRecordAndUpdateState(RecordWrapper<T> rw) {
 		synchronized (checkpointLock) {
@@ -952,22 +950,22 @@ public class KinesisDataFetcher<T> {
 	/** Timer task to update shared watermark state. */
 	private class WatermarkSyncCallback implements ProcessingTimeCallback {
 
+		private static final long LOG_INTERVAL_MILLIS = 60_000;
+
 		private final ProcessingTimeService timerService;
 		private final long interval;
-		private final MetricGroup shardMetricsGroup;
 		private long lastGlobalWatermark = Long.MIN_VALUE;
 		private long propagatedLocalWatermark = Long.MIN_VALUE;
-		private long logIntervalMillis = 60_000;
 		private int stalledWatermarkIntervalCount = 0;
 		private long lastLogged;
 
 		WatermarkSyncCallback(ProcessingTimeService timerService, long interval) {
 			this.timerService = checkNotNull(timerService);
 			this.interval = interval;
-			this.shardMetricsGroup = consumerMetricGroup.addGroup("subtaskId",
+			MetricGroup shardMetricsGroup = consumerMetricGroup.addGroup("subtaskId",
 				String.valueOf(indexOfThisConsumerSubtask));
-			this.shardMetricsGroup.gauge("localWatermark", () -> nextWatermark);
-			this.shardMetricsGroup.gauge("globalWatermark", () -> lastGlobalWatermark);
+			shardMetricsGroup.gauge("localWatermark", () -> nextWatermark);
+			shardMetricsGroup.gauge("globalWatermark", () -> lastGlobalWatermark);
 		}
 
 		public void start() {
@@ -987,7 +985,7 @@ public class KinesisDataFetcher<T> {
 					LOG.info("WatermarkSyncCallback subtask: {} is idle", indexOfThisConsumerSubtask);
 				}
 
-				if (timestamp - lastLogged > logIntervalMillis) {
+				if (timestamp - lastLogged > LOG_INTERVAL_MILLIS) {
 					lastLogged = System.currentTimeMillis();
 					LOG.info("WatermarkSyncCallback subtask: {} local watermark: {}"
 							+ ", global watermark: {}, delta: {} timeouts: {}, emitter: {}",
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 2c21f11..fe4ae32b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -52,7 +52,6 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -213,8 +212,6 @@ public class KinesisProxy implements KinesisProxyInterface {
 	/**
 	 * Create the Kinesis client, using the provided configuration properties and default {@link ClientConfiguration}.
 	 * Derived classes can override this method to customize the client configuration.
-	 * @param configProps
-	 * @return
 	 */
 	protected AmazonKinesis createKinesisClient(Properties configProps) {
 
@@ -483,12 +480,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 		// 	https://github.com/lyft/kinesalite/pull/4
 		if (startShardId != null && listShardsResults != null) {
 			List<Shard> shards = listShardsResults.getShards();
-			Iterator<Shard> shardItr = shards.iterator();
-			while (shardItr.hasNext()) {
-				if (StreamShardHandle.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
-					shardItr.remove();
-				}
-			}
+			shards.removeIf(shard -> StreamShardHandle.compareShardIds(shard.getShardId(), startShardId) <= 0);
 		}
 
 		return listShardsResults;
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index b277696..5493774 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -28,8 +28,6 @@ import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConsta
 
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -77,8 +75,6 @@ public class KinesisConfigUtil {
 	/** Default values for ThreadPoolSize. **/
 	protected static final int DEFAULT_THREAD_POOL_SIZE = 10;
 
-	private static final Logger LOG = LoggerFactory.getLogger(KinesisConfigUtil.class);
-
 	/**
 	 * Validate configuration properties for {@link FlinkKinesisConsumer}.
 	 */
@@ -87,6 +83,7 @@ public class KinesisConfigUtil {
 
 		validateAwsConfiguration(config);
 
+		//noinspection SimplifiableBooleanExpression - the current logic expression is actually easier to understand
 		if (!(config.containsKey(AWSConfigConstants.AWS_REGION) ^ config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT))) {
 			// per validation in AwsClientBuilder
 			throw new IllegalArgumentException(String.format("For FlinkKinesisConsumer either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
@@ -177,6 +174,7 @@ public class KinesisConfigUtil {
 	 * Replace deprecated configuration properties for {@link FlinkKinesisProducer}.
 	 * This should be remove along with deprecated keys
 	 */
+	@SuppressWarnings("deprecation")
 	public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
 		// Replace deprecated key
 		if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
@@ -207,6 +205,7 @@ public class KinesisConfigUtil {
 	 * @param configProps original config properties.
 	 * @return backfilled config properties.
 	 */
+	@SuppressWarnings("UnusedReturnValue")
 	public static Properties backfillConsumerKeys(Properties configProps) {
 		HashMap<String, String> oldKeyToNewKeys = new HashMap<>();
 		oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE);