You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/01/03 14:33:05 UTC

[GitHub] [flink] zentol opened a new pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

zentol opened a new pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762
 
 
   This PR removes the Kafka 0.8/0.9 connectors.
   
   For 0.8 this is a fairly straight-forward removal, while for 0.9 several existing tests and classes have been merged into 0.10.
   I recommended reviewing commits individually.
   
   I first refactored the `KafkaConsumerThreadTest` to use an actual `Consumer` implementation instead of a mock, since the latter lead to the usual mocking issues when migrating the test to 0.10; namely API changes that went unnoticed by the mockito setup, causing test failures which took way too long to figure out.
   Next I removed the 0.8 connector, which was straight-forward since it is largely independent from other versions.
   The 0.9 removal is split into 2 commits: the first removes the 0.9 SQL jar, the second one the actual 0.9 connector.
   Since the 0.10 connector extended the 0.9 connected I had to merge the 0.9 connector into 0.10. This also presented some opportunities for simplifications (like the removal of call bridges).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] zentol commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365612875
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ##########
 @@ -173,7 +205,38 @@ public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<
 	 */
 	@PublicEvolving
 	public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
-		super(subscriptionPattern, deserializer, props);
+		this(null, subscriptionPattern, deserializer, props);
 
 Review comment:
   The `KafkTopicsDescriptor` actually relies on this being null. I'll add Nullable to the private constructor for the time being.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143046561 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:62027cd3e2e3237eebb24fb2ded311feb0ded645 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/144075826 TriggerType:PUSH TriggerID:62027cd3e2e3237eebb24fb2ded311feb0ded645
   Hash:62027cd3e2e3237eebb24fb2ded311feb0ded645 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4274 TriggerType:PUSH TriggerID:62027cd3e2e3237eebb24fb2ded311feb0ded645
   Hash:0604a77cf764e1c36abb91352c9ce9410d7e883a Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:0604a77cf764e1c36abb91352c9ce9410d7e883a
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   * d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143046561) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087) 
   * 62027cd3e2e3237eebb24fb2ded311feb0ded645 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/144075826) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4274) 
   * 0604a77cf764e1c36abb91352c9ce9410d7e883a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365322000
 
 

 ##########
 File path: tools/travis/stage.sh
 ##########
 @@ -128,11 +128,6 @@ flink-connectors/flink-sql-connector-kafka,"
 MODULES_TESTS="\
 flink-tests"
 
-# we can only build the Kafka 0.8 connector when building for Scala 2.11
-if [[ $PROFILE == *"scala-2.11"* ]]; then
-    MODULES_CONNECTORS="$MODULES_CONNECTORS,flink-connectors/flink-connector-kafka-0.8"
-fi
-
 
 Review comment:
   `flink-runtime/pom.xml` contains reference to Kafka 0.8 in line 562.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] zentol merged pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
zentol merged pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot commented on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot commented on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570589277
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 (Fri Jan 03 14:36:08 UTC 2020)
   
   **Warnings:**
    * **10 pom.xml files were touched**: Check for build and licensing issues.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365327750
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.10/pom.xml
 ##########
 @@ -46,7 +46,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId>
+			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 
 Review comment:
   In line 117, there is still a dependency to `flink-connector-kafka-0.9`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143046561 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:62027cd3e2e3237eebb24fb2ded311feb0ded645 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/144075826 TriggerType:PUSH TriggerID:62027cd3e2e3237eebb24fb2ded311feb0ded645
   Hash:62027cd3e2e3237eebb24fb2ded311feb0ded645 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4274 TriggerType:PUSH TriggerID:62027cd3e2e3237eebb24fb2ded311feb0ded645
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   * d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143046561) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087) 
   * 62027cd3e2e3237eebb24fb2ded311feb0ded645 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/144075826) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4274) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365331071
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
 ##########
 @@ -1047,17 +1047,41 @@ public OffsetAndMetadata committed(TopicPartition topicPartition) {
 		}
 
 		@Override
