Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/28 14:49:04 UTC
[1/3] flink git commit: [FLINK-6998] [kafka] Add Kafka offset commit metrics to Flink Kafka Consumer
Repository: flink
Updated Branches:
refs/heads/release-1.3 09caa9ffd -> 452f5d103
[FLINK-6998] [kafka] Add Kafka offset commit metrics to Flink Kafka Consumer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1a173ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1a173ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1a173ad
Branch: refs/heads/release-1.3
Commit: f1a173addd99e5df00921b924352a39810d8d180
Parents: 09caa9f
Author: Zhenzhong Xu <zx...@netflix.com>
Authored: Mon Jun 26 15:51:09 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 22:04:20 2017 +0800
----------------------------------------------------------------------
docs/monitoring/metrics.md | 24 ++++++++++++
.../connectors/kafka/Kafka010FetcherTest.java | 6 +--
.../kafka/internals/Kafka08Fetcher.java | 8 +++-
.../kafka/internal/Kafka09Fetcher.java | 5 ++-
.../kafka/internal/KafkaConsumerThread.java | 30 +++++++++++++++
.../connectors/kafka/Kafka09FetcherTest.java | 6 +--
.../kafka/FlinkKafkaConsumerBase.java | 40 +++++++++++++++++++-
.../kafka/internals/AbstractFetcher.java | 5 ++-
.../kafka/internals/KafkaCommitCallback.java | 39 +++++++++++++++++++
.../kafka/FlinkKafkaConsumerBaseTest.java | 13 ++++---
.../kafka/internals/AbstractFetcherTest.java | 2 +-
11 files changed, 159 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index fc7b595..8b2f26d 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -867,6 +867,30 @@ Thus, in order to infer the metric identifier:
</tbody>
</table>
+#### Connectors:
+
+##### Kafka Connectors
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Scope</th>
+ <th class="text-left" style="width: 30%">Metrics</th>
+ <th class="text-left" style="width: 50%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <th rowspan="1">Operator</th>
+ <td>commitsSucceeded</td>
+ <td>Kafka offset commit success count if Kafka commit is turned on and checkpointing is enabled.</td>
+ </tr>
+ <tr>
+ <th rowspan="1">Operator</th>
+ <td>commitsFailed</td>
+ <td>Kafka offset commit failure count if Kafka commit is turned on and checkpointing is enabled.</td>
+ </tr>
+ </tbody>
+</table>
### Latency tracking
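
For context on where these two operator metrics come from, here is a minimal sketch (not part of the patch; the class and method names are illustrative only) that registers the counters on the operator metric group the same way the FlinkKafkaConsumerBase change further down does:

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;

public class CommitMetricsSketch {

    private transient Counter successfulCommits;
    private transient Counter failedCommits;

    // Register the two counters on the operator metric group, e.g. from open().
    public void registerMetrics(RuntimeContext runtimeContext) {
        this.successfulCommits = runtimeContext.getMetricGroup().counter("commitsSucceeded");
        this.failedCommits = runtimeContext.getMetricGroup().counter("commitsFailed");
    }

    // Bump the matching counter when an asynchronous offset commit completes.
    public void recordCommitResult(boolean succeeded) {
        if (succeeded) {
            successfulCommits.inc();
        } else {
            failedCommits.inc();
        }
    }
}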
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 2d0551d..f00f513 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -163,7 +163,7 @@ public class Kafka010FetcherTest {
@Override
public void run() {
try {
- fetcher.commitInternalOffsetsToKafka(testCommitData);
+ fetcher.commitInternalOffsetsToKafka(testCommitData, null);
} catch (Throwable t) {
commitError.set(t);
}
@@ -291,7 +291,7 @@ public class Kafka010FetcherTest {
// ----- trigger the first offset commit -----
- fetcher.commitInternalOffsetsToKafka(testCommitData1);
+ fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -308,7 +308,7 @@ public class Kafka010FetcherTest {
// ----- trigger the second offset commit -----
- fetcher.commitInternalOffsetsToKafka(testCommitData2);
+ fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index de201e5..04cafd9 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -356,15 +356,21 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
// ------------------------------------------------------------------------
@Override
- public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+ public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
if (zkHandler != null) {
try {
// the ZK handler takes care of incrementing the offsets by 1 before committing
zkHandler.prepareAndCommitOffsets(offsets);
+ if (commitCallback != null) {
+ commitCallback.onSuccess();
+ }
}
catch (Exception e) {
if (running) {
+ if (commitCallback != null) {
+ commitCallback.onException(e);
+ }
throw e;
} else {
return;
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 1c87542..96e0b91 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -211,7 +212,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
}
@Override
- public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+ public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitionStates();
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
@@ -228,6 +229,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
}
// record the work to be committed by the main consumer thread and make sure the consumer notices that
- consumerThread.setOffsetsToCommit(offsetsToCommit);
+ consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index cbe1551..ae0db85 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.connectors.kafka.internal;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
@@ -92,6 +93,8 @@ public class KafkaConsumerThread extends Thread {
/** Flag tracking whether the latest commit request has completed */
private volatile boolean commitInProgress;
+ /** User callback to be invoked when commits completed. */
+ private volatile KafkaCommitCallback callerCommitCallback;
public KafkaConsumerThread(
Logger log,
@@ -283,17 +286,38 @@ public class KafkaConsumerThread extends Thread {
*
* <p>Only one commit operation may be pending at any time. If the committing takes longer than
* the frequency with which this method is called, then some commits may be skipped due to being
+<<<<<<< HEAD
* superseded by newer ones.
*
+=======
+ * superseded by newer ones.
+ *
+>>>>>>> 8828648b4f... [FLINK-6998] [kafka] Add Kafka offset commit metrics to Flink Kafka Consumer
* @param offsetsToCommit The offsets to commit
*/
public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
+ setOffsetsToCommit(offsetsToCommit, null);
+ }
+
+ /**
+ * Tells this thread to commit a set of offsets. This method does not block, the committing
+ * operation will happen asynchronously.
+ *
+ * <p>Only one commit operation may be pending at any time. If the committing takes longer than
+ * the frequency with which this method is called, then some commits may be skipped due to being
+ * superseded by newer ones.
+ *
+ * @param offsetsToCommit The offsets to commit
+ * @param commitCallback callback when kafka commit completes
+ */
+ public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, KafkaCommitCallback commitCallback) {
// record the work to be committed by the main consumer thread and make sure the consumer notices that
if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
"This does not compromise Flink's checkpoint integrity.");
}
+ this.callerCommitCallback = commitCallback;
// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
handover.wakeupProducer();
@@ -324,6 +348,12 @@ public class KafkaConsumerThread extends Thread {
if (ex != null) {
log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
+ if (callerCommitCallback != null) {
+ callerCommitCallback.onException(ex);
+ }
+ }
+ else if (callerCommitCallback != null) {
+ callerCommitCallback.onSuccess();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 6e13db2..63885aa 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -163,7 +163,7 @@ public class Kafka09FetcherTest {
@Override
public void run() {
try {
- fetcher.commitInternalOffsetsToKafka(testCommitData);
+ fetcher.commitInternalOffsetsToKafka(testCommitData, null);
} catch (Throwable t) {
commitError.set(t);
}
@@ -291,7 +291,7 @@ public class Kafka09FetcherTest {
// ----- trigger the first offset commit -----
- fetcher.commitInternalOffsetsToKafka(testCommitData1);
+ fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -308,7 +308,7 @@ public class Kafka09FetcherTest {
// ----- trigger the second offset commit -----
- fetcher.commitInternalOffsetsToKafka(testCommitData2);
+ fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 5a440e0..6f2e538 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -41,6 +42,7 @@ import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
@@ -146,6 +148,24 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
private transient boolean restored = false;
// ------------------------------------------------------------------------
+ // internal metrics
+ // ------------------------------------------------------------------------
+
+ /** Counter for successful Kafka offset commits. */
+ private transient Counter successfulCommits;
+
+ /** Counter for failed Kafka offset commits. */
+ private transient Counter failedCommits;
+
+ /** Callback interface that will be invoked upon async Kafka commit completion.
+ * Please be aware that default callback implementation in base class does not
+ * provide any guarantees on thread-safety. This is sufficient for now because current
+ * supported Kafka connectors guarantee no more than 1 concurrent async pending offset
+ * commit.
+ */
+ private transient KafkaCommitCallback offsetCommitCallback;
+
+ // ------------------------------------------------------------------------
/**
* Base constructor.
@@ -421,6 +441,23 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
throw new Exception("The partitions were not set for the consumer");
}
+ // initialize commit metrics and default offset callback method
+ this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
+ this.failedCommits = this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
+
+ this.offsetCommitCallback = new KafkaCommitCallback() {
+ @Override
+ public void onSuccess() {
+ successfulCommits.inc();
+ }
+
+ @Override
+ public void onException(Throwable cause) {
+ LOG.error("Async Kafka commit failed.", cause);
+ failedCommits.inc();
+ }
+ };
+
// we need only do work, if we actually have partitions assigned
if (!subscribedPartitionsToStartOffsets.isEmpty()) {
@@ -623,7 +660,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
LOG.debug("Checkpoint state was empty.");
return;
}
- fetcher.commitInternalOffsetsToKafka(offsets);
+
+ fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
} catch (Exception e) {
if (running) {
throw e;
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 0b311a9..a553ed6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -176,10 +176,11 @@ public abstract class AbstractFetcher<T, KPH> {
* committing them, so that committed offsets to Kafka represent "the next record to process".
*
* @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
+ * @param commitCallback The callback that the user can implement to trigger custom actions when a commit request completes.
* @throws Exception This method forwards exceptions.
*/
- public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
-
+ public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception;
+
// ------------------------------------------------------------------------
// snapshot and restore the state
// ------------------------------------------------------------------------
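
The new abstract signature leaves it to each fetcher to drive the callback. A minimal sketch of the expected contract, mirroring the Kafka08Fetcher change above (ExampleOffsetCommitter and doCommit are hypothetical names, not Flink API):

import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public abstract class ExampleOffsetCommitter {

    // Hypothetical hook that performs the actual commit (to Kafka or ZooKeeper).
    protected abstract void doCommit(Map<KafkaTopicPartition, Long> offsets) throws Exception;

    // Expected contract: notify the callback on success, report the cause on
    // failure, and still forward the exception to the caller.
    public void commitWithCallback(
            Map<KafkaTopicPartition, Long> offsets,
            KafkaCommitCallback commitCallback) throws Exception {
        try {
            doCommit(offsets);
            commitCallback.onSuccess();
        } catch (Exception e) {
            commitCallback.onException(e);
            throw e;
        }
    }
}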
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
new file mode 100644
index 0000000..aca7ae5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * A callback interface that the source operator can implement to trigger custom actions when a commit request completes,
+ * which should normally be triggered from checkpoint complete event.
+ */
+public interface KafkaCommitCallback {
+
+ /**
+ * A callback method the user can implement to provide asynchronous handling of commit request completion.
+ * This method will be called when the commit request sent to the server has been acknowledged without error.
+ */
+ void onSuccess();
+
+ /**
+ * A callback method the user can implement to provide asynchronous handling of commit request failure.
+ * This method will be called when the commit request failed.
+ * @param cause Kafka commit failure cause returned by kafka client
+ */
+ void onException(Throwable cause);
+}
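
As a usage illustration, a minimal implementation of this interface could look like the sketch below (the class name is hypothetical; the production code in FlinkKafkaConsumerBase instead increments the commitsSucceeded/commitsFailed counters in its anonymous callback):

import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingCommitCallback implements KafkaCommitCallback {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingCommitCallback.class);

    @Override
    public void onSuccess() {
        // Invoked once the commit request has been acknowledged without error.
        LOG.info("Kafka offset commit succeeded.");
    }

    @Override
    public void onException(Throwable cause) {
        // Invoked with the failure cause returned by the Kafka client.
        LOG.warn("Kafka offset commit failed.", cause);
    }
}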
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 04508dc..f336b1c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
@@ -496,7 +497,7 @@ public class FlinkKafkaConsumerBaseTest {
// ack checkpoint 1
consumer.notifyCheckpointComplete(138L);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
// checkpoint 3
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -513,11 +514,11 @@ public class FlinkKafkaConsumerBaseTest {
// ack checkpoint 3, subsumes number 2
consumer.notifyCheckpointComplete(141L);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
consumer.notifyCheckpointComplete(666); // invalid checkpoint
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
listState = new TestingListState<>();
@@ -532,15 +533,15 @@ public class FlinkKafkaConsumerBaseTest {
// commit only the second last
consumer.notifyCheckpointComplete(598);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
// access invalid checkpoint
consumer.notifyCheckpointComplete(590);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
// and the last
consumer.notifyCheckpointComplete(599);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index c1a64c4..8612ca3 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -358,7 +358,7 @@ public class AbstractFetcherTest {
}
@Override
- public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+ public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback callback) throws Exception {
throw new UnsupportedOperationException();
}
}
[2/3] flink git commit: [FLINK-6998] [kafka] Remove unfruitful null checks on provided KafkaCommitCallbacks
Posted by tz...@apache.org.
[FLINK-6998] [kafka] Remove unfruitful null checks on provided KafkaCommitCallbacks
This closes #4187.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4382464f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4382464f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4382464f
Branch: refs/heads/release-1.3
Commit: 4382464fd140aae70767a3c9776cf27f3b57355b
Parents: f1a173a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Jul 28 18:59:11 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 22:47:26 2017 +0800
----------------------------------------------------------------------
.../connectors/kafka/Kafka010FetcherTest.java | 17 +++++----
.../kafka/internals/Kafka08Fetcher.java | 15 ++++----
.../kafka/internal/Kafka09Fetcher.java | 7 +++-
.../kafka/internal/KafkaConsumerThread.java | 39 ++++++--------------
.../connectors/kafka/Kafka09FetcherTest.java | 17 +++++----
.../kafka/internals/AbstractFetcher.java | 8 +++-
.../kafka/FlinkKafkaConsumerBaseTest.java | 17 ++++++---
.../kafka/internals/AbstractFetcherTest.java | 5 ++-
8 files changed, 64 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index f00f513..3ce12b3 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internal.Handover;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -163,7 +164,7 @@ public class Kafka010FetcherTest {
@Override
public void run() {
try {
- fetcher.commitInternalOffsetsToKafka(testCommitData, null);
+ fetcher.commitInternalOffsetsToKafka(testCommitData, mock(KafkaCommitCallback.class));
} catch (Throwable t) {
commitError.set(t);
}
@@ -291,7 +292,7 @@ public class Kafka010FetcherTest {
// ----- trigger the first offset commit -----
- fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
+ fetcher.commitInternalOffsetsToKafka(testCommitData1, mock(KafkaCommitCallback.class));
Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -308,7 +309,7 @@ public class Kafka010FetcherTest {
// ----- trigger the second offset commit -----
- fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
+ fetcher.commitInternalOffsetsToKafka(testCommitData2, mock(KafkaCommitCallback.class));
Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
@@ -344,9 +345,9 @@ public class Kafka010FetcherTest {
final byte[] payload = new byte[] {1, 2, 3, 4};
final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
- new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
- new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
- new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+ new ConsumerRecord<>(topic, partition, 15, payload, payload),
+ new ConsumerRecord<>(topic, partition, 16, payload, payload),
+ new ConsumerRecord<>(topic, partition, 17, payload, payload));
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
data.put(new TopicPartition(topic, partition), records);
@@ -452,11 +453,11 @@ public class Kafka010FetcherTest {
@Override
public void close() {}
- public void waitTillHasBlocker() throws InterruptedException {
+ void waitTillHasBlocker() throws InterruptedException {
inBlocking.await();
}
- public boolean isStillBlocking() {
+ boolean isStillBlocking() {
return lock.isLocked();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 04cafd9..37bdb23 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -37,6 +37,8 @@ import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -356,21 +358,20 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
// ------------------------------------------------------------------------
@Override
- public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
+ public void commitInternalOffsetsToKafka(
+ Map<KafkaTopicPartition, Long> offsets,
+ @Nonnull KafkaCommitCallback commitCallback) throws Exception {
+
ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
if (zkHandler != null) {
try {
// the ZK handler takes care of incrementing the offsets by 1 before committing
zkHandler.prepareAndCommitOffsets(offsets);
- if (commitCallback != null) {
- commitCallback.onSuccess();
- }
+ commitCallback.onSuccess();
}
catch (Exception e) {
if (running) {
- if (commitCallback != null) {
- commitCallback.onException(e);
- }
+ commitCallback.onException(e);
throw e;
} else {
return;
http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 96e0b91..8bd61ed 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -38,6 +38,8 @@ import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -212,7 +214,10 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
}
@Override
- public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
+ public void commitInternalOffsetsToKafka(
+ Map<KafkaTopicPartition, Long> offsets,
+ @Nonnull KafkaCommitCallback commitCallback) throws Exception {
+
KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitionStates();
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index ae0db85..56281a9 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -34,6 +34,8 @@ import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
+import javax.annotation.Nonnull;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -94,7 +96,7 @@ public class KafkaConsumerThread extends Thread {
private volatile boolean commitInProgress;
/** User callback to be invoked when commits completed. */
- private volatile KafkaCommitCallback callerCommitCallback;
+ private volatile KafkaCommitCallback offsetCommitCallback;
public KafkaConsumerThread(
Logger log,
@@ -286,38 +288,22 @@ public class KafkaConsumerThread extends Thread {
*
* <p>Only one commit operation may be pending at any time. If the committing takes longer than
* the frequency with which this method is called, then some commits may be skipped due to being
-<<<<<<< HEAD
- * superseded by newer ones.
- *
-=======
* superseded by newer ones.
*
->>>>>>> 8828648b4f... [FLINK-6998] [kafka] Add Kafka offset commit metrics to Flink Kafka Consumer
* @param offsetsToCommit The offsets to commit
+ * @param commitCallback callback when Kafka commit completes
*/
- public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
- setOffsetsToCommit(offsetsToCommit, null);
- }
+ void setOffsetsToCommit(
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
+ @Nonnull KafkaCommitCallback commitCallback) {
- /**
- * Tells this thread to commit a set of offsets. This method does not block, the committing
- * operation will happen asynchronously.
- *
- * <p>Only one commit operation may be pending at any time. If the committing takes longer than
- * the frequency with which this method is called, then some commits may be skipped due to being
- * superseded by newer ones.
- *
- * @param offsetsToCommit The offsets to commit
- * @param commitCallback callback when kafka commit completes
- */
- public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, KafkaCommitCallback commitCallback) {
// record the work to be committed by the main consumer thread and make sure the consumer notices that
if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
"This does not compromise Flink's checkpoint integrity.");
}
- this.callerCommitCallback = commitCallback;
+ this.offsetCommitCallback = commitCallback;
// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
handover.wakeupProducer();
@@ -348,12 +334,9 @@ public class KafkaConsumerThread extends Thread {
if (ex != null) {
log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
- if (callerCommitCallback != null) {
- callerCommitCallback.onException(ex);
- }
- }
- else if (callerCommitCallback != null) {
- callerCommitCallback.onSuccess();
+ offsetCommitCallback.onException(ex);
+ } else {
+ offsetCommitCallback.onSuccess();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 63885aa..e61359f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internal.Handover;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -163,7 +164,7 @@ public class Kafka09FetcherTest {
@Override
public void run() {
try {
- fetcher.commitInternalOffsetsToKafka(testCommitData, null);
+ fetcher.commitInternalOffsetsToKafka(testCommitData, mock(KafkaCommitCallback.class));
} catch (Throwable t) {
commitError.set(t);
}
@@ -291,7 +292,7 @@ public class Kafka09FetcherTest {
// ----- trigger the first offset commit -----
- fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
+ fetcher.commitInternalOffsetsToKafka(testCommitData1, mock(KafkaCommitCallback.class));
Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -308,7 +309,7 @@ public class Kafka09FetcherTest {
// ----- trigger the second offset commit -----
- fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
+ fetcher.commitInternalOffsetsToKafka(testCommitData2, mock(KafkaCommitCallback.class));
Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
@@ -344,9 +345,9 @@ public class Kafka09FetcherTest {
final byte[] payload = new byte[] {1, 2, 3, 4};
final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
- new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
- new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
- new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+ new ConsumerRecord<>(topic, partition, 15, payload, payload),
+ new ConsumerRecord<>(topic, partition, 16, payload, payload),
+ new ConsumerRecord<>(topic, partition, 17, payload, payload));
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
data.put(new TopicPartition(topic, partition), records);
@@ -453,11 +454,11 @@ public class Kafka09FetcherTest {
@Override
public void close() {}
- public void waitTillHasBlocker() throws InterruptedException {
+ void waitTillHasBlocker() throws InterruptedException {
inBlocking.await();
}
- public boolean isStillBlocking() {
+ boolean isStillBlocking() {
return lock.isLocked();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index a553ed6..10b7530 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.SerializedValue;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -176,10 +178,12 @@ public abstract class AbstractFetcher<T, KPH> {
* committing them, so that committed offsets to Kafka represent "the next record to process".
*
* @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
- * @param commitCallback The callback that the user can implement to trigger custom actions when a commit request completes.
+ * @param commitCallback The callback that the user should trigger when a commit request completes or fails.
* @throws Exception This method forwards exceptions.
*/
- public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception;
+ public abstract void commitInternalOffsetsToKafka(
+ Map<KafkaTopicPartition, Long> offsets,
+ @Nonnull KafkaCommitCallback commitCallback) throws Exception;
// ------------------------------------------------------------------------
// snapshot and restore the state
http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index f336b1c..c6af119 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -212,6 +213,10 @@ public class FlinkKafkaConsumerBaseTest {
partitionList.add(new KafkaTopicPartition("test", 0));
consumer.setSubscribedPartitions(partitionList);
+ StreamingRuntimeContext runtimeContext = mock(StreamingRuntimeContext.class);
+ when(runtimeContext.getMetricGroup()).thenReturn(mock(MetricGroup.class));
+ consumer.setRuntimeContext(runtimeContext);
+
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
TestingListState<Serializable> listState = new TestingListState<>();
when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
@@ -497,7 +502,7 @@ public class FlinkKafkaConsumerBaseTest {
// ack checkpoint 1
consumer.notifyCheckpointComplete(138L);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
// checkpoint 3
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -514,11 +519,11 @@ public class FlinkKafkaConsumerBaseTest {
// ack checkpoint 3, subsumes number 2
consumer.notifyCheckpointComplete(141L);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
consumer.notifyCheckpointComplete(666); // invalid checkpoint
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
listState = new TestingListState<>();
@@ -533,15 +538,15 @@ public class FlinkKafkaConsumerBaseTest {
// commit only the second last
consumer.notifyCheckpointComplete(598);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
// access invalid checkpoint
consumer.notifyCheckpointComplete(590);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
// and the last
consumer.notifyCheckpointComplete(599);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 8612ca3..3a5f83c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.SerializedValue;
import org.junit.Test;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
@@ -358,7 +359,9 @@ public class AbstractFetcherTest {
}
@Override
- public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback callback) throws Exception {
+ public void commitInternalOffsetsToKafka(
+ Map<KafkaTopicPartition, Long> offsets,
+ @Nonnull KafkaCommitCallback callback) throws Exception {
throw new UnsupportedOperationException();
}
}
[3/3] flink git commit: [FLINK-7287] [kafka, tests] Fix test instabilities in KafkaConsumerTestBase
Posted by tz...@apache.org.
[FLINK-7287] [kafka, tests] Fix test instabilities in KafkaConsumerTestBase
This closes #4414.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/452f5d10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/452f5d10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/452f5d10
Branch: refs/heads/release-1.3
Commit: 452f5d1032f61a29726dd484453a256c7d57d052
Parents: 4382464
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jul 27 18:31:13 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 22:47:46 2017 +0800
----------------------------------------------------------------------
.../connectors/kafka/KafkaConsumerTestBase.java | 21 ++++++++++----------
1 file changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/452f5d10/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 1de8e6f..3467237 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -216,7 +216,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.execute();
}
catch (Throwable t) {
- if (!(t.getCause() instanceof JobCancellationException)) {
+ if (!(t instanceof JobCancellationException)) {
errorRef.set(t);
}
}
@@ -242,8 +242,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
while (System.nanoTime() < deadline);
- // cancel the job
+ // cancel the job & wait for the job to finish
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+ runner.join();
final Throwable t = errorRef.get();
if (t != null) {
@@ -302,7 +303,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.execute();
}
catch (Throwable t) {
- if (!(t.getCause() instanceof JobCancellationException)) {
+ if (!(t instanceof JobCancellationException)) {
errorRef.set(t);
}
}
@@ -327,8 +328,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
while (System.nanoTime() < deadline);
- // cancel the job
+ // cancel the job & wait for the job to finish
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+ runner.join();
final Throwable t = errorRef.get();
if (t != null) {
@@ -1611,9 +1613,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
env1.execute("Metrics test job");
- } catch(Throwable t) {
- LOG.warn("Got exception during execution", t);
- if(!(t instanceof JobCancellationException)) { // we'll cancel the job
+ } catch (Throwable t) {
+ if (!(t instanceof JobCancellationException)) { // we'll cancel the job
+ LOG.warn("Got exception during execution", t);
error.f0 = t;
}
}
@@ -1664,11 +1666,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
} finally {
// cancel
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+ // wait for the job to finish (it should due to the cancel command above)
+ jobThread.join();
}
- while (jobThread.isAlive()) {
- Thread.sleep(50);
- }
if (error.f0 != null) {
throw error.f0;
}
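
The essence of the fix is to join the job-runner thread after cancelling, rather than polling isAlive() or not waiting at all. A compact sketch of that cancel-then-join pattern, with submitJob and cancelJob standing in for the actual test utilities used above:

import java.util.concurrent.atomic.AtomicReference;

public final class CancelAndJoinSketch {

    public static void runAndCancel(Runnable submitJob, Runnable cancelJob) throws Throwable {
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();

        // Run the job in a separate thread; the real tests additionally filter out
        // the expected JobCancellationException before recording an error.
        Thread runner = new Thread(() -> {
            try {
                submitJob.run();
            } catch (Throwable t) {
                errorRef.set(t);
            }
        }, "job-runner");
        runner.start();

        // ... wait until the job has made the desired progress ...

        cancelJob.run();  // cancel the job
        runner.join();    // and wait for the runner to finish before checking for errors

        Throwable t = errorRef.get();
        if (t != null) {
            throw t;
        }
    }
}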