Posted to issues@flink.apache.org by zhenzhongxu <gi...@git.apache.org> on 2017/06/26 22:52:12 UTC

[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

GitHub user zhenzhongxu opened a pull request:

    https://github.com/apache/flink/pull/4187

    [FLINK-6998][Kafka Connector] Add kafka offset commit metrics in cons…

     add "kafkaCommitsSucceeded" and "kafkaCommitsFailed" metrics in KafkaConsumerThread class.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhenzhongxu/flink FLINK-6998

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4187.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4187
    
----
commit 7d3f0af6511d732c3cc2cb5231d76f9e8a12e684
Author: Zhenzhong Xu <zx...@netflix.com>
Date:   2017-06-26T22:51:09Z

    [FLINK-6998][Kafka Connector] Add kafka offset commit metrics in consumer callback

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r127966695
  
    --- Diff: docs/monitoring/metrics.md ---
    @@ -896,6 +896,28 @@ Thus, in order to infer the metric identifier:
       </tbody>
     </table>
     
    +#### Connector:
    --- End diff --
    
    Let's name this heading "Connectors", and add a new sub-heading specifically for Kafka.



[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4187
  
    Merging ..



[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4187
  
    Hi @zhenzhongxu, could you rebase onto the latest master? Currently the PR contains commits unrelated to the change.



[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4187
  
    Hi @zhenzhongxu, thanks for the PR. I like the idea of exposing these metrics.
    
    Regarding the metric naming:
    I think we also expose metrics that are directly available through the Kafka client.
    Apart from this PR, there was also previous discussion about adding other Flink-specific metrics to the connector (e.g., offset lag since last checkpoint).
    I wonder whether or not we should regulate the naming scheme for these Flink-specific metrics, to set them apart from Kafka's provided metrics. What do you think?



[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4187
  
    Please resolve the checkstyle violations:
    ```
    [ERROR] src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java:[96] (javadoc) JavadocStyle: First sentence should end with a period.
    [ERROR] src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java:[99] (javadoc) JavadocStyle: First sentence should end with a period.
    ```



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r130058523
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -482,6 +501,12 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception
     
     			if (ex != null) {
     				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
    +				if (callerCommitCallback != null) {
    --- End diff --
    
    See my comment above. Would like to remove these null checks.



[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4187
  
    > I'll not include the other proposed metric in this PR just for the sake of simplicity. I also have some opinions on the "offset lag" metric: I think this particular metric is more useful when some external entity performs the monitoring (difference of committed offset vs log head), especially in failure situations.
    
    Yes, let's keep that apart from this PR. There is also a JIRA for exactly this feature: https://issues.apache.org/jira/browse/FLINK-6109



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r127636859
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +/**
    + * A callback interface that the user can implement to trigger custom actions when a commit request completes.
    + */
    +public interface KafkaCommitCallback {
    --- End diff --
    
    Not entirely sure of the need for an interface here. Do we allow "version-specific subclasses" to have their own `KafkaCommitCallback` implementation?
    
    If not, then this makes the usage slightly confusing.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r127400689
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -505,6 +519,21 @@ public void run(SourceContext<T> sourceContext) throws Exception {
     			throw new Exception("The partitions were not set for the consumer");
     		}
     
    +		// initialize commit metrics and default offset callback method
    +		this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
    +		this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
    +
    +		this.offsetCommitCallback = new KafkaCommitCallback() {
    +			@Override
    +			public void onComplete(Exception exception) {
    +				if (exception == null) {
    +					successfulCommits.inc();
    --- End diff --
    
    If the callbacks are executed by another thread the result will be inaccurate, as the default counter implementation is not thread-safe.
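To illustrate the concern, here is a minimal sketch of a counter that stays accurate under concurrent increments. `SimpleCounter`, `UnsafeCounter`, `AtomicCounter`, and `CounterDemo` are hypothetical stand-ins written for this example, not Flink's actual metric classes; the `AtomicLong`-backed variant is just one possible approach if the callbacks were to run on another thread.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a metric counter; not Flink's actual Counter interface.
interface SimpleCounter {
    void inc();
    long getCount();
}

// Plain counter: the read-modify-write in inc() is not atomic, so concurrent
// callers can lose updates.
final class UnsafeCounter implements SimpleCounter {
    private long count;
    @Override public void inc() { count++; }
    @Override public long getCount() { return count; }
}

// AtomicLong-backed counter: safe to increment from any thread.
final class AtomicCounter implements SimpleCounter {
    private final AtomicLong count = new AtomicLong();
    @Override public void inc() { count.incrementAndGet(); }
    @Override public long getCount() { return count.get(); }
}

final class CounterDemo {
    private CounterDemo() { }

    // Increments the counter from several threads and returns the final count.
    static long hammer(SimpleCounter counter, int threads, int incrementsPerThread) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.inc();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return counter.getCount();
    }
}
```

Calling `CounterDemo.hammer(new AtomicCounter(), 4, 10000)` always yields the full number of increments, whereas the same call with `UnsafeCounter` may come up short. If the callback is only ever invoked from a single thread, the plain counter is sufficient.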



[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4187
  
    I agree. "commits-succeeded" and "commits-failed" seem good!



[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

Posted by zhenzhongxu <gi...@git.apache.org>.
Github user zhenzhongxu commented on the issue:

    https://github.com/apache/flink/pull/4187
  
    How about just "commits-succeeded" and "commits-failed" as metric names?



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r127636523
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -185,6 +187,18 @@
     	private volatile boolean running = true;
     
     	// ------------------------------------------------------------------------
    +	//  internal metrics
    +	// ------------------------------------------------------------------------
    +
    +	/** Counter for successful Kafka offset commits. */
    +	private transient Counter successfulCommits;
    +
    +	/** Counter for failed Kafka offset commits. */
    +	private transient Counter failedCommits;
    +
    +	private transient KafkaCommitCallback offsetCommitCallback;
    --- End diff --
    
    Can you include a Javadoc for this too? (for consistency)



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r129663181
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +/**
    + * A callback interface that the source operator can implement to trigger custom actions when a commit request completes,
    + * which should normally be triggered from checkpoint complete event.
    + */
    +public interface KafkaCommitCallback {
    --- End diff --
    
    I think it would be nice to have two methods here: `onSuccess()` and `onException(...)`.
    Or does this have to be a SAM interface so you can use lambdas?
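A minimal sketch of the two-method shape suggested here, next to the single-method variant from the diff. The names `SingleMethodCommitCallback`, `TwoMethodCommitCallback`, and `CommitTally` are illustrative only and not part of the PR. The trade-off is that only the single-method form can be the target of a lambda, while the two-method form avoids signalling success through a `null` argument.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Single-method variant, as in the diff under review: success is signalled by a null argument.
interface SingleMethodCommitCallback {
    void onComplete(Exception exception);
}

// Two-method variant (illustrative name): success and failure are separate calls.
interface TwoMethodCommitCallback {
    void onSuccess();
    void onException(Throwable cause);
}

// Example implementation that tallies outcomes.
final class CommitTally implements TwoMethodCommitCallback {
    final AtomicInteger succeeded = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    @Override public void onSuccess() { succeeded.incrementAndGet(); }
    @Override public void onException(Throwable cause) { failed.incrementAndGet(); }

    // Adapter: a single-method callback, written as a lambda, that forwards
    // to the two-method form.
    SingleMethodCommitCallback asSingleMethod() {
        return exception -> {
            if (exception == null) {
                onSuccess();
            } else {
                onException(exception);
            }
        };
    }
}
```

The adapter shows that the two shapes carry the same information; the difference is mainly in how clearly the success path reads at the call site.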



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r127400527
  
    --- Diff: docs/monitoring/metrics.md ---
    @@ -896,6 +896,28 @@ Thus, in order to infer the metric identifier:
       </tbody>
     </table>
     
    +#### Connector:
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left" style="width: 20%">Scope</th>
    +      <th class="text-left" style="width: 30%">Metrics</th>
    +      <th class="text-left" style="width: 50%">Description</th>
    +    </tr>
    +  </thead>
    +  <tbody>
    +    <tr>
    +      <th rowspan="1">Slot/Consumer</th>
    --- End diff --
    
    This documentation is inconsistent with the rest.
    
    Scope should be "Operator", and you should add an additional "Infix" column which contains the names of the metric groups you are creating, concatenated with a period.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by zhenzhongxu <gi...@git.apache.org>.
Github user zhenzhongxu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r127560857
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -505,6 +519,21 @@ public void run(SourceContext<T> sourceContext) throws Exception {
     			throw new Exception("The partitions were not set for the consumer");
     		}
     
    +		// initialize commit metrics and default offset callback method
    +		this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
    +		this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
    +
    +		this.offsetCommitCallback = new KafkaCommitCallback() {
    +			@Override
    +			public void onComplete(Exception exception) {
    +				if (exception == null) {
    +					successfulCommits.inc();
    --- End diff --
    
    This is only invoked during checkpoint complete notification. There is no thread race condition.



[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4187
  
    I don't think it matters too much whether a metric is completely measured by Flink or just forwarded from Kafka classes. Having "kafka" in the name also introduces an inherent redundancy, since the scope for, say, the KafkaConsumerThread already contains "KafkaConsumer".



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r129663595
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +/**
    + * A callback interface that the source operator can implement to trigger custom actions when a commit request completes,
    + * which should normally be triggered from checkpoint complete event.
    + */
    +public interface KafkaCommitCallback {
    --- End diff --
    
    Having the two methods would make the on-success case clearer...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r124235516
  
    --- Diff: docs/monitoring/metrics.md ---
    @@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier:
       </tbody>
     </table>
     
    +#### Connector:
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left" style="width: 20%">Scope</th>
    +      <th class="text-left" style="width: 30%">Metrics</th>
    +      <th class="text-left" style="width: 50%">Description</th>
    +    </tr>
    +  </thead>
    +  <tbody>
    +    <tr>
    +      <th rowspan="1">Slot/Consumer</th>
    +      <td>kafkaCommitsSucceeded</td>
    +      <td>Kafka offset commit success count if Kafka commit is turned on.</td>
    --- End diff --
    
    I would make this statement more strict, in that the metric only exists if commit is turned on AND checkpointing is enabled. The added metrics would not appear if only Kafka's periodic offset committing is turned on.
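The condition described in this comment can be written down as a small predicate. This is only a sketch of the stated rule; `CommitMetricsAvailability` and its parameter names are hypothetical and do not come from the PR.

```java
// Hypothetical helper; the names are illustrative and not taken from the PR.
final class CommitMetricsAvailability {
    private CommitMetricsAvailability() { }

    // Per the review comment, the commit metrics are only present when offset
    // committing is turned on AND checkpointing is enabled. Kafka's own
    // periodic committing (without checkpointing) does not produce them.
    static boolean commitMetricsAvailable(boolean commitEnabled, boolean checkpointingEnabled) {
        return commitEnabled && checkpointingEnabled;
    }
}
```

Under this rule, a job with committing turned on but checkpointing disabled would not show the two counters.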



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4187



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r129663458
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +/**
    + * A callback interface that the source operator can implement to trigger custom actions when a commit request completes,
    + * which should normally be triggered from checkpoint complete event.
    + */
    +public interface KafkaCommitCallback {
    +
    +	/**
    +	 * A callback method the user can implement to provide asynchronous handling of commit request completion.
    +	 * This method will be called when the commit request sent to the server has been acknowledged.
    +	 *
    +	 * @param exception The exception thrown during processing of the request, or null if the commit completed successfully
    +	 */
    +	void onComplete(Exception exception);
    --- End diff --
    
    Most other exception handlers take a `Throwable`. Would it make sense to do that here as well?



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by zhenzhongxu <gi...@git.apache.org>.
Github user zhenzhongxu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r124413737
  
    --- Diff: docs/monitoring/metrics.md ---
    @@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier:
       </tbody>
     </table>
     
    +#### Connector:
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left" style="width: 20%">Scope</th>
    +      <th class="text-left" style="width: 30%">Metrics</th>
    +      <th class="text-left" style="width: 50%">Description</th>
    +    </tr>
    +  </thead>
    +  <tbody>
    +    <tr>
    +      <th rowspan="1">Slot/Consumer</th>
    +      <td>kafkaCommitsSucceeded</td>
    +      <td>Kafka offset commit success count if Kafka commit is turned on.</td>
    --- End diff --
    
    Sure, thanks for the feedback. Will update.



[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

Posted by zhenzhongxu <gi...@git.apache.org>.
Github user zhenzhongxu commented on the issue:

    https://github.com/apache/flink/pull/4187
  
    @tzulitai it seems the last CI pipeline failed because of stability issues. How can I trigger another build without making a commit?



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r130058404
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---
    @@ -346,15 +346,21 @@ protected TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition parti
     	// ------------------------------------------------------------------------
     
     	@Override
    -	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
    +	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
     		ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
     		if (zkHandler != null) {
     			try {
     				// the ZK handler takes care of incrementing the offsets by 1 before committing
     				zkHandler.prepareAndCommitOffsets(offsets);
    +				if (commitCallback != null) {
    --- End diff --
    
    I would actually like to remove these null checks, and have the contract that a callback will always be provided with `@Nonnull` annotation.
    
    AFAIK, the only reason we need these null checks is that the tests, for simplicity, provide a `null` as the callback. IMO, it isn't a good practice to have logic in the main code just to satisfy testing shortcuts.
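One way to keep the main code free of null checks is to give tests a no-op callback instead of `null` and to reject `null` once at the boundary. The sketch below is an assumption about how that could look; `CommitCallback`, `NO_OP`, and `OffsetCommitter` are hypothetical names, not classes from the PR, and `Objects.requireNonNull` stands in for whatever enforcement a `@Nonnull` contract would get.

```java
import java.util.Objects;

// Hypothetical callback type mirroring the two-method shape discussed in this thread.
interface CommitCallback {
    void onSuccess();
    void onException(Throwable cause);

    // No-op instance that tests can pass instead of null.
    CommitCallback NO_OP = new CommitCallback() {
        @Override public void onSuccess() { }
        @Override public void onException(Throwable cause) { }
    };
}

// Hypothetical committer: the contract is that a callback is always provided.
final class OffsetCommitter {
    private final Runnable commitAction;

    OffsetCommitter(Runnable commitAction) {
        this.commitAction = Objects.requireNonNull(commitAction, "commitAction");
    }

    void commit(CommitCallback callback) {
        // Fail fast on contract violations instead of null-checking at every call site.
        Objects.requireNonNull(callback, "callback must not be null");
        try {
            commitAction.run();
        } catch (RuntimeException e) {
            callback.onException(e);
            return;
        }
        callback.onSuccess();
    }
}
```

A test that does not care about the outcome would call `committer.commit(CommitCallback.NO_OP)`, so the production path needs no branch that exists only for testing.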



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r124235539
  
    --- Diff: docs/monitoring/metrics.md ---
    @@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier:
       </tbody>
     </table>
     
    +#### Connector:
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left" style="width: 20%">Scope</th>
    +      <th class="text-left" style="width: 30%">Metrics</th>
    +      <th class="text-left" style="width: 50%">Description</th>
    +    </tr>
    +  </thead>
    +  <tbody>
    +    <tr>
    +      <th rowspan="1">Slot/Consumer</th>
    +      <td>kafkaCommitsSucceeded</td>
    +      <td>Kafka offset commit success count if Kafka commit is turned on.</td>
    +    </tr>
    +    <tr>
    +       <th rowspan="1">Slot/Consumer</th>
    +       <td>kafkaCommitsFailed</td>
    +       <td>Kafka offset commit failure count if Kafka commit is turned on.</td>
    --- End diff --
    
    Same here.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r130058545
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -482,6 +501,12 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception
     
     			if (ex != null) {
     				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
    +				if (callerCommitCallback != null) {
    +					callerCommitCallback.onException(ex);
    +				}
    +			}
    +			else if (callerCommitCallback != null) {
    --- End diff --
    
    See my comment above. Would like to remove these null checks.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r127636725
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -505,6 +519,21 @@ public void run(SourceContext<T> sourceContext) throws Exception {
     			throw new Exception("The partitions were not set for the consumer");
     		}
     
    +		// initialize commit metrics and default offset callback method
    +		this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
    +		this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
    +
    +		this.offsetCommitCallback = new KafkaCommitCallback() {
    +			@Override
    +			public void onComplete(Exception exception) {
    +				if (exception == null) {
    +					successfulCommits.inc();
    --- End diff --
    
    I would also like to raise a thread-safety issue here.
    
    Currently, since there's always only one pending offset commit in Kafka 0.9+, and Kafka 0.8 commits in a blocking call, there will be no race condition in incrementing the counters. However, changing these implementations in subclasses (perhaps in the future) can easily introduce race conditions here.
    
    At the very least, we should probably add a notice about the thread-safety contract in the Javadoc of `commitInternalOffsetsToKafka`.
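
To illustrate the concern, here is a minimal sketch of a callback whose counters tolerate concurrent completions. It assumes nothing about Flink's own `Counter` implementation: the counters are plain `java.util.concurrent.atomic.AtomicLong` fields, and `ThreadSafeCommitCounters` and `ThreadSafeCountersSketch` are hypothetical names used only for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical callback, not the PR's class: AtomicLong counters make
// sure that concurrent completions cannot lose increments.
final class ThreadSafeCommitCounters {
    private final AtomicLong succeeded = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    void onSuccess() {
        succeeded.incrementAndGet();
    }

    void onException(Throwable cause) {
        failed.incrementAndGet();
    }

    long succeededCount() {
        return succeeded.get();
    }

    long failedCount() {
        return failed.get();
    }
}

class ThreadSafeCountersSketch {
    // Each of `threads` workers reports `perThread` successes and one failure.
    static ThreadSafeCommitCounters run(int threads, int perThread) {
        ThreadSafeCommitCounters counters = new ThreadSafeCommitCounters();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    counters.onSuccess();
                }
                counters.onException(new RuntimeException("simulated failure"));
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counters;
    }

    public static void main(String[] args) {
        ThreadSafeCommitCounters c = run(4, 1000);
        System.out.println(c.succeededCount() + " succeeded, " + c.failedCount() + " failed");
    }
}
```

Whether this extra protection is worth it depends on the contract: if the caller guarantees at most one pending commit, as described above, documenting that guarantee may be enough.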



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by zhenzhongxu <gi...@git.apache.org>.
Github user zhenzhongxu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r128078950
  
    --- Diff: docs/monitoring/metrics.md ---
    @@ -896,6 +896,28 @@ Thus, in order to infer the metric identifier:
       </tbody>
     </table>
     
    +#### Connector:
    --- End diff --
    
    OK, makes sense.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r127636481
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -293,12 +296,28 @@ public void shutdown() {
     	 * @param offsetsToCommit The offsets to commit
     	 */
     	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
    +		setOffsetsToCommit(offsetsToCommit, null);
    +	}
    +
    +	/**
    +	 * Tells this thread to commit a set of offsets. This method does not block, the committing
    +	 * operation will happen asynchronously.
    +	 *
    +	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
    +	 * the frequency with which this method is called, then some commits may be skipped due to being
    +	 * superseded  by newer ones.
    --- End diff --
    
    Unnecessary empty space.



[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

Posted by zhenzhongxu <gi...@git.apache.org>.
Github user zhenzhongxu commented on the issue:

    https://github.com/apache/flink/pull/4187
  
    @tzulitai All tests are passing now. Let me know if this looks OK, and whether you want me to go ahead and squash all commits.



[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4187
  
    Please follow the camel-case pattern that we use for other metrics.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by zhenzhongxu <gi...@git.apache.org>.
Github user zhenzhongxu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r127842765
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -505,6 +519,21 @@ public void run(SourceContext<T> sourceContext) throws Exception {
     			throw new Exception("The partitions were not set for the consumer");
     		}
     
    +		// initialize commit metrics and default offset callback method
    +		this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
    +		this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
    +
    +		this.offsetCommitCallback = new KafkaCommitCallback() {
    +			@Override
    +			public void onComplete(Exception exception) {
    +				if (exception == null) {
    +					successfulCommits.inc();
    --- End diff --
    
    I think I'll add a detailed Javadoc to describe the current unprotected implementation. I am hesitant to add lock protection because the higher-level abstraction currently guarantees that commits are never concurrent.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r126283335
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -184,6 +186,18 @@
     	/** Flag indicating whether the consumer is still running. */
     	private volatile boolean running = true;
     
    +    // ------------------------------------------------------------------------
    --- End diff --
    
    The indentation is off by 1 space.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r127400942
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +/**
    + * A callback interface that the user can implement to trigger custom actions when a commit request completes.
    --- End diff --
    
    Is this callback actually exposed to users?



[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

Posted by zhenzhongxu <gi...@git.apache.org>.
Github user zhenzhongxu commented on the issue:

    https://github.com/apache/flink/pull/4187
  
    @tzulitai 
    
    **Regarding the metric naming:**
    Any suggestions on naming conventions for these Flink-specific metrics? How do you like 'kafkaconnector-commits-succeeded' (component-metric-name) as an example? I personally like hyphen separators better than camel case for metric names.
    I won't include the other proposed metric in this PR, for the sake of simplicity. I also have some opinions on the "offset lag" metric: I think this particular metric is more useful when some external entity performs the monitoring (difference of committed offset vs. log head), especially in failure situations.
    
    **Regarding the implementation:**
    Thanks for the feedback. I'll explore the suggested implementation and get back to you with a solution or questions.
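
The "offset lag" mentioned above is the difference between the log head and the committed offset, per partition. A minimal sketch of that arithmetic follows, assuming an external monitor has already fetched both values. The class `OffsetLagSketch`, the string partition keys, and the sample offsets are all hypothetical and not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetLagSketch {
    // lag = log end offset - committed offset, computed per partition.
    // Partitions without a committed offset are skipped, and negative
    // differences (stale log-end readings) are clamped to zero.
    static Map<String, Long> lag(Map<String, Long> logEndOffsets,
                                 Map<String, Long> committedOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : logEndOffsets.entrySet()) {
            Long committed = committedOffsets.get(entry.getKey());
            if (committed != null) {
                result.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample values, for illustration only.
        Map<String, Long> logEnd = new HashMap<>();
        logEnd.put("topic-0", 100L);
        logEnd.put("topic-1", 50L);

        Map<String, Long> committed = new HashMap<>();
        committed.put("topic-0", 42L);

        System.out.println(lag(logEnd, committed));
    }
}
```

Because both inputs come from outside the job, a monitor built this way keeps reporting lag even while the consumer itself is failing, which is the situation the comment singles out.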



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r124284709
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -119,6 +126,9 @@ public KafkaConsumerThread(
     
     		this.nextOffsetsToCommit = new AtomicReference<>();
     		this.running = true;
    +
    +		this.successfulCommits = kafkaMetricGroup.counter("kafkaCommitsSucceeded");
    +		this.failedCommits = kafkaMetricGroup.counter("kafkaCommitsFailed");
    --- End diff --
    
    Please see my comment regarding the naming of the metrics.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r130058073
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -308,17 +311,33 @@ public void shutdown() {
     	 *
     	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
     	 * the frequency with which this method is called, then some commits may be skipped due to being
    -	 * superseded  by newer ones.
    +	 * superseded by newer ones.
     	 *
     	 * @param offsetsToCommit The offsets to commit
     	 */
     	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
    --- End diff --
    
    I don't think we need this variant anymore. The only user of the method is `Kafka09Fetcher`, anyways.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by zhenzhongxu <gi...@git.apache.org>.
Github user zhenzhongxu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r127838984
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -185,6 +187,18 @@
     	private volatile boolean running = true;
     
     	// ------------------------------------------------------------------------
    +	//  internal metrics
    +	// ------------------------------------------------------------------------
    +
    +	/** Counter for successful Kafka offset commits. */
    +	private transient Counter successfulCommits;
    +
    +	/** Counter for failed Kafka offset commits. */
    +	private transient Counter failedCommits;
    +
    +	private transient KafkaCommitCallback offsetCommitCallback;
    --- End diff --
    
    Sounds fair. I'll include a Javadoc as well as a notice about the thread-safety contract, as you suggested.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r126283316
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -104,7 +106,7 @@
     	//  configuration state, set on the client relevant for all subtasks
     	// ------------------------------------------------------------------------
     
    -	/** Describes whether we are discovering partitions for fixed topics or a topic pattern. */
    +	/** Descrnterbes whether we are discovering partitions for fixed topics or a topic pattern. */
    --- End diff --
    
    This is probably an unintended change.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by zhenzhongxu <gi...@git.apache.org>.
Github user zhenzhongxu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r127560753
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internals;
    +
    +/**
    + * A callback interface that the user can implement to trigger custom actions when a commit request completes.
    --- End diff --
    
    I updated the wording. This is only exposed to the Kafka source operator (invoked upon checkpoint completion), not directly to users. Hence, there is no thread race condition either.
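
The arrangement described here, where the operator owns the callback and drives it only when a checkpoint completes, might look roughly like the sketch below. Every name in it (`PlainCounter`, `OperatorCommitCallback`, `SourceOperatorSketch`, `notifyCheckpointComplete`) is a hypothetical stand-in and not Flink's API, and the counters are deliberately unsynchronized to mirror the single-caller assumption.

```java
// Hypothetical, unsynchronized counter; safe only with a single caller.
final class PlainCounter {
    private long count;

    void inc() {
        count++;
    }

    long getCount() {
        return count;
    }
}

// Hypothetical internal callback; method names follow the PR's diffs.
interface OperatorCommitCallback {
    void onSuccess();
    void onException(Throwable cause);
}

// Stand-in for the source operator: it creates the callback itself and
// never hands it to user code.
final class SourceOperatorSketch {
    final PlainCounter commitsSucceeded = new PlainCounter();
    final PlainCounter commitsFailed = new PlainCounter();

    private final OperatorCommitCallback callback = new OperatorCommitCallback() {
        @Override
        public void onSuccess() {
            commitsSucceeded.inc();
        }

        @Override
        public void onException(Throwable cause) {
            commitsFailed.inc();
        }
    };

    // Invoked once per completed checkpoint; the flag simulates whether
    // the offset commit for that checkpoint went through.
    void notifyCheckpointComplete(boolean commitOk) {
        if (commitOk) {
            callback.onSuccess();
        } else {
            callback.onException(new RuntimeException("simulated commit failure"));
        }
    }

    public static void main(String[] args) {
        SourceOperatorSketch operator = new SourceOperatorSketch();
        operator.notifyCheckpointComplete(true);
        operator.notifyCheckpointComplete(false);
        System.out.println("succeeded=" + operator.commitsSucceeded.getCount()
                + ", failed=" + operator.commitsFailed.getCount());
    }
}
```

The design relies on the operator being the only caller. If a subclass ever completed commits from several threads, the plain counters would need to be replaced or guarded.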



[GitHub] flink issue #4187: [FLINK-6998][Kafka Connector] Add kafka offset commit met...

Posted by zhenzhongxu <gi...@git.apache.org>.
Github user zhenzhongxu commented on the issue:

    https://github.com/apache/flink/pull/4187
  
    @tzulitai rebase done.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by zhenzhongxu <gi...@git.apache.org>.
Github user zhenzhongxu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r124413756
  
    --- Diff: docs/monitoring/metrics.md ---
    @@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier:
       </tbody>
     </table>
     
    +#### Connector:
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left" style="width: 20%">Scope</th>
    +      <th class="text-left" style="width: 30%">Metrics</th>
    +      <th class="text-left" style="width: 50%">Description</th>
    +    </tr>
    +  </thead>
    +  <tbody>
    +    <tr>
    +      <th rowspan="1">Slot/Consumer</th>
    +      <td>kafkaCommitsSucceeded</td>
    +      <td>Kafka offset commit success count if Kafka commit is turned on.</td>
    +    </tr>
    +    <tr>
    +       <th rowspan="1">Slot/Consumer</th>
    +       <td>kafkaCommitsFailed</td>
    +       <td>Kafka offset commit failure count if Kafka commit is turned on.</td>
    --- End diff --
    
    Sure, thanks for the feedback. Will update.



[GitHub] flink pull request #4187: [FLINK-6998][Kafka Connector] Add kafka offset com...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r130058480
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---
    @@ -346,15 +346,21 @@ protected TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition parti
     	// ------------------------------------------------------------------------
     
     	@Override
    -	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
    +	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
     		ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
     		if (zkHandler != null) {
     			try {
     				// the ZK handler takes care of incrementing the offsets by 1 before committing
     				zkHandler.prepareAndCommitOffsets(offsets);
    +				if (commitCallback != null) {
    +					commitCallback.onSuccess();
    +				}
     			}
     			catch (Exception e) {
     				if (running) {
    +					if (commitCallback != null) {
    --- End diff --
    
    See my comment above. Would like to remove these null checks.