-		public void pause(TopicPartition... topicPartitions) {
+		public Set<TopicPartition> paused() {
+			return null;
+		}
+
+		@Override
+		public void pause(Collection<TopicPartition> collection) {
+		}
+
+		@Override
+		public void resume(Collection<TopicPartition> collection) {
 		}
 
 		@Override
-		public void resume(TopicPartition... topicPartitions) {
+		public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map) {
+			return null;
 
 Review comment:
   I would suggest to either fail or to return an empty map but not `null`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143046561 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:62027cd3e2e3237eebb24fb2ded311feb0ded645 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/144075826 TriggerType:PUSH TriggerID:62027cd3e2e3237eebb24fb2ded311feb0ded645
   Hash:62027cd3e2e3237eebb24fb2ded311feb0ded645 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4274 TriggerType:PUSH TriggerID:62027cd3e2e3237eebb24fb2ded311feb0ded645
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   * d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143046561) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087) 
   * 62027cd3e2e3237eebb24fb2ded311feb0ded645 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/144075826) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4274) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   * d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365318960
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
 ##########
 @@ -875,109 +878,194 @@ void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartit
 	}
 
 	@SuppressWarnings("unchecked")
-	private static KafkaConsumer<byte[], byte[]> createMockConsumer(
+	private static TestConsumer createMockConsumer(
 			final Map<TopicPartition, Long> mockConsumerAssignmentAndPosition,
 			final Map<TopicPartition, Long> mockRetrievedPositions,
 			final boolean earlyWakeup,
 			final OneShotLatch midAssignmentLatch,
 			final OneShotLatch continueAssignmentLatch) {
 
-		final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+		return new TestConsumer(mockConsumerAssignmentAndPosition, mockRetrievedPositions, earlyWakeup, midAssignmentLatch, continueAssignmentLatch);
+	}
 
-		when(mockConsumer.assignment()).thenAnswer(new Answer<Object>() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				if (midAssignmentLatch != null) {
-					midAssignmentLatch.trigger();
-				}
+	private static class TestConsumer implements Consumer<byte[], byte[]> {
+		private final Map<TopicPartition, Long> mockConsumerAssignmentAndPosition;
+		private final Map<TopicPartition, Long> mockRetrievedPositions;
+		private final boolean earlyWakeup;
+		private final OneShotLatch midAssignmentLatch;
+		private final OneShotLatch continueAssignmentLatch;
+
+		private int numWakeupCalls = 0;
+
+		private TestConsumer(Map<TopicPartition, Long> mockConsumerAssignmentAndPosition, Map<TopicPartition, Long> mockRetrievedPositions, boolean earlyWakeup, OneShotLatch midAssignmentLatch, OneShotLatch continueAssignmentLatch) {
+			this.mockConsumerAssignmentAndPosition = mockConsumerAssignmentAndPosition;
+			this.mockRetrievedPositions = mockRetrievedPositions;
+			this.earlyWakeup = earlyWakeup;
+			this.midAssignmentLatch = midAssignmentLatch;
+			this.continueAssignmentLatch = continueAssignmentLatch;
+		}
 
-				if (continueAssignmentLatch != null) {
+		@Override
+		public Set<TopicPartition> assignment() {
+			if (midAssignmentLatch != null) {
+				midAssignmentLatch.trigger();
+			}
+
+			if (continueAssignmentLatch != null) {
+				try {
 					continueAssignmentLatch.await();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
 				}
-				return mockConsumerAssignmentAndPosition.keySet();
 			}
-		});
+			return mockConsumerAssignmentAndPosition.keySet();
+		}
 
-		when(mockConsumer.poll(anyLong())).thenReturn(mock(ConsumerRecords.class));
+		@Override
+		public Set<String> subscription() {
+			return null;
+		}
 
-		if (!earlyWakeup) {
-			when(mockConsumer.position(any(TopicPartition.class))).thenAnswer(new Answer<Object>() {
-				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-					return mockConsumerAssignmentAndPosition.get(invocationOnMock.getArgument(0));
-				}
-			});
-		} else {
-			when(mockConsumer.position(any(TopicPartition.class))).thenThrow(new WakeupException());
+		@Override
+		public void subscribe(List<String> list) {
 		}
 
-		doAnswer(new Answer() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				mockConsumerAssignmentAndPosition.clear();
+		@Override
+		public void subscribe(List<String> list, ConsumerRebalanceListener consumerRebalanceListener) {
+		}
 
-				List<TopicPartition> assignedPartitions = invocationOnMock.getArgument(0);
-				for (TopicPartition assigned : assignedPartitions) {
-					mockConsumerAssignmentAndPosition.put(assigned, null);
-				}
-				return null;
+		@Override
+		public void assign(List<TopicPartition> assignedPartitions) {
+			mockConsumerAssignmentAndPosition.clear();
+
+			for (TopicPartition assigned : assignedPartitions) {
+				mockConsumerAssignmentAndPosition.put(assigned, null);
 			}
-		}).when(mockConsumer).assign(anyListOf(TopicPartition.class));
+		}
 
-		doAnswer(new Answer() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				TopicPartition partition = invocationOnMock.getArgument(0);
-				long position = invocationOnMock.getArgument(1);
+		@Override
+		public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
+		}
 
-				if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
-					throw new Exception("the current mock assignment does not contain partition " + partition);
-				} else {
-					mockConsumerAssignmentAndPosition.put(partition, position);
-				}
-				return null;
-			}
-		}).when(mockConsumer).seek(any(TopicPartition.class), anyLong());
+		@Override
+		public void unsubscribe() {
+		}
+
+		@Override
+		public ConsumerRecords<byte[], byte[]> poll(long l) {
+			return mock(ConsumerRecords.class);
+		}
+
+		@Override
+		public void commitSync() {
+		}
+
+		@Override
+		public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
+		}
+
+		@Override
+		public void commitAsync() {
+		}
+
+		@Override
+		public void commitAsync(OffsetCommitCallback offsetCommitCallback) {
+		}
 
-		doAnswer(new Answer() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				TopicPartition partition = invocationOnMock.getArgument(0);
+		@Override
+		public void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
+		}
 
+		@Override
+		public void seek(TopicPartition partition, long position) {
+			if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
+				throw new RuntimeException("the current mock assignment does not contain partition " + partition);
+			} else {
+				mockConsumerAssignmentAndPosition.put(partition, position);
+			}
+		}
+
+		@Override
+		public void seekToBeginning(TopicPartition... partitions) {
+			for (TopicPartition partition : partitions) {
 				if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
-					throw new Exception("the current mock assignment does not contain partition " + partition);
+					throw new RuntimeException("the current mock assignment does not contain partition " + partition);
 				} else {
 					Long mockRetrievedPosition = mockRetrievedPositions.get(partition);
 					if (mockRetrievedPosition == null) {
-						throw new Exception("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval");
+						throw new RuntimeException("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval");
 					} else {
 						mockConsumerAssignmentAndPosition.put(partition, mockRetrievedPositions.get(partition));
 					}
 				}
-				return null;
 			}
-		}).when(mockConsumer).seekToBeginning(any(TopicPartition.class));
-
-		doAnswer(new Answer() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				TopicPartition partition = invocationOnMock.getArgument(0);
+		}
 
+		@Override
+		public void seekToEnd(TopicPartition... partitions) {
+			for (TopicPartition partition : partitions) {
 				if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
-					throw new Exception("the current mock assignment does not contain partition " + partition);
+					throw new RuntimeException("the current mock assignment does not contain partition " + partition);
 				} else {
 					Long mockRetrievedPosition = mockRetrievedPositions.get(partition);
 					if (mockRetrievedPosition == null) {
-						throw new Exception("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval");
+						throw new RuntimeException("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval");
 					} else {
 						mockConsumerAssignmentAndPosition.put(partition, mockRetrievedPositions.get(partition));
 					}
 				}
-				return null;
 			}
-		}).when(mockConsumer).seekToEnd(any(TopicPartition.class));
+		}
+
+		@Override
+		public long position(TopicPartition topicPartition) {
+			if (!earlyWakeup) {
+				return mockConsumerAssignmentAndPosition.get(topicPartition);
+			} else {
+				throw new WakeupException();
+			}
+		}
 
-		return mockConsumer;
+		@Override
+		public OffsetAndMetadata committed(TopicPartition topicPartition) {
+			return null;
 
 Review comment:
   Should we throw `UnsupportedOperationException` here in order to avoid NPE somewhere else in the code?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365331143
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
 ##########
 @@ -1047,17 +1047,41 @@ public OffsetAndMetadata committed(TopicPartition topicPartition) {
 		}
 
 		@Override
-		public void pause(TopicPartition... topicPartitions) {
+		public Set<TopicPartition> paused() {
+			return null;
+		}
+
+		@Override
+		public void pause(Collection<TopicPartition> collection) {
+		}
+
+		@Override
+		public void resume(Collection<TopicPartition> collection) {
 		}
 
 		@Override
-		public void resume(TopicPartition... topicPartitions) {
+		public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map) {
+			return null;
+		}
+
+		@Override
+		public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection) {
+			return null;
+		}
+
+		@Override
+		public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection) {
+			return null;
 
 Review comment:
   Same here with `null`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143046561 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:62027cd3e2e3237eebb24fb2ded311feb0ded645 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:62027cd3e2e3237eebb24fb2ded311feb0ded645
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   * d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143046561) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087) 
   * 62027cd3e2e3237eebb24fb2ded311feb0ded645 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143046561 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   * d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143046561) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] zentol commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365613821
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
 ##########
 @@ -875,109 +878,194 @@ void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartit
 	}
 
 	@SuppressWarnings("unchecked")
-	private static KafkaConsumer<byte[], byte[]> createMockConsumer(
+	private static TestConsumer createMockConsumer(
 			final Map<TopicPartition, Long> mockConsumerAssignmentAndPosition,
 			final Map<TopicPartition, Long> mockRetrievedPositions,
 			final boolean earlyWakeup,
 			final OneShotLatch midAssignmentLatch,
 			final OneShotLatch continueAssignmentLatch) {
 
-		final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+		return new TestConsumer(mockConsumerAssignmentAndPosition, mockRetrievedPositions, earlyWakeup, midAssignmentLatch, continueAssignmentLatch);
+	}
 
-		when(mockConsumer.assignment()).thenAnswer(new Answer<Object>() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				if (midAssignmentLatch != null) {
-					midAssignmentLatch.trigger();
-				}
+	private static class TestConsumer implements Consumer<byte[], byte[]> {
+		private final Map<TopicPartition, Long> mockConsumerAssignmentAndPosition;
+		private final Map<TopicPartition, Long> mockRetrievedPositions;
+		private final boolean earlyWakeup;
+		private final OneShotLatch midAssignmentLatch;
+		private final OneShotLatch continueAssignmentLatch;
+
+		private int numWakeupCalls = 0;
+
+		private TestConsumer(Map<TopicPartition, Long> mockConsumerAssignmentAndPosition, Map<TopicPartition, Long> mockRetrievedPositions, boolean earlyWakeup, OneShotLatch midAssignmentLatch, OneShotLatch continueAssignmentLatch) {
+			this.mockConsumerAssignmentAndPosition = mockConsumerAssignmentAndPosition;
+			this.mockRetrievedPositions = mockRetrievedPositions;
+			this.earlyWakeup = earlyWakeup;
+			this.midAssignmentLatch = midAssignmentLatch;
+			this.continueAssignmentLatch = continueAssignmentLatch;
+		}
 
-				if (continueAssignmentLatch != null) {
+		@Override
+		public Set<TopicPartition> assignment() {
+			if (midAssignmentLatch != null) {
+				midAssignmentLatch.trigger();
+			}
+
+			if (continueAssignmentLatch != null) {
+				try {
 					continueAssignmentLatch.await();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
 				}
-				return mockConsumerAssignmentAndPosition.keySet();
 			}
-		});
+			return mockConsumerAssignmentAndPosition.keySet();
+		}
 
-		when(mockConsumer.poll(anyLong())).thenReturn(mock(ConsumerRecords.class));
+		@Override
+		public Set<String> subscription() {
+			return null;
+		}
 
-		if (!earlyWakeup) {
-			when(mockConsumer.position(any(TopicPartition.class))).thenAnswer(new Answer<Object>() {
-				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-					return mockConsumerAssignmentAndPosition.get(invocationOnMock.getArgument(0));
-				}
-			});
-		} else {
-			when(mockConsumer.position(any(TopicPartition.class))).thenThrow(new WakeupException());
+		@Override
+		public void subscribe(List<String> list) {
 		}
 
-		doAnswer(new Answer() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				mockConsumerAssignmentAndPosition.clear();
+		@Override
+		public void subscribe(List<String> list, ConsumerRebalanceListener consumerRebalanceListener) {
+		}
 
-				List<TopicPartition> assignedPartitions = invocationOnMock.getArgument(0);
-				for (TopicPartition assigned : assignedPartitions) {
-					mockConsumerAssignmentAndPosition.put(assigned, null);
-				}
-				return null;
+		@Override
+		public void assign(List<TopicPartition> assignedPartitions) {
+			mockConsumerAssignmentAndPosition.clear();
+
+			for (TopicPartition assigned : assignedPartitions) {
+				mockConsumerAssignmentAndPosition.put(assigned, null);
 			}
-		}).when(mockConsumer).assign(anyListOf(TopicPartition.class));
+		}
 
-		doAnswer(new Answer() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				TopicPartition partition = invocationOnMock.getArgument(0);
-				long position = invocationOnMock.getArgument(1);
+		@Override
+		public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
+		}
 
-				if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
-					throw new Exception("the current mock assignment does not contain partition " + partition);
-				} else {
-					mockConsumerAssignmentAndPosition.put(partition, position);
-				}
-				return null;
-			}
-		}).when(mockConsumer).seek(any(TopicPartition.class), anyLong());
+		@Override
+		public void unsubscribe() {
+		}
+
+		@Override
+		public ConsumerRecords<byte[], byte[]> poll(long l) {
+			return mock(ConsumerRecords.class);
+		}
+
+		@Override
+		public void commitSync() {
+		}
+
+		@Override
+		public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
+		}
+
+		@Override
+		public void commitAsync() {
+		}
+
+		@Override
+		public void commitAsync(OffsetCommitCallback offsetCommitCallback) {
+		}
 
-		doAnswer(new Answer() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				TopicPartition partition = invocationOnMock.getArgument(0);
+		@Override
+		public void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
+		}
 
+		@Override
+		public void seek(TopicPartition partition, long position) {
+			if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
+				throw new RuntimeException("the current mock assignment does not contain partition " + partition);
+			} else {
+				mockConsumerAssignmentAndPosition.put(partition, position);
+			}
+		}
+
+		@Override
+		public void seekToBeginning(TopicPartition... partitions) {
+			for (TopicPartition partition : partitions) {
 				if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
-					throw new Exception("the current mock assignment does not contain partition " + partition);
+					throw new RuntimeException("the current mock assignment does not contain partition " + partition);
 				} else {
 					Long mockRetrievedPosition = mockRetrievedPositions.get(partition);
 					if (mockRetrievedPosition == null) {
-						throw new Exception("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval");
+						throw new RuntimeException("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval");
 					} else {
 						mockConsumerAssignmentAndPosition.put(partition, mockRetrievedPositions.get(partition));
 					}
 				}
-				return null;
 			}
-		}).when(mockConsumer).seekToBeginning(any(TopicPartition.class));
-
-		doAnswer(new Answer() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				TopicPartition partition = invocationOnMock.getArgument(0);
+		}
 
+		@Override
+		public void seekToEnd(TopicPartition... partitions) {
+			for (TopicPartition partition : partitions) {
 				if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
-					throw new Exception("the current mock assignment does not contain partition " + partition);
+					throw new RuntimeException("the current mock assignment does not contain partition " + partition);
 				} else {
 					Long mockRetrievedPosition = mockRetrievedPositions.get(partition);
 					if (mockRetrievedPosition == null) {
-						throw new Exception("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval");
+						throw new RuntimeException("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval");
 					} else {
 						mockConsumerAssignmentAndPosition.put(partition, mockRetrievedPositions.get(partition));
 					}
 				}
-				return null;
 			}
-		}).when(mockConsumer).seekToEnd(any(TopicPartition.class));
+		}
+
+		@Override
+		public long position(TopicPartition topicPartition) {
+			if (!earlyWakeup) {
+				return mockConsumerAssignmentAndPosition.get(topicPartition);
+			} else {
+				throw new WakeupException();
+			}
+		}
 
-		return mockConsumer;
+		@Override
+		public OffsetAndMetadata committed(TopicPartition topicPartition) {
+			return null;
 
 Review comment:
   Tests still passes with all methods that return something throwing an exception instead, so we'll go with that.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143046561 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:62027cd3e2e3237eebb24fb2ded311feb0ded645 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/144075826 TriggerType:PUSH TriggerID:62027cd3e2e3237eebb24fb2ded311feb0ded645
   Hash:62027cd3e2e3237eebb24fb2ded311feb0ded645 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4274 TriggerType:PUSH TriggerID:62027cd3e2e3237eebb24fb2ded311feb0ded645
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   * d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143046561) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087) 
   * 62027cd3e2e3237eebb24fb2ded311feb0ded645 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/144075826) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4274) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] zentol commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365612987
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ##########
 @@ -130,7 +162,7 @@ public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deser
 	 *           The properties that are used to configure both the fetcher and the offset handler.
 	 */
 	public FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
-		super(topics, deserializer, props);
+		this(topics, null, deserializer, props);
 
 Review comment:
   Will add Nullable with the same reasoning as for the list of topics.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143046561 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   * d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143046561) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143046561 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:62027cd3e2e3237eebb24fb2ded311feb0ded645 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/144075826 TriggerType:PUSH TriggerID:62027cd3e2e3237eebb24fb2ded311feb0ded645
   Hash:62027cd3e2e3237eebb24fb2ded311feb0ded645 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4274 TriggerType:PUSH TriggerID:62027cd3e2e3237eebb24fb2ded311feb0ded645
   Hash:0604a77cf764e1c36abb91352c9ce9410d7e883a Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4292 TriggerType:PUSH TriggerID:0604a77cf764e1c36abb91352c9ce9410d7e883a
   Hash:0604a77cf764e1c36abb91352c9ce9410d7e883a Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/144140062 TriggerType:PUSH TriggerID:0604a77cf764e1c36abb91352c9ce9410d7e883a
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   * d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143046561) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087) 
   * 62027cd3e2e3237eebb24fb2ded311feb0ded645 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/144075826) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4274) 
   * 0604a77cf764e1c36abb91352c9ce9410d7e883a Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/144140062) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4292) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365328689
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ##########
 @@ -130,7 +162,7 @@ public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deser
 	 *           The properties that are used to configure both the fetcher and the offset handler.
 	 */
 	public FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
-		super(topics, deserializer, props);
+		this(topics, null, deserializer, props);
 
 Review comment:
   Same here with the `null` value for `Pattern`. I think it would be good to avoid it if possible.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365328481
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ##########
 @@ -173,7 +205,38 @@ public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<
 	 */
 	@PublicEvolving
 	public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
-		super(subscriptionPattern, deserializer, props);
+		this(null, subscriptionPattern, deserializer, props);
 
 Review comment:
   Can we avoid `null` and instead pass in an empty collection? If not, then let's add `@Nullable` annotation.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365318969
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
 ##########
 @@ -875,109 +878,194 @@ void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartit
 	}
 
 	@SuppressWarnings("unchecked")
-	private static KafkaConsumer<byte[], byte[]> createMockConsumer(
+	private static TestConsumer createMockConsumer(
 			final Map<TopicPartition, Long> mockConsumerAssignmentAndPosition,
 			final Map<TopicPartition, Long> mockRetrievedPositions,
 			final boolean earlyWakeup,
 			final OneShotLatch midAssignmentLatch,
 			final OneShotLatch continueAssignmentLatch) {
 
-		final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+		return new TestConsumer(mockConsumerAssignmentAndPosition, mockRetrievedPositions, earlyWakeup, midAssignmentLatch, continueAssignmentLatch);
+	}
 
-		when(mockConsumer.assignment()).thenAnswer(new Answer<Object>() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				if (midAssignmentLatch != null) {
-					midAssignmentLatch.trigger();
-				}
+	private static class TestConsumer implements Consumer<byte[], byte[]> {
+		private final Map<TopicPartition, Long> mockConsumerAssignmentAndPosition;
+		private final Map<TopicPartition, Long> mockRetrievedPositions;
+		private final boolean earlyWakeup;
+		private final OneShotLatch midAssignmentLatch;
+		private final OneShotLatch continueAssignmentLatch;
+
+		private int numWakeupCalls = 0;
+
+		private TestConsumer(Map<TopicPartition, Long> mockConsumerAssignmentAndPosition, Map<TopicPartition, Long> mockRetrievedPositions, boolean earlyWakeup, OneShotLatch midAssignmentLatch, OneShotLatch continueAssignmentLatch) {
+			this.mockConsumerAssignmentAndPosition = mockConsumerAssignmentAndPosition;
+			this.mockRetrievedPositions = mockRetrievedPositions;
+			this.earlyWakeup = earlyWakeup;
+			this.midAssignmentLatch = midAssignmentLatch;
+			this.continueAssignmentLatch = continueAssignmentLatch;
+		}
 
-				if (continueAssignmentLatch != null) {
+		@Override
+		public Set<TopicPartition> assignment() {
+			if (midAssignmentLatch != null) {
+				midAssignmentLatch.trigger();
+			}
+
+			if (continueAssignmentLatch != null) {
+				try {
 					continueAssignmentLatch.await();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
 				}
-				return mockConsumerAssignmentAndPosition.keySet();
 			}
-		});
+			return mockConsumerAssignmentAndPosition.keySet();
+		}
 
-		when(mockConsumer.poll(anyLong())).thenReturn(mock(ConsumerRecords.class));
+		@Override
+		public Set<String> subscription() {
+			return null;
+		}
 
-		if (!earlyWakeup) {
-			when(mockConsumer.position(any(TopicPartition.class))).thenAnswer(new Answer<Object>() {
-				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-					return mockConsumerAssignmentAndPosition.get(invocationOnMock.getArgument(0));
-				}
-			});
-		} else {
-			when(mockConsumer.position(any(TopicPartition.class))).thenThrow(new WakeupException());
+		@Override
+		public void subscribe(List<String> list) {
 		}
 
-		doAnswer(new Answer() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				mockConsumerAssignmentAndPosition.clear();
+		@Override
+		public void subscribe(List<String> list, ConsumerRebalanceListener consumerRebalanceListener) {
+		}
 
-				List<TopicPartition> assignedPartitions = invocationOnMock.getArgument(0);
-				for (TopicPartition assigned : assignedPartitions) {
-					mockConsumerAssignmentAndPosition.put(assigned, null);
-				}
-				return null;
+		@Override
+		public void assign(List<TopicPartition> assignedPartitions) {
+			mockConsumerAssignmentAndPosition.clear();
+
+			for (TopicPartition assigned : assignedPartitions) {
+				mockConsumerAssignmentAndPosition.put(assigned, null);
 			}
-		}).when(mockConsumer).assign(anyListOf(TopicPartition.class));
+		}
 
-		doAnswer(new Answer() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				TopicPartition partition = invocationOnMock.getArgument(0);
-				long position = invocationOnMock.getArgument(1);
+		@Override
+		public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
+		}
 
-				if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
-					throw new Exception("the current mock assignment does not contain partition " + partition);
-				} else {
-					mockConsumerAssignmentAndPosition.put(partition, position);
-				}
-				return null;
-			}
-		}).when(mockConsumer).seek(any(TopicPartition.class), anyLong());
+		@Override
+		public void unsubscribe() {
+		}
+
+		@Override
+		public ConsumerRecords<byte[], byte[]> poll(long l) {
+			return mock(ConsumerRecords.class);
+		}
+
+		@Override
+		public void commitSync() {
+		}
+
+		@Override
+		public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
+		}
+
+		@Override
+		public void commitAsync() {
+		}
+
+		@Override
+		public void commitAsync(OffsetCommitCallback offsetCommitCallback) {
+		}
 
-		doAnswer(new Answer() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				TopicPartition partition = invocationOnMock.getArgument(0);
+		@Override
+		public void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
+		}
 
+		@Override
+		public void seek(TopicPartition partition, long position) {
+			if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
+				throw new RuntimeException("the current mock assignment does not contain partition " + partition);
+			} else {
+				mockConsumerAssignmentAndPosition.put(partition, position);
+			}
+		}
+
+		@Override
+		public void seekToBeginning(TopicPartition... partitions) {
+			for (TopicPartition partition : partitions) {
 				if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
-					throw new Exception("the current mock assignment does not contain partition " + partition);
+					throw new RuntimeException("the current mock assignment does not contain partition " + partition);
 				} else {
 					Long mockRetrievedPosition = mockRetrievedPositions.get(partition);
 					if (mockRetrievedPosition == null) {
-						throw new Exception("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval");
+						throw new RuntimeException("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval");
 					} else {
 						mockConsumerAssignmentAndPosition.put(partition, mockRetrievedPositions.get(partition));
 					}
 				}
-				return null;
 			}
-		}).when(mockConsumer).seekToBeginning(any(TopicPartition.class));
-
-		doAnswer(new Answer() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				TopicPartition partition = invocationOnMock.getArgument(0);
+		}
 
+		@Override
+		public void seekToEnd(TopicPartition... partitions) {
+			for (TopicPartition partition : partitions) {
 				if (!mockConsumerAssignmentAndPosition.containsKey(partition)) {
-					throw new Exception("the current mock assignment does not contain partition " + partition);
+					throw new RuntimeException("the current mock assignment does not contain partition " + partition);
 				} else {
 					Long mockRetrievedPosition = mockRetrievedPositions.get(partition);
 					if (mockRetrievedPosition == null) {
-						throw new Exception("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval");
+						throw new RuntimeException("mock consumer needed to retrieve a position, but no value was provided in the mock values for retrieval");
 					} else {
 						mockConsumerAssignmentAndPosition.put(partition, mockRetrievedPositions.get(partition));
 					}
 				}
-				return null;
 			}
-		}).when(mockConsumer).seekToEnd(any(TopicPartition.class));
+		}
+
+		@Override
+		public long position(TopicPartition topicPartition) {
+			if (!earlyWakeup) {
+				return mockConsumerAssignmentAndPosition.get(topicPartition);
+			} else {
+				throw new WakeupException();
+			}
+		}
 
-		return mockConsumer;
+		@Override
+		public OffsetAndMetadata committed(TopicPartition topicPartition) {
+			return null;
+		}
+
+		@Override
+		public Map<MetricName, ? extends Metric> metrics() {
+			return null;
+		}
+
+		@Override
+		public List<PartitionInfo> partitionsFor(String s) {
+			return null;
+		}
+
+		@Override
+		public Map<String, List<PartitionInfo>> listTopics() {
+			return null;
+		}
 
 Review comment:
   Same here with the `UnsupportedOperationException`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365326108
 
 

 ##########
 File path: docs/dev/connectors/kafka.md
 ##########
 @@ -263,7 +255,7 @@ Example:
 {% highlight java %}
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>(...);
 
 Review comment:
   `docs/dev/event_time.md`, `docs/dev/event_time.zh.md`, `docs/dev/event_timestamps_watermarks.md` and `docs/dev/event_timestamps_watermarks.zh.md` contains a reference to `FlinkKafkaConsumer09`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365322652
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##########
 @@ -38,7 +38,6 @@
 public class KafkaValidator extends ConnectorDescriptorValidator {
 
 	public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
-	public static final String CONNECTOR_VERSION_VALUE_08 = "0.8";
 
 Review comment:
   `ConnectorDescriptorValidator.java` contains a reference to Kafka 0.8 in line 47.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/143046561 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   * d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/143046561) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#issuecomment-570591280
 
 
   <!--
   Meta data
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143008336 TriggerType:PUSH TriggerID:6bfb6d18af5fdfbddadf6da53adab78567d64aa9
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/143046561 TriggerType:PUSH TriggerID:d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e
   Hash:62027cd3e2e3237eebb24fb2ded311feb0ded645 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/144075826 TriggerType:PUSH TriggerID:62027cd3e2e3237eebb24fb2ded311feb0ded645
   Hash:62027cd3e2e3237eebb24fb2ded311feb0ded645 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4274 TriggerType:PUSH TriggerID:62027cd3e2e3237eebb24fb2ded311feb0ded645
   Hash:0604a77cf764e1c36abb91352c9ce9410d7e883a Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4292 TriggerType:PUSH TriggerID:0604a77cf764e1c36abb91352c9ce9410d7e883a
   Hash:0604a77cf764e1c36abb91352c9ce9410d7e883a Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/144140062 TriggerType:PUSH TriggerID:0604a77cf764e1c36abb91352c9ce9410d7e883a
   -->
   ## CI report:
   
   * 6bfb6d18af5fdfbddadf6da53adab78567d64aa9 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143008336) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4082) 
   * d61cca153ada8faf1ca0c59e9cb8ec8bd55ed39e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/143046561) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4087) 
   * 62027cd3e2e3237eebb24fb2ded311feb0ded645 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/144075826) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4274) 
   * 0604a77cf764e1c36abb91352c9ce9410d7e883a Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/144140062) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4292) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365321100
 
 

 ##########
 File path: docs/dev/connectors/kafka.zh.md
 ##########
 @@ -161,22 +151,18 @@ Flink 的 Kafka consumer 称为 `FlinkKafkaConsumer08`(或适用于 Kafka 0.9.
 {% highlight java %}
 Properties properties = new Properties();
 properties.setProperty("bootstrap.servers", "localhost:9092");
-// 仅 Kafka 0.8 需要
-properties.setProperty("zookeeper.connect", "localhost:2181");
 properties.setProperty("group.id", "test");
 DataStream<String> stream = env
-  .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
+  .addSource(new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), properties));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val properties = new Properties()
 properties.setProperty("bootstrap.servers", "localhost:9092")
-// 仅 Kafka 0.8 需要
-properties.setProperty("zookeeper.connect", "localhost:2181")
 properties.setProperty("group.id", "test")
 stream = env
-    .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
+    .addSource(new FlinkKafkaConsumer09[String]("topic", new SimpleStringSchema(), properties))
 
 Review comment:
   `docs/dev/datastream_api.md` and `docs/dev/datastream_api.zh.md` still contain a reference to `FlinkKafkaConsumer08`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10762: [FLINK-15115][kafka] Drop Kafka 0.8/0.9
URL: https://github.com/apache/flink/pull/10762#discussion_r365323005
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 ##########
 @@ -38,7 +38,6 @@
 public class KafkaValidator extends ConnectorDescriptorValidator {
 
 	public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
-	public static final String CONNECTOR_VERSION_VALUE_08 = "0.8";
 
 Review comment:
   `KafkaShortRetentionTestBase.java` contains in line 253 Kafka 0.8 specific code. In line 254 the same class contains Kafka 0.9 specific code. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services