Posted to issues@flink.apache.org by radekg <gi...@git.apache.org> on 2016/07/12 19:28:18 UTC

[GitHub] flink pull request #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to ...

GitHub user radekg opened a pull request:

    https://github.com/apache/flink/pull/2231

    [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

    Hi everyone,
    
    At The Weather Company we bumped into a problem while trying to use Flink with Kafka 0.10.x. This PR introduces support for `FlinkKafkaConsumer010` and `FlinkKafkaProducer010`. Unit test coverage is provided and `mvn clean verify` passes.
    
    The output is below:
    
    ```
    [INFO] ------------------------------------------------------------------------
    [INFO] Reactor Summary:
    [INFO]
    [INFO] force-shading ...................................... SUCCESS [  1.311 s]
    [INFO] flink .............................................. SUCCESS [  2.939 s]
    [INFO] flink-annotations .................................. SUCCESS [  1.476 s]
    [INFO] flink-shaded-hadoop ................................ SUCCESS [  0.152 s]
    [INFO] flink-shaded-hadoop2 ............................... SUCCESS [  7.065 s]
    [INFO] flink-shaded-include-yarn-tests .................... SUCCESS [  8.543 s]
    [INFO] flink-shaded-curator ............................... SUCCESS [  0.112 s]
    [INFO] flink-shaded-curator-recipes ....................... SUCCESS [  1.080 s]
    [INFO] flink-shaded-curator-test .......................... SUCCESS [  0.210 s]
    [INFO] flink-test-utils-parent ............................ SUCCESS [  0.126 s]
    [INFO] flink-test-utils-junit ............................. SUCCESS [  2.019 s]
    [INFO] flink-core ......................................... SUCCESS [ 34.501 s]
    [INFO] flink-java ......................................... SUCCESS [ 26.266 s]
    [INFO] flink-runtime ...................................... SUCCESS [04:57 min]
    [INFO] flink-optimizer .................................... SUCCESS [  7.914 s]
    [INFO] flink-clients ...................................... SUCCESS [  6.537 s]
    [INFO] flink-streaming-java ............................... SUCCESS [ 37.732 s]
    [INFO] flink-test-utils ................................... SUCCESS [  6.166 s]
    [INFO] flink-scala ........................................ SUCCESS [ 24.626 s]
    [INFO] flink-runtime-web .................................. SUCCESS [ 12.831 s]
    [INFO] flink-examples ..................................... SUCCESS [  1.123 s]
    [INFO] flink-examples-batch ............................... SUCCESS [ 11.919 s]
    [INFO] flink-contrib ...................................... SUCCESS [  0.096 s]
    [INFO] flink-statebackend-rocksdb ......................... SUCCESS [  7.770 s]
    [INFO] flink-tests ........................................ SUCCESS [06:22 min]
    [INFO] flink-streaming-scala .............................. SUCCESS [ 26.831 s]
    [INFO] flink-streaming-connectors ......................... SUCCESS [  0.100 s]
    [INFO] flink-connector-flume .............................. SUCCESS [  2.425 s]
    [INFO] flink-libraries .................................... SUCCESS [  0.084 s]
    [INFO] flink-table ........................................ SUCCESS [02:02 min]
    [INFO] flink-connector-kafka-base ......................... SUCCESS [  4.604 s]
    [INFO] flink-connector-kafka-0.8 .......................... SUCCESS [02:01 min]
    [INFO] flink-connector-kafka-0.9 .......................... SUCCESS [02:38 min]
    [INFO] flink-connector-kafka-0.10 ......................... SUCCESS [02:04 min]
    [INFO] flink-connector-elasticsearch ...................... SUCCESS [ 19.310 s]
    [INFO] flink-connector-elasticsearch2 ..................... SUCCESS [ 17.086 s]
    [INFO] flink-connector-rabbitmq ........................... SUCCESS [  2.885 s]
    [INFO] flink-connector-twitter ............................ SUCCESS [  2.649 s]
    [INFO] flink-connector-nifi ............................... SUCCESS [  1.339 s]
    [INFO] flink-connector-cassandra .......................... SUCCESS [01:21 min]
    [INFO] flink-connector-redis .............................. SUCCESS [  5.738 s]
    [INFO] flink-connector-filesystem ......................... SUCCESS [ 24.871 s]
    [INFO] flink-batch-connectors ............................. SUCCESS [  0.103 s]
    [INFO] flink-avro ......................................... SUCCESS [  9.788 s]
    [INFO] flink-jdbc ......................................... SUCCESS [  4.839 s]
    [INFO] flink-hadoop-compatibility ......................... SUCCESS [ 10.026 s]
    [INFO] flink-hbase ........................................ SUCCESS [  2.938 s]
    [INFO] flink-hcatalog ..................................... SUCCESS [  5.383 s]
    [INFO] flink-examples-streaming ........................... SUCCESS [ 24.339 s]
    [INFO] flink-gelly ........................................ SUCCESS [ 40.416 s]
    [INFO] flink-gelly-scala .................................. SUCCESS [ 29.050 s]
    [INFO] flink-gelly-examples ............................... SUCCESS [ 21.418 s]
    [INFO] flink-python ....................................... SUCCESS [ 58.016 s]
    [INFO] flink-ml ........................................... SUCCESS [ 57.270 s]
    [INFO] flink-cep .......................................... SUCCESS [  6.984 s]
    [INFO] flink-cep-scala .................................... SUCCESS [  8.521 s]
    [INFO] flink-scala-shell .................................. SUCCESS [03:28 min]
    [INFO] flink-quickstart ................................... SUCCESS [  1.248 s]
    [INFO] flink-quickstart-java .............................. SUCCESS [  0.610 s]
    [INFO] flink-quickstart-scala ............................. SUCCESS [  0.237 s]
    [INFO] flink-storm ........................................ SUCCESS [ 14.975 s]
    [INFO] flink-storm-examples ............................... SUCCESS [ 37.513 s]
    [INFO] flink-streaming-contrib ............................ SUCCESS [  8.452 s]
    [INFO] flink-tweet-inputformat ............................ SUCCESS [  3.075 s]
    [INFO] flink-operator-stats ............................... SUCCESS [  6.521 s]
    [INFO] flink-connector-wikiedits .......................... SUCCESS [ 18.022 s]
    [INFO] flink-yarn ......................................... SUCCESS [  7.539 s]
    [INFO] flink-dist ......................................... SUCCESS [ 11.453 s]
    [INFO] flink-metrics ...................................... SUCCESS [  0.101 s]
    [INFO] flink-metrics-dropwizard ........................... SUCCESS [  2.699 s]
    [INFO] flink-metrics-ganglia .............................. SUCCESS [  1.320 s]
    [INFO] flink-metrics-graphite ............................. SUCCESS [  1.188 s]
    [INFO] flink-metrics-statsd ............................... SUCCESS [  2.271 s]
    [INFO] flink-fs-tests ..................................... SUCCESS [ 27.916 s]
    [INFO] flink-java8 ........................................ SUCCESS [ 12.209 s]
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 37:24 min
    [INFO] Finished at: 2016-07-12T15:18:39-04:00
    [INFO] Final Memory: 234M/1833M
    [INFO] ------------------------------------------------------------------------
    ```
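
    For context, here is a minimal usage sketch of the two new classes (assuming they mirror the 0.9 connector's constructors; the broker address, topic names, and `SimpleStringSchema` are illustrative):

    ```
    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class Kafka010Example {
    	public static void main(String[] args) throws Exception {
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    		Properties props = new Properties();
    		props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative address
    		props.setProperty("group.id", "example-group");

    		// read from Kafka 0.10 with the new consumer
    		DataStream<String> stream = env.addSource(
    				new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), props));

    		// write back to Kafka 0.10 with the new producer
    		stream.addSink(new FlinkKafkaProducer010<>("localhost:9092", "output-topic", new SimpleStringSchema()));

    		env.execute("Kafka 0.10 example");
    	}
    }
    ```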
    
    The only thing not provided at this point is the documentation update. I'm not sure how to take that one on; some guidance would be appreciated.
    
    What would be the best way to proceed with the contribution?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/TheWeatherCompany/flink kafka-0.10

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2231.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2231
    

[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    @radekg Thank you for the quick fix. I hope to find time over the weekend to test + review this, if not, then early next week :)



[GitHub] flink pull request #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2231



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by radekg <gi...@git.apache.org>.
Github user radekg commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    @tzulitai yes, this PR does not deal with the 0.10-specific timestamps. It makes a simple consumer application work.



[GitHub] flink pull request #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2231#discussion_r73646737
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -0,0 +1,331 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import kafka.admin.AdminUtils;
    +import kafka.common.KafkaException;
    +import kafka.network.SocketServer;
    +import kafka.server.KafkaConfig;
    +import kafka.server.KafkaServer;
    +import kafka.utils.SystemTime$;
    +import kafka.utils.ZkUtils;
    +import org.I0Itec.zkclient.ZkClient;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.curator.test.TestingServer;
    +import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
    +import org.apache.flink.util.NetUtils;
    +import org.apache.kafka.common.protocol.SecurityProtocol;
    +import org.apache.kafka.common.requests.MetadataResponse;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.File;
    +import java.net.BindException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.UUID;
    +
    +import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +/**
    + * An implementation of the KafkaServerProvider for Kafka 0.10
    + */
    +public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
    +
    +	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
    +	private File tmpZkDir;
    +	private File tmpKafkaParent;
    +	private List<File> tmpKafkaDirs;
    +	private List<KafkaServer> brokers;
    +	private TestingServer zookeeper;
    +	private String zookeeperConnectionString;
    +	private String brokerConnectionString = "";
    +	private Properties standardProps;
    +	private Properties additionalServerProperties;
    +
    +	public String getBrokerConnectionString() {
    +		return brokerConnectionString;
    +	}
    +
    +	@Override
    +	public Properties getStandardProperties() {
    +		return standardProps;
    +	}
    +
    +	@Override
    +	public String getVersion() {
    +		return "0.10";
    +	}
    +
    +	@Override
    +	public List<KafkaServer> getBrokers() {
    +		return brokers;
    +	}
    +
    +	@Override
    +	public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
    +		return new FlinkKafkaConsumer010<>(topics, readSchema, props);
    +	}
    +
    +	@Override
    +	public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
    +		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
    +		prod.setFlushOnCheckpoint(true);
    +		return prod;
    +	}
    +
    +	@Override
    +	public void restartBroker(int leaderId) throws Exception {
    +		brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
    +	}
    +
    +	@Override
    +	public int getLeaderToShutDown(String topic) throws Exception {
    +		ZkUtils zkUtils = getZkUtils();
    +		try {
    +			MetadataResponse.PartitionMetadata firstPart = null;
    +			do {
    +				if (firstPart != null) {
    +					LOG.info("Unable to find leader. error code {}", firstPart.error().code());
    +					// not the first try. Sleep a bit
    +					Thread.sleep(150);
    +				}
    +
    +				List<MetadataResponse.PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata();
    +				firstPart = partitionMetadata.get(0);
    +			}
    +			while (firstPart.error().code() != 0);
    +
    +			return firstPart.leader().id();
    +		} finally {
    +			zkUtils.close();
    +		}
    +	}
    +
    +	@Override
    +	public int getBrokerId(KafkaServer server) {
    +		return server.config().brokerId();
    +	}
    +
    +	@Override
    +	public void prepare(int numKafkaServers, Properties additionalServerProperties) {
    +		this.additionalServerProperties = additionalServerProperties;
    +		File tempDir = new File(System.getProperty("java.io.tmpdir"));
    +
    +		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
    +		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
    +
    +		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
    +		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
    +
    +		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
    +		for (int i = 0; i < numKafkaServers; i++) {
    +			File tmpDir = new File(tmpKafkaParent, "server-" + i);
    +			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
    +			tmpKafkaDirs.add(tmpDir);
    +		}
    +
    +		zookeeper = null;
    +		brokers = null;
    +
    +		try {
    +			LOG.info("Starting Zookeeper");
    +			zookeeper = new TestingServer(-1, tmpZkDir);
    +			zookeeperConnectionString = zookeeper.getConnectString();
    +
    +			LOG.info("Starting KafkaServer");
    +			brokers = new ArrayList<>(numKafkaServers);
    +
    +			for (int i = 0; i < numKafkaServers; i++) {
    +				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
    +
    +				SocketServer socketServer = brokers.get(i).socketServer();
    +				brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, socketServer.boundPort(SecurityProtocol.PLAINTEXT)) + ",";
    +			}
    +
    +			LOG.info("ZK and KafkaServer started.");
    +		}
    +		catch (Throwable t) {
    +			t.printStackTrace();
    +			fail("Test setup failed: " + t.getMessage());
    +		}
    +
    +		standardProps = new Properties();
    +		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
    +		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
    +		standardProps.setProperty("group.id", "flink-tests");
    +		standardProps.setProperty("auto.commit.enable", "false");
    +		standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
    +		standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
    +		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.10 value)
    +		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
    +	}
    +
    +	@Override
    +	public void shutdown() {
    +		for (KafkaServer broker : brokers) {
    +			if (broker != null) {
    +				broker.shutdown();
    +			}
    +		}
    +		brokers.clear();
    +
    +		if (zookeeper != null) {
    +			try {
    +				zookeeper.stop();
    +			}
    +			catch (Exception e) {
    +				LOG.warn("ZK.stop() failed", e);
    +			}
    +			zookeeper = null;
    +		}
    +
    +		// clean up the temp spaces
    +
    +		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
    +			try {
    +				FileUtils.deleteDirectory(tmpKafkaParent);
    +			}
    +			catch (Exception e) {
    +				// ignore
    +			}
    +		}
    +		if (tmpZkDir != null && tmpZkDir.exists()) {
    +			try {
    +				FileUtils.deleteDirectory(tmpZkDir);
    +			}
    +			catch (Exception e) {
    +				// ignore
    +			}
    +		}
    +	}
    +
    +	public ZkUtils getZkUtils() {
    +		ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
    +				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
    +		return ZkUtils.apply(creator, false);
    +	}
    +
    +	@Override
    +	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
    +		// create topic with one client
    +		LOG.info("Creating topic {}", topic);
    +
    +		ZkUtils zkUtils = getZkUtils();
    +		try {
    +			AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, new kafka.admin.RackAwareMode.Enforced$());
    --- End diff --
    
    The proper usage of `RackAwareMode` here seems to be `kafka.admin.RackAwareMode.Enforced$.MODULE$` (this is how the tests in Kafka use it). IntelliJ complains that `new kafka.admin.RackAwareMode.Enforced$()` has private access; I'm not sure why the build passes on this, though ...
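
    For reference, a minimal sketch of the suggested call (same arguments as in the diff above; only the last one changes, referencing the Scala case object through its static `MODULE$` field instead of invoking the private constructor):

    ```
    AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor,
    		topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$);
    ```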



[GitHub] flink pull request #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2231#discussion_r73662520
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml ---
    @@ -0,0 +1,179 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-streaming-connectors</artifactId>
    +		<version>1.1-SNAPSHOT</version>
    --- End diff --
    
    We recently bumped the version to 1.2-SNAPSHOT.



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    I agree with @tzulitai that it would be nice to have minimal code duplication in the long run, but it might not be possible with the current design of the consumers.
    
    What about the new timestamps that were introduced in Kafka 0.10? This is also something that wouldn't work with the 0.9 consumer and could only be implemented for the 0.10-specific consumer, correct?



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    Hi @radekg , 
    There was a recent change to the connector code affecting how the Kafka metrics are reported, so right now the PR has conflicts and can't be built. Would you like to rebase this PR on the current master branch so we can start testing it?



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    @aljoscha Ah right, for the new timestamps we will definitely need a 0.10-specific consumer. I think it makes sense to include a new module then.
    
    This current PR does not add the new 0.10 timestamps ( @radekg correct me if I'm wrong here ), but I think we can add that as a separate follow-up JIRA / PR afterwards, because it'll probably require changing some code in `flink-connector-kafka-base` and the user-facing deserialization schemas, which will need more discussion.
    
    I'll find time this week to give this PR a test.



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by radekg <gi...@git.apache.org>.
Github user radekg commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    Merged with `upstream/master` and I'm getting this when running `mvn clean verify`:
    
    ```
    [INFO] -------------------------------------------------------------
    [ERROR] COMPILATION ERROR :
    [INFO] -------------------------------------------------------------
    [ERROR] /Users/rad/dev/twc/flink/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java:[30,69] cannot find symbol
      symbol:   class DefaultKafkaMetricAccumulator
      location: package org.apache.flink.streaming.connectors.kafka.internals.metrics
    [ERROR] /Users/rad/dev/twc/flink/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java:[105,17] constructor AbstractFetcher in class org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher<T,KPH> cannot be applied to given types;
      required: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<T>,java.util.List<org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition>,org.apache.flink.util.SerializedValue<org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks<T>>,org.apache.flink.util.SerializedValue<org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks<T>>,org.apache.flink.streaming.api.operators.StreamingRuntimeContext,boolean
      found: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<T>,java.util.List<org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition>,org.apache.flink.util.SerializedValue<org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks<T>>,org.apache.flink.util.SerializedValue<org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks<T>>,org.apache.flink.streaming.api.operators.StreamingRuntimeContext
      reason: actual and formal argument lists differ in length
    [ERROR] /Users/rad/dev/twc/flink/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java:[192,49] cannot find symbol
      symbol:   class DefaultKafkaMetricAccumulator
      location: class org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher<T>
    [ERROR] /Users/rad/dev/twc/flink/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java:[193,65] cannot find symbol
      symbol:   variable DefaultKafkaMetricAccumulator
      location: class org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher<T>
    [INFO] 4 errors
    [INFO] -------------------------------------------------------------
    [INFO] ------------------------------------------------------------------------
    [INFO] Reactor Summary:
    [INFO]
    [INFO] force-shading ...................................... SUCCESS [  1.210 s]
    [INFO] flink .............................................. SUCCESS [  4.416 s]
    [INFO] flink-annotations .................................. SUCCESS [  1.551 s]
    [INFO] flink-shaded-hadoop ................................ SUCCESS [  0.162 s]
    [INFO] flink-shaded-hadoop2 ............................... SUCCESS [  6.451 s]
    [INFO] flink-shaded-include-yarn-tests .................... SUCCESS [  7.929 s]
    [INFO] flink-shaded-curator ............................... SUCCESS [  0.110 s]
    [INFO] flink-shaded-curator-recipes ....................... SUCCESS [  0.986 s]
    [INFO] flink-shaded-curator-test .......................... SUCCESS [  0.200 s]
    [INFO] flink-test-utils-parent ............................ SUCCESS [  0.111 s]
    [INFO] flink-test-utils-junit ............................. SUCCESS [  2.417 s]
    [INFO] flink-core ......................................... SUCCESS [ 37.825 s]
    [INFO] flink-java ......................................... SUCCESS [ 23.620 s]
    [INFO] flink-runtime ...................................... SUCCESS [06:25 min]
    [INFO] flink-optimizer .................................... SUCCESS [ 12.698 s]
    [INFO] flink-clients ...................................... SUCCESS [  9.795 s]
    [INFO] flink-streaming-java ............................... SUCCESS [ 43.709 s]
    [INFO] flink-test-utils ................................... SUCCESS [  9.363 s]
    [INFO] flink-scala ........................................ SUCCESS [ 37.639 s]
    [INFO] flink-runtime-web .................................. SUCCESS [ 19.749 s]
    [INFO] flink-examples ..................................... SUCCESS [  1.006 s]
    [INFO] flink-examples-batch ............................... SUCCESS [ 14.276 s]
    [INFO] flink-contrib ...................................... SUCCESS [  0.104 s]
    [INFO] flink-statebackend-rocksdb ......................... SUCCESS [ 10.938 s]
    [INFO] flink-tests ........................................ SUCCESS [07:34 min]
    [INFO] flink-streaming-scala .............................. SUCCESS [ 33.365 s]
    [INFO] flink-streaming-connectors ......................... SUCCESS [  0.106 s]
    [INFO] flink-connector-flume .............................. SUCCESS [  5.626 s]
    [INFO] flink-libraries .................................... SUCCESS [  0.100 s]
    [INFO] flink-table ........................................ SUCCESS [02:31 min]
    [INFO] flink-connector-kafka-base ......................... SUCCESS [ 10.033 s]
    [INFO] flink-connector-kafka-0.8 .......................... SUCCESS [02:06 min]
    [INFO] flink-connector-kafka-0.9 .......................... SUCCESS [02:12 min]
    [INFO] flink-connector-kafka-0.10 ......................... FAILURE [  0.197 s]
    [INFO] flink-connector-elasticsearch ...................... SKIPPED
    [INFO] flink-connector-elasticsearch2 ..................... SKIPPED
    [INFO] flink-connector-rabbitmq ........................... SKIPPED
    [INFO] flink-connector-twitter ............................ SKIPPED
    [INFO] flink-connector-nifi ............................... SKIPPED
    [INFO] flink-connector-cassandra .......................... SKIPPED
    [INFO] flink-connector-redis .............................. SKIPPED
    [INFO] flink-connector-filesystem ......................... SKIPPED
    [INFO] flink-batch-connectors ............................. SKIPPED
    [INFO] flink-avro ......................................... SKIPPED
    [INFO] flink-jdbc ......................................... SKIPPED
    [INFO] flink-hadoop-compatibility ......................... SKIPPED
    [INFO] flink-hbase ........................................ SKIPPED
    [INFO] flink-hcatalog ..................................... SKIPPED
    [INFO] flink-examples-streaming ........................... SKIPPED
    [INFO] flink-gelly ........................................ SKIPPED
    [INFO] flink-gelly-scala .................................. SKIPPED
    [INFO] flink-gelly-examples ............................... SKIPPED
    [INFO] flink-python ....................................... SKIPPED
    [INFO] flink-ml ........................................... SKIPPED
    [INFO] flink-cep .......................................... SKIPPED
    [INFO] flink-cep-scala .................................... SKIPPED
    [INFO] flink-scala-shell .................................. SKIPPED
    [INFO] flink-quickstart ................................... SKIPPED
    [INFO] flink-quickstart-java .............................. SKIPPED
    [INFO] flink-quickstart-scala ............................. SKIPPED
    [INFO] flink-storm ........................................ SKIPPED
    [INFO] flink-storm-examples ............................... SKIPPED
    [INFO] flink-streaming-contrib ............................ SKIPPED
    [INFO] flink-tweet-inputformat ............................ SKIPPED
    [INFO] flink-operator-stats ............................... SKIPPED
    [INFO] flink-connector-wikiedits .......................... SKIPPED
    [INFO] flink-yarn ......................................... SKIPPED
    [INFO] flink-dist ......................................... SKIPPED
    [INFO] flink-metrics ...................................... SKIPPED
    [INFO] flink-metrics-dropwizard ........................... SKIPPED
    [INFO] flink-metrics-ganglia .............................. SKIPPED
    [INFO] flink-metrics-graphite ............................. SKIPPED
    [INFO] flink-metrics-statsd ............................... SKIPPED
    [INFO] flink-fs-tests ..................................... SKIPPED
    [INFO] flink-java8 ........................................ SKIPPED
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD FAILURE
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 25:46 min
    [INFO] Finished at: 2016-07-22T21:52:55+02:00
    [INFO] Final Memory: 159M/1763M
    [INFO] ------------------------------------------------------------------------
    [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink-connector-kafka-0.10_2.10: Compilation failure: Compilation failure:
    [ERROR] /Users/rad/dev/twc/flink/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java:[30,69] cannot find symbol
    [ERROR] symbol:   class DefaultKafkaMetricAccumulator
    [ERROR] location: package org.apache.flink.streaming.connectors.kafka.internals.metrics
    [ERROR] /Users/rad/dev/twc/flink/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java:[105,17] constructor AbstractFetcher in class org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher<T,KPH> cannot be applied to given types;
    [ERROR] required: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<T>,java.util.List<org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition>,org.apache.flink.util.SerializedValue<org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks<T>>,org.apache.flink.util.SerializedValue<org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks<T>>,org.apache.flink.streaming.api.operators.StreamingRuntimeContext,boolean
    [ERROR] found: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<T>,java.util.List<org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition>,org.apache.flink.util.SerializedValue<org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks<T>>,org.apache.flink.util.SerializedValue<org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks<T>>,org.apache.flink.streaming.api.operators.StreamingRuntimeContext
    [ERROR] reason: actual and formal argument lists differ in length
    [ERROR] /Users/rad/dev/twc/flink/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java:[192,49] cannot find symbol
    [ERROR] symbol:   class DefaultKafkaMetricAccumulator
    [ERROR] location: class org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher<T>
    [ERROR] /Users/rad/dev/twc/flink/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java:[193,65] cannot find symbol
    [ERROR] symbol:   variable DefaultKafkaMetricAccumulator
    [ERROR] location: class org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher<T>
    [ERROR] -> [Help 1]
    [ERROR]
    [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
    [ERROR] Re-run Maven using the -X switch to enable full debug logging.
    [ERROR]
    [ERROR] For more information about the errors and possible solutions, please read the following articles:
    [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
    [ERROR]
    [ERROR] After correcting the problems, you can resume the build with the command
    [ERROR]   mvn <goals> -rf :flink-connector-kafka-0.10_2.10
    ```
    
    Any advice?



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by radekg <gi...@git.apache.org>.
Github user radekg commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    Travis is going to run.



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    Sorry for joining this discussion late. I've been on vacation.
    I also stumbled over the code duplication. I'll check out the code from this pull request and see if there's a good way of re-using most of the 0.9 connector code.



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    Finished first review of the code.
    
    Let me summarize the parts of the Kafka 0.10 API that require us to have a separate module:
    
     1. `ConsumerRecord` in 0.10 has a new `ConsumerRecord#timestamp()` method to retrieve Kafka server-side timestamps (see the sketch below). If we want to attach this timestamp to the records as the default event time in the future, we'd definitely need a separate module (the 0.10 timestamp feature is not included in this PR).
     2. `PartitionMetadata` (used in the `KafkaTestEnvironmentImpl`s) has a breaking change to the APIs for retrieving the info, so we can't simply bump the version either.
    
    Other than the above, the rest of the code is the same as in the 0.9 connector (or the changes are unrelated to Kafka API changes).
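
    For point 1, a minimal standalone sketch of the 0.10-only timestamp accessor (broker address, group id, and topic are illustrative):

    ```
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class TimestampPeek {
    	public static void main(String[] args) {
    		Properties props = new Properties();
    		props.setProperty("bootstrap.servers", "localhost:9092");
    		props.setProperty("group.id", "peek");
    		props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    		props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    		try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    			consumer.subscribe(Collections.singletonList("some-topic"));
    			for (ConsumerRecord<byte[], byte[]> record : consumer.poll(1000)) {
    				// timestamp() is new in the 0.10 client; the 0.9 ConsumerRecord has no equivalent
    				System.out.println(record.topic() + " @ " + record.timestamp());
    			}
    		}
    	}
    }
    ```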



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by radekg <gi...@git.apache.org>.
Github user radekg commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    @rmetzger it's absolutely fine to reuse the code. If I can help in any way, please let me know.



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    @radekg One thing that's missing is an update to the docs. You can find the Kafka connector documentation at `flink/docs/apis/streaming/connectors/kafka.md`. We'll probably only need to update the Maven dependency table to include the 0.10 connector.



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    Thank you for the description, @radekg .
    
    I think the problems you mentioned should be solvable by making the 0.9 connector just a bit more general; then users can simply use the 0.10 jars manually. However, you also have a point on the possible confusion. IMHO, it is redundant to have two connector modules with almost the same code, and it also doesn't seem feasible, for code maintainability, to keep adding modules for new Kafka versions even when they don't have changes in the API.
    
    I think we'll need to loop in @rmetzger and @aljoscha to decide on how we can proceed with this. The solutions I currently see are to work on the above problems in the 0.9 connector so it can be compatible with the 0.10 API, and then either rename the module to `flink-connector-kafka-0.10` (not ideal, because it would break users' POMs) or add information to the documentation on how to work with Kafka 0.10. Either way, in the long run, we'll probably still need to sort out a way to better manage the connector code when new external system versions like this come along.



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    @radekg I've opened a PR based on your code: https://github.com/apache/flink/issues/2369. Feel free to review it.



[GitHub] flink pull request #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2231#discussion_r73652372
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -186,7 +190,7 @@ public void runFailOnNoBrokerTest() throws Exception {
     			stream.print();
     			see.execute("No broker test");
     		} catch(RuntimeException re) {
    -			if(kafkaServer.getVersion().equals("0.9")) {
    +			if(kafkaServer.getVersion().equals(getExpectedKafkaVersion())) {
    --- End diff --
    
    I think the original intent of this assert was that the 0.8 connector throws different exception messages than 0.9.
    So adding the new `getExpectedKafkaVersion()` is a bit confusing with respect to the test intent.
    Perhaps we should remove `getExpectedKafkaVersion()` and simply change this to `kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")`?
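
    Roughly (a sketch of the suggested change, with the existing assertion body elided):

    ```
    } catch (RuntimeException re) {
    	// 0.9 and 0.10 clients throw the same exception message; only 0.8 differs
    	if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
    		// ... existing 0.9 assertion on the exception message ...
    	}
    }
    ```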



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by radekg <gi...@git.apache.org>.
Github user radekg commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    Sure, the problems are the following:
    
    - https://github.com/apache/flink/pull/2231/commits/06936d7c5acc0897348019161c9ced4596a0a4dd#diff-aba21cf86694f3f2cd85e2e5e9b04972R305: in 0.9, `consumer.assign` (https://github.com/apache/flink/pull/2231/commits/06936d7c5acc0897348019161c9ced4596a0a4dd#diff-aba21cf86694f3f2cd85e2e5e9b04972R180) takes a `List`; in 0.10 it takes a `Collection` (see the sketch after this list)
    - for unit tests: https://github.com/apache/flink/pull/2231/commits/06936d7c5acc0897348019161c9ced4596a0a4dd#diff-ab65f3156ed8820677f3420152b78908R130: if we use the 0.9 Kafka version with the 0.10 client, the concrete client tests fail, as they catch the wrong exception type in https://github.com/TheWeatherCompany/flink/blob/06936d7c5acc0897348019161c9ced4596a0a4dd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java#L185
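
    A sketch of the `assign` difference from the first point above (source-compatible, but binary-incompatible):

    ```
    import java.util.Collections;
    import java.util.List;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class AssignSketch {
    	static void demo(KafkaConsumer<byte[], byte[]> consumer) {
    		// 0.9:  public void assign(List<TopicPartition> partitions)
    		// 0.10: public void assign(Collection<TopicPartition> partitions)
    		// The same source compiles against either client, but bytecode compiled
    		// against the 0.9 signature fails with NoSuchMethodError on the 0.10 jar.
    		List<TopicPartition> parts = Collections.singletonList(new TopicPartition("some-topic", 0));
    		consumer.assign(parts);
    	}
    }
    ```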
    
    Silly stuff. Everything else works just fine. Feel free to reuse this stuff.
    
    FYI: I'd be confused if I were to use a class indicating 0.9 when working with 0.10; that's the reason I assembled a separate module. 0.9 is done and there's no future work required, so it makes sense to have a 0.10 module. Just my opinion.



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by radekg <gi...@git.apache.org>.
Github user radekg commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    Thanks, running `verify` again.



[GitHub] flink pull request #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2231#discussion_r73656446
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
    +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    +import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
    +import org.apache.flink.util.SerializedValue;
    +
    +import org.apache.kafka.clients.consumer.ConsumerConfig;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
    + * Apache Kafka 0.10.x. The consumer can run in multiple parallel instances, each of which will pull
    + * data from one or more Kafka partitions. 
    + * 
    + * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
    + * during a failure, and that the computation processes elements "exactly once". 
    + * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
    + *
    + * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
    + * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
    + * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
    + * has consumed a topic.</p>
    + *
    + * <p>Please refer to Kafka's documentation for the available configuration properties:
    + * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
    + *
    + * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
    + * is constructed. That means that the client that submits the program needs to be able to
    + * reach the Kafka brokers or ZooKeeper.</p>
    + */
    +public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
    +
    +	private static final long serialVersionUID = 2324564345203409112L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer010.class);
    +
    +	/**  Configuration key to change the polling timeout **/
    +	public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
    +
    +	/** Boolean configuration key to disable metrics tracking **/
    +	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    --- End diff --
    
    This is redundant. It's already declared in the `FlinkKafkaConsumerBase`.



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    Sorry for the delay on the review. Reviewing + testing now ...
    
    @radekg Yup, the failure is unrelated to the changes here. Some of the tests are currently a bit flaky.



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    Hi @radekg , thank you for opening a PR for this!
    From a first look, it seems there aren't many changes between the code of `flink-connector-kafka-0.9` and this PR. Also, from the original discussion / comments in the JIRA, the Kafka API doesn't seem to have changed between 0.9 and 0.10, so it might be possible to let the Kafka 0.9 connector use the 0.10 client by putting the Kafka 0.10 dependency into the user pom.
    
    May I ask whether you have tried this approach out already? Also:
    
    > At The Weather Company we bumped into a problem while trying to use Flink with Kafka 0.10.x.
    
    What was the problem? If you can describe it, that'll be helpful for deciding how we can proceed with this :) There's another contributor who was trying this out; I'll also try to ask for his feedback in the JIRA.



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    The errors are due to some of the changes to `AbstractFetcher` in https://github.com/apache/flink/commit/41f58182289226850b23c61a32f01223485d4775. Some of the Kafka 0.9 connector code has changed accordingly, so you'll probably need to reflect those changes in the Kafka 0.10 code too.
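
    Concretely, the `Kafka010Fetcher` super call needs the extra trailing boolean visible in the compiler error above (a sketch; the parameter name is assumed):

    ```
    // the AbstractFetcher constructor gained a trailing boolean (metrics on/off);
    // "useMetrics" is an assumed name for it here
    super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated,
    		runtimeContext, useMetrics);
    ```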



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    @radekg I've added Kafka 0.10 support to Flink; that's why I closed this pull request. My change preserved your commit from this pull request. Thank you for the contribution!



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by radekg <gi...@git.apache.org>.
Github user radekg commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    Tests are failing for random setups on Travis. It seems to be something Scala-related.



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    @radekg, are you okay with me using your pull request as a base for adding Kafka 0.10 to Flink?
    I've started changing your code from the PR so that we don't need to copy so much code: https://github.com/rmetzger/flink/commits/kafka-0.10



[GitHub] flink pull request #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2231#discussion_r73648473
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -134,6 +134,10 @@
     	@Rule
     	public RetryRule retryRule = new RetryRule();
     
    +	public String getExpectedKafkaVersion() {
    +		return "0.9";
    +	}
    --- End diff --
    
    We should make this an abstract method in the abstract test base.
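
    Something like this (sketch):

    ```
    // in the abstract test base:
    public abstract String getExpectedKafkaVersion();

    // in the 0.9 test environment, the existing body becomes an override:
    @Override
    public String getExpectedKafkaVersion() {
    	return "0.9";
    }

    // and the 0.10 test environment overrides it accordingly:
    @Override
    public String getExpectedKafkaVersion() {
    	return "0.10";
    }
    ```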



[GitHub] flink issue #2231: [FLINK-4035] Bump Kafka producer in Kafka sink to Kafka 0...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2231
  
    Okay, cool. Thank you. I'll probably open a pull request with your and my changes. I'll let you know so that you can help reviewing it.


