Posted to issues@flink.apache.org by pnowojski <gi...@git.apache.org> on 2017/10/27 14:23:45 UTC

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/4915

    [FLINK-7838] Bunch of hotfixes and fix missing synchronization in FlinkKafkaProducer011

    ## What is the purpose of the change
    
    Most important is the commit adding the missing synchronization, which might have been the cause of some deadlocks on Travis. The others are non-critical hotfixes.
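
    For orientation, here is the core of that fix, condensed from the review diffs quoted later in this thread (`getValue`/`invoke` are the reflection helpers already used by `FlinkKafkaProducer`): the reflective calls into Kafka's `TransactionManager` are now serialized on the manager instance itself, so they cannot race with the producer's internal `Sender` thread.

    ```java
    private TransactionalRequestResult enqueueNewPartitions() {
        Object transactionManager = getValue(kafkaProducer, "transactionManager");
        // Synchronize on the TransactionManager so the reflective enqueue does not
        // race with Kafka's Sender thread (the missing synchronization this PR adds).
        synchronized (transactionManager) {
            Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
            invoke(transactionManager, "enqueueRequest",
                new Class[]{txnRequestHandler.getClass().getSuperclass()},
                new Object[]{txnRequestHandler});
            TransactionalRequestResult result = (TransactionalRequestResult) getValue(
                txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
            Object sender = getValue(kafkaProducer, "sender");
            invoke(sender, "wakeup");
            return result;
        }
    }
    ```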
    
    ## Brief change log
    
    Please check individual commit messages.
    
    ## Verifying this change
    
    This change is already covered by existing Kafka 0.11 connector tests.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (**yes** / no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink f7838

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4915
    
----
commit 4e0492595f497a49c63b8ffddcc66e720e4e4433
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-10-24T15:35:56Z

    [hotfix][kafka] Bump Kafka 0.11 dependency
    
    This might include some bugfixes

commit e38b3461bc97a175bf67f1072b2e8a2a891c1f1a
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-10-24T15:57:05Z

    [FLINK-7838][kafka] Add missing synchronization in FlinkKafkaProducer

commit 04127b9c44807f5379e07d801847d993c39e94b1
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-10-26T08:02:15Z

    [hotfix][kafka] Fix FlinkKafkaProducer011 logger

commit 8b47ac214c4022563be8128e84bc02d5de98819c
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-10-27T13:11:24Z

    [hotfix][kafka-tests] Fix test names so that they are not ignored by mvn build

commit a6c4c8bbdbfc5c238557e151fa8598e71a562411
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-10-25T16:08:46Z

    [hotfix][kafka] Move checkpointing enable checking to initializeState
    
    initializeState is called before open, and since both of those functions
    rely on the chosen semantic, the checkpointing-enabled check should
    happen in initializeState.

commit 055e5d125df895fd010e1171d1d39f37177518a2
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-10-27T13:14:58Z

    [hotfix][kafka] Remove unused field in FlinkKafkaProducer011

commit 6cf55ed8977135af01099452962962199e253348
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-10-27T13:47:26Z

    [hotfix][kafka] Do not return producers to a pool in abort for non EXACTLY_ONCE modes
    
    Previously, producers were returned to the pool on abort(...). This was a minor bug,
    probably without any negative side effects; this patch fixes it
    and adds additional sanity checks to guard against similar bugs
    in the future.
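
    As a hedged sketch of the guarded behaviour described above (simplified and illustrative only; the member names mirror the 0.11 producer but are not copied from the actual patch):

    ```java
    @Override
    protected void abort(KafkaTransactionState transaction) {
        if (semantic == Semantic.EXACTLY_ONCE) {
            transaction.producer.abortTransaction();
            // Only EXACTLY_ONCE keeps a pool of transactional producers, so only
            // in that mode is the aborted producer handed back for reuse.
            recycleTransactionalProducer(transaction.producer);
        }
    }
    ```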

----


---

[GitHub] flink issue #4915: [FLINK-7838] Bunch of hotfixes and fix missing synchroniz...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4915
  
    Fine for me, thanks!


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147667457
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
    @@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
     
     	private void flushNewPartitions() {
     		LOG.info("Flushing new partitions");
    +		enqueueNewPartitions().await();
    +	}
    +
    +	private TransactionalRequestResult enqueueNewPartitions() {
     		Object transactionManager = getValue(kafkaProducer, "transactionManager");
    -		Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    -		invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
    -		TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
    -		Object sender = getValue(kafkaProducer, "sender");
    -		invoke(sender, "wakeup");
    -		result.await();
    +		synchronized (transactionManager) {
    +			Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    +			invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
    +			TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
    +			Object sender = getValue(kafkaProducer, "sender");
    +			invoke(sender, "wakeup");
    --- End diff --
    
    `sender.wakeup` is outside of the lock in the original code. Do you think it makes a difference?


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4915


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147436687
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -483,11 +478,6 @@ public void setLogFailuresOnly(boolean logFailuresOnly) {
     	 */
     	@Override
     	public void open(Configuration configuration) throws Exception {
    -		if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
    --- End diff --
    
    What is the benefit of moving this into `initializeState()`?


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r148226920
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java ---
    @@ -61,7 +61,7 @@
      * IT cases for the {@link FlinkKafkaProducer011}.
      */
     @SuppressWarnings("serial")
    -public class FlinkKafkaProducer011Tests extends KafkaTestBase {
    +public class FlinkKafkaProducer011Test extends KafkaTestBase {
    --- End diff --
    
    I haven't looked into too many `ITCase`s, but the coding guidelines require unit tests to have subsecond execution times:
    >Please use unit tests to test isolated functionality, such as methods. Unit tests should execute in subseconds and should be preferred whenever possible. The name of unit test classes have to end on *Test. Use integration tests to implement long-running tests.
    
    http://flink.apache.org/contribute-code.html#coding-guidelines
    
    Then again, I don't know how consistent the tests are.


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147652950
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -563,7 +553,7 @@ public void close() throws Exception {
     			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
     		}
     		try {
    -			producersPool.close();
    +			producersPool.ifPresent(pool -> pool.close());
    --- End diff --
    
    I know about this controversy/discussion. However, I don't know in what universe nullable fields are better than `Optional` fields :|


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r148227364
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
    @@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
     
     	private void flushNewPartitions() {
     		LOG.info("Flushing new partitions");
    +		enqueueNewPartitions().await();
    +	}
    +
    +	private TransactionalRequestResult enqueueNewPartitions() {
     		Object transactionManager = getValue(kafkaProducer, "transactionManager");
    -		Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    -		invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
    -		TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
    -		Object sender = getValue(kafkaProducer, "sender");
    -		invoke(sender, "wakeup");
    -		result.await();
    +		synchronized (transactionManager) {
    +			Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    --- End diff --
    
    Did you add the `!newPartitionsInTransaction.isEmpty()` check in the end? I couldn't find it at first glance.
    
    Regarding tests: if you add the check, would your current test fail? If not, I think the behaviour isn't properly tested.


---

[GitHub] flink issue #4915: [FLINK-7838] Bunch of hotfixes and fix missing synchroniz...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4915
  
    I've added a commit to rename the 011 producer integration test to `ITCase`. Please let me know if you disagree with that @pnowojski @GJL.


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147941508
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
    @@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
     
     	private void flushNewPartitions() {
     		LOG.info("Flushing new partitions");
    +		enqueueNewPartitions().await();
    +	}
    +
    +	private TransactionalRequestResult enqueueNewPartitions() {
     		Object transactionManager = getValue(kafkaProducer, "transactionManager");
    -		Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    -		invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
    -		TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
    -		Object sender = getValue(kafkaProducer, "sender");
    -		invoke(sender, "wakeup");
    -		result.await();
    +		synchronized (transactionManager) {
    +			Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    --- End diff --
    
    It is always called from `FlinkKafkaProducer::flush()`, so that's pretty easy :)


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147689963
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -483,11 +478,6 @@ public void setLogFailuresOnly(boolean logFailuresOnly) {
     	 */
     	@Override
     	public void open(Configuration configuration) throws Exception {
    -		if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
    --- End diff --
    
    ok


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147674418
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
    @@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
     
     	private void flushNewPartitions() {
     		LOG.info("Flushing new partitions");
    +		enqueueNewPartitions().await();
    +	}
    +
    +	private TransactionalRequestResult enqueueNewPartitions() {
     		Object transactionManager = getValue(kafkaProducer, "transactionManager");
    -		Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    -		invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
    -		TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
    -		Object sender = getValue(kafkaProducer, "sender");
    -		invoke(sender, "wakeup");
    -		result.await();
    +		synchronized (transactionManager) {
    +			Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    +			invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
    +			TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
    +			Object sender = getValue(kafkaProducer, "sender");
    +			invoke(sender, "wakeup");
    --- End diff --
    
    Good catch. It shouldn't make a difference, but I will change it to better match the original code.


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147675169
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
    @@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
     
     	private void flushNewPartitions() {
     		LOG.info("Flushing new partitions");
    +		enqueueNewPartitions().await();
    +	}
    +
    +	private TransactionalRequestResult enqueueNewPartitions() {
     		Object transactionManager = getValue(kafkaProducer, "transactionManager");
    -		Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    -		invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
    -		TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
    -		Object sender = getValue(kafkaProducer, "sender");
    -		invoke(sender, "wakeup");
    -		result.await();
    +		synchronized (transactionManager) {
    +			Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    --- End diff --
    
    I implemented it; however, in that case it would be much harder to test `enqueueRequest(addPartitionsToTransactionHandler());` reliably, since in the vast majority of cases it would be dead code :/


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147656743
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
    @@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
     
     	private void flushNewPartitions() {
     		LOG.info("Flushing new partitions");
    +		enqueueNewPartitions().await();
    +	}
    +
    +	private TransactionalRequestResult enqueueNewPartitions() {
     		Object transactionManager = getValue(kafkaProducer, "transactionManager");
    -		Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    -		invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
    -		TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
    -		Object sender = getValue(kafkaProducer, "sender");
    -		invoke(sender, "wakeup");
    -		result.await();
    +		synchronized (transactionManager) {
    +			Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    --- End diff --
    
    Not related to your changes and can be done in a separate PR, but I think we should add this check from the original sources to *"mitigate"* KAFKA-6119:
    ```
    if (!newPartitionsInTransaction.isEmpty())
                enqueueRequest(addPartitionsToTransactionHandler());
    ```
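
    In the reflective wrapper, a hedged sketch of that guard could look roughly like this (the field name is taken from the snippet above, and `getValue`/`invoke` are the helpers already used in `FlinkKafkaProducer`; the cast and exact placement are assumptions):

    ```java
    // Assumed sketch: skip enqueueing when there are no new partitions to add.
    Object newPartitions = getValue(transactionManager, "newPartitionsInTransaction");
    if (!((java.util.Collection<?>) newPartitions).isEmpty()) {
        Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
        invoke(transactionManager, "enqueueRequest",
            new Class[]{txnRequestHandler.getClass().getSuperclass()},
            new Object[]{txnRequestHandler});
    }
    ```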


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147754127
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
    @@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
     
     	private void flushNewPartitions() {
     		LOG.info("Flushing new partitions");
    +		enqueueNewPartitions().await();
    +	}
    +
    +	private TransactionalRequestResult enqueueNewPartitions() {
     		Object transactionManager = getValue(kafkaProducer, "transactionManager");
    -		Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    -		invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
    -		TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
    -		Object sender = getValue(kafkaProducer, "sender");
    -		invoke(sender, "wakeup");
    -		result.await();
    +		synchronized (transactionManager) {
    +			Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    --- End diff --
    
    How are you testing now that `enqueueRequest` is called? 


---

[GitHub] flink issue #4915: [FLINK-7838] Bunch of hotfixes and fix missing synchroniz...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4915
  
    @GJL @aljoscha could you take a look?


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147754468
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java ---
    @@ -61,7 +61,7 @@
      * IT cases for the {@link FlinkKafkaProducer011}.
      */
     @SuppressWarnings("serial")
    -public class FlinkKafkaProducer011Tests extends KafkaTestBase {
    +public class FlinkKafkaProducer011Test extends KafkaTestBase {
    --- End diff --
    
    Shouldn't this be named `*ITCase` according to the coding conventions?


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147942486
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java ---
    @@ -61,7 +61,7 @@
      * IT cases for the {@link FlinkKafkaProducer011}.
      */
     @SuppressWarnings("serial")
    -public class FlinkKafkaProducer011Tests extends KafkaTestBase {
    +public class FlinkKafkaProducer011Test extends KafkaTestBase {
    --- End diff --
    
    Aren't all of the other `ITCase`s starting a Flink job?
    
    Regardless, I originally named it `Test` because it is slightly more unit-"testish" compared to `Kafka011ProducerExactlyOnceITCase`.


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r148441559
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java ---
    @@ -61,7 +61,7 @@
      * IT cases for the {@link FlinkKafkaProducer011}.
      */
     @SuppressWarnings("serial")
    -public class FlinkKafkaProducer011Tests extends KafkaTestBase {
    +public class FlinkKafkaProducer011Test extends KafkaTestBase {
    --- End diff --
    
    I'm leaning towards `*ITCase` here, since it is indeed an integration test with Kafka.


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147652455
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -483,11 +478,6 @@ public void setLogFailuresOnly(boolean logFailuresOnly) {
     	 */
     	@Override
     	public void open(Configuration configuration) throws Exception {
    -		if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
    --- End diff --
    
    `initializeState()` performs some clean-up actions that depend on the semantic (cleaning up/closing lingering transactions), so this check should happen earlier. Otherwise the `EXACTLY_ONCE` clean-up code would be executed unnecessarily.
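
    As a rough illustration (not the actual patch; the warning/fallback body is an assumption), the moved check would sit at the top of `initializeState()`, before any semantic-dependent recovery runs:

    ```java
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        if (semantic != Semantic.NONE
                && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
            // Assumed fallback: warn and downgrade instead of failing.
            LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.",
                semantic, Semantic.NONE);
            semantic = Semantic.NONE;
        }
        // Only after the semantic is settled do the semantic-dependent clean-up and
        // recovery of lingering transactions run.
        super.initializeState(context);
    }
    ```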


---

[GitHub] flink issue #4915: [FLINK-7838] Bunch of hotfixes and fix missing synchroniz...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/4915
  
    👍 


---

[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147442314
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -563,7 +553,7 @@ public void close() throws Exception {
     			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
     		}
     		try {
    -			producersPool.close();
    +			producersPool.ifPresent(pool -> pool.close());
    --- End diff --
    
    I am not adamant about it but using `Optional` in private fields is not without controversy: https://stackoverflow.com/a/26328555
    
    Also, `ifPresent(pool -> pool.close())` only works because `close` does not declare any checked exceptions. If it did, the code would not compile.
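
    To make that second point concrete, a small hedged illustration (the `QuietPool`/`ThrowingPool` types are hypothetical, not the actual producer pool):

    ```java
    import java.util.Optional;

    class QuietPool {
        void close() { }                      // no checked exceptions
    }

    class ThrowingPool {
        void close() throws Exception { }     // declares a checked exception
    }

    class Demo {
        void example(Optional<QuietPool> quiet, Optional<ThrowingPool> throwing) {
            quiet.ifPresent(pool -> pool.close());       // compiles

            // throwing.ifPresent(pool -> pool.close()); // would NOT compile:
            // Consumer#accept cannot propagate checked exceptions, so the
            // Exception has to be handled (e.g. wrapped) inside the lambda.
            throwing.ifPresent(pool -> {
                try {
                    pool.close();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }
    ```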


---

[GitHub] flink issue #4915: [FLINK-7838] Bunch of hotfixes and fix missing synchroniz...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4915
  
    Thanks for the work and reviews @GJL @pnowojski.
    I did a pass also, changes LGTM.
    Left one last comment regarding the integration test class name.
    Other than that, would like to merge this if there are no other objections.


---