You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/11/15 17:44:47 UTC
[flink] 01/02: [hotfix][kinesis] Eliminate compiler warnings and
apply simple inspection-based automated cleanups
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3a2efb4472375169f39dc1000dd1ea0fff74fa19
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Nov 7 17:25:17 2019 +0100
[hotfix][kinesis] Eliminate compiler warnings and apply simple inspection-based automated cleanups
---
.../kinesis/internals/KinesisDataFetcher.java | 32 ++++++++++------------
.../connectors/kinesis/proxy/KinesisProxy.java | 10 +------
.../connectors/kinesis/util/KinesisConfigUtil.java | 7 ++---
3 files changed, 19 insertions(+), 30 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 80b724b..afd6138 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -76,9 +76,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following:
* <ul>
* <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
- * of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
- * subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
- * to the same subset of shards even after restoring)</li>
+ * of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ * subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
+ * to the same subset of shards even after restoring)</li>
* <li>2. decide where in each discovered shard should the fetcher start subscribing to</li>
* <li>3. subscribe to shards by creating a single thread for each shard</li>
* </ul>
@@ -87,6 +87,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed
* by multiple threads, these operations should only be done using the handler methods provided in this class.
*/
+@SuppressWarnings("unchecked")
@Internal
public class KinesisDataFetcher<T> {
@@ -195,8 +196,8 @@ public class KinesisDataFetcher<T> {
private final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
private final WatermarkTracker watermarkTracker;
- private final transient RecordEmitter recordEmitter;
- private transient boolean isIdle;
+ private final RecordEmitter recordEmitter;
+ private boolean isIdle;
/**
* The watermark related state for each shard consumer. Entries in this map will be created when shards
@@ -280,8 +281,8 @@ public class KinesisDataFetcher<T> {
@Override
public RecordQueue<RecordWrapper<T>> getQueue(int producerIndex) {
- return queues.computeIfAbsent(producerIndex, (key) -> {
- return new RecordQueue<RecordWrapper<T>>() {
+ return queues.computeIfAbsent(producerIndex, (key) ->
+ new RecordQueue<RecordWrapper<T>>() {
@Override
public void put(RecordWrapper<T> record) {
emit(record, this);
@@ -296,8 +297,6 @@ public class KinesisDataFetcher<T> {
public RecordWrapper<T> peek() {
return null;
}
-
- };
});
}
}
@@ -621,6 +620,7 @@ public class KinesisDataFetcher<T> {
}
/** After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown. */
+ @SuppressWarnings("StatementWithEmptyBody")
public void awaitTermination() throws InterruptedException {
while (!shardConsumersExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
// Keep waiting.
@@ -757,8 +757,6 @@ public class KinesisDataFetcher<T> {
*
* <p>Responsible for tracking per shard watermarks and emit timestamps extracted from
* the record, when a watermark assigner was configured.
- *
- * @param rw
*/
private void emitRecordAndUpdateState(RecordWrapper<T> rw) {
synchronized (checkpointLock) {
@@ -952,22 +950,22 @@ public class KinesisDataFetcher<T> {
/** Timer task to update shared watermark state. */
private class WatermarkSyncCallback implements ProcessingTimeCallback {
+ private static final long LOG_INTERVAL_MILLIS = 60_000;
+
private final ProcessingTimeService timerService;
private final long interval;
- private final MetricGroup shardMetricsGroup;
private long lastGlobalWatermark = Long.MIN_VALUE;
private long propagatedLocalWatermark = Long.MIN_VALUE;
- private long logIntervalMillis = 60_000;
private int stalledWatermarkIntervalCount = 0;
private long lastLogged;
WatermarkSyncCallback(ProcessingTimeService timerService, long interval) {
this.timerService = checkNotNull(timerService);
this.interval = interval;
- this.shardMetricsGroup = consumerMetricGroup.addGroup("subtaskId",
+ MetricGroup shardMetricsGroup = consumerMetricGroup.addGroup("subtaskId",
String.valueOf(indexOfThisConsumerSubtask));
- this.shardMetricsGroup.gauge("localWatermark", () -> nextWatermark);
- this.shardMetricsGroup.gauge("globalWatermark", () -> lastGlobalWatermark);
+ shardMetricsGroup.gauge("localWatermark", () -> nextWatermark);
+ shardMetricsGroup.gauge("globalWatermark", () -> lastGlobalWatermark);
}
public void start() {
@@ -987,7 +985,7 @@ public class KinesisDataFetcher<T> {
LOG.info("WatermarkSyncCallback subtask: {} is idle", indexOfThisConsumerSubtask);
}
- if (timestamp - lastLogged > logIntervalMillis) {
+ if (timestamp - lastLogged > LOG_INTERVAL_MILLIS) {
lastLogged = System.currentTimeMillis();
LOG.info("WatermarkSyncCallback subtask: {} local watermark: {}"
+ ", global watermark: {}, delta: {} timeouts: {}, emitter: {}",
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 2c21f11..fe4ae32b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -52,7 +52,6 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Date;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -213,8 +212,6 @@ public class KinesisProxy implements KinesisProxyInterface {
/**
* Create the Kinesis client, using the provided configuration properties and default {@link ClientConfiguration}.
* Derived classes can override this method to customize the client configuration.
- * @param configProps
- * @return
*/
protected AmazonKinesis createKinesisClient(Properties configProps) {
@@ -483,12 +480,7 @@ public class KinesisProxy implements KinesisProxyInterface {
// https://github.com/lyft/kinesalite/pull/4
if (startShardId != null && listShardsResults != null) {
List<Shard> shards = listShardsResults.getShards();
- Iterator<Shard> shardItr = shards.iterator();
- while (shardItr.hasNext()) {
- if (StreamShardHandle.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
- shardItr.remove();
- }
- }
+ shards.removeIf(shard -> StreamShardHandle.compareShardIds(shard.getShardId(), startShardId) <= 0);
}
return listShardsResults;
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index b277696..5493774 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -28,8 +28,6 @@ import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConsta
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -77,8 +75,6 @@ public class KinesisConfigUtil {
/** Default values for ThreadPoolSize. **/
protected static final int DEFAULT_THREAD_POOL_SIZE = 10;
- private static final Logger LOG = LoggerFactory.getLogger(KinesisConfigUtil.class);
-
/**
* Validate configuration properties for {@link FlinkKinesisConsumer}.
*/
@@ -87,6 +83,7 @@ public class KinesisConfigUtil {
validateAwsConfiguration(config);
+ //noinspection SimplifiableBooleanExpression - the current logic expression is actually easier to understand
if (!(config.containsKey(AWSConfigConstants.AWS_REGION) ^ config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT))) {
// per validation in AwsClientBuilder
throw new IllegalArgumentException(String.format("For FlinkKinesisConsumer either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
@@ -177,6 +174,7 @@ public class KinesisConfigUtil {
* Replace deprecated configuration properties for {@link FlinkKinesisProducer}.
* This should be remove along with deprecated keys
*/
+ @SuppressWarnings("deprecation")
public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
// Replace deprecated key
if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
@@ -207,6 +205,7 @@ public class KinesisConfigUtil {
* @param configProps original config properties.
* @return backfilled config properties.
*/
+ @SuppressWarnings("UnusedReturnValue")
public static Properties backfillConsumerKeys(Properties configProps) {
HashMap<String, String> oldKeyToNewKeys = new HashMap<>();
oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE);