You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Boyang Jerry Peng (JIRA)" <ji...@apache.org> on 2015/08/27 16:56:46 UTC
[jira] [Updated] (FLINK-2585) KafkaSource not working
[ https://issues.apache.org/jira/browse/FLINK-2585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boyang Jerry Peng updated FLINK-2585:
-------------------------------------
Description:
I tried running the KafkaConsumerExample, subscribed to a Kafka command-line producer, but the KafkaConsumerExample topology did not receive any data from Kafka. Then I wrote my own topology that uses Kafka as a source, but it did not work either. The topologies would run but receive no data. Can someone help me with this problem?
Kafka console producer I am running:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
The flink code I am running:
/**
 * Streaming word count over a Kafka topic.
 *
 * <p>Reads text records from {@code topic} via a {@link KafkaSource}, splits every record into
 * whitespace-separated words, keeps a running count per word and prints the updated counts.
 *
 * <p>Usage: {@code KafkaDataProcessor <hostname> <port> <topic>}.
 */
public class KafkaDataProcessor {
    private static int port;
    private static String hostname;
    private static String topic;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaDataProcessor.class);

    /** Single usage line so every error path prints the same, consistently-cased text. */
    private static final String USAGE = "Usage: KafkaDataProcessor <hostname> <port> <topic>";

    /**
     * Parses the command line, builds the word-count topology and runs it.
     *
     * @param args {@code <hostname> <port> <topic>}
     * @throws IllegalStateException if the Flink job fails; the original exception is kept as the
     *     cause, so the failure propagates and is not silently swallowed as a normal exit
     */
    public static void main(String[] args) {
        if (!parseParameters(args)) {
            return;
        }
        System.out.println("Start listening for data on: " + hostname + ":" + port + " for topic: " + topic);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // NOTE(review): in the Flink 0.9 Kafka connector the first KafkaSource argument is the
        // ZooKeeper address (e.g. localhost:2181), NOT the Kafka broker list (localhost:9092) that
        // kafka-console-producer.sh uses. Passing the broker port is a likely reason the job runs
        // but receives no data -- confirm against the connector version in use.
        DataStream<Tuple2<String, Integer>> dataStream = env
                .addSource(new KafkaSource<String>(hostname + ":" + port, topic, "test-consumer-group",
                        new SimpleStringSchema(), 200L), "Kafka source").setParallelism(2)
                .flatMap(new Splitter()).setParallelism(2)
                .groupBy(0)
                .sum(1).setParallelism(2);

        // NOTE(review): on a cluster, print() output presumably lands in the TaskManagers' stdout
        // (*.out files), not in the console that submitted the job -- check there too.
        dataStream.print().setParallelism(2);

        try {
            env.execute("kafka processor");
        } catch (Exception e) {
            // Rethrow (unchecked, cause preserved) instead of printStackTrace(): a failed job must
            // not look like a successful run. The main() signature stays unchanged.
            throw new IllegalStateException("Flink job 'kafka processor' failed", e);
        }
    }

    /**
     * Emits {@code (word, 1)} for every non-empty, whitespace-separated word of a record.
     */
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            if (sentence == null) {
                return;
            }
            // Split on runs of whitespace. Splitting on a single " " produced an empty "" token for
            // every extra space, and those were counted as words. A leading space still yields one
            // empty token, so empties are skipped explicitly.
            for (String word : sentence.split("\\s+")) {
                if (word.isEmpty()) {
                    continue;
                }
                System.out.println("word: " + word);
                LOG.info("word: {}", word);
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    /**
     * Reads {@code <hostname> <port> <topic>} from {@code args} into the static configuration fields.
     *
     * <p>The fields are only assigned once all arguments have been validated, so a rejected command
     * line never leaves them partially populated.
     *
     * @param args the raw command-line arguments
     * @return {@code true} if exactly three valid arguments were given; otherwise prints a usage
     *     message and returns {@code false}
     */
    private static boolean parseParameters(String[] args) {
        if (args.length == 0) {
            // There is no built-in default data; the job needs all three arguments to run.
            System.out.println("No arguments given. Provide the hostname, port and topic to read input data from.");
            System.out.println(USAGE);
            return false;
        }
        if (args.length != 3) {
            System.err.println(USAGE);
            return false;
        }

        int parsedPort;
        try {
            parsedPort = Integer.parseInt(args[1].trim());
        } catch (NumberFormatException e) {
            System.err.println("Invalid port '" + args[1] + "': not an integer.");
            System.err.println(USAGE);
            return false;
        }
        if (parsedPort < 1 || parsedPort > 65535) {
            System.err.println("Invalid port " + parsedPort + ": must be between 1 and 65535.");
            System.err.println(USAGE);
            return false;
        }

        hostname = args[0];
        port = parsedPort;
        topic = args[2];
        return true;
    }
}
was:
I tried running the KafkaConsumerExample, subscribed to a Kafka command-line producer, but the KafkaConsumerExample topology did not receive any data from Kafka. Then I wrote my own topology that uses Kafka as a source, but it did not work either. The topologies would run but receive no data. Can someone help me with this problem? The code I am running:
> KafkaSource not working
> -----------------------
>
> Key: FLINK-2585
> URL: https://issues.apache.org/jira/browse/FLINK-2585
> Project: Flink
> Issue Type: Bug
> Reporter: Boyang Jerry Peng
>
> I tried running the KafkaConsumerExample, subscribed to a Kafka command-line producer, but the KafkaConsumerExample topology did not receive any data from Kafka. Then I wrote my own topology that uses Kafka as a source, but it did not work either. The topologies would run but receive no data. Can someone help me with this problem?
> Kafka console producer I am running:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> The flink code I am running:
> public class KafkaDataProcessor {
> private static int port;
> private static String hostname;
> private static String topic;
> private static final Logger LOG = LoggerFactory.getLogger(KafkaDataProcessor.class);
> public static void main(String[] args) {
> if (!parseParameters(args)) {
> return;
> }
> System.out.println("Start listening for data on: " + hostname + ":" + port + " for topic: " + topic);
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<Tuple2<String, Integer>> dataStream = env
> .addSource(new KafkaSource(hostname + ":" + port, topic, "test-consumer-group", new SimpleStringSchema(), 200L), "Kafka source").setParallelism(2)
> .flatMap(new Splitter()).setParallelism(2)
> .groupBy(0)
> .sum(1).setParallelism(2);
> dataStream.print().setParallelism(2);
> try {
> env.execute("kafka processor");
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
> @Override
> public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
> for (String word : sentence.split(" ")) {
> System.out.println("word: " + word);
> LOG.info("word: {}", word);
> out.collect(new Tuple2<String, Integer>(word, 1));
> }
> }
> }
> private static boolean parseParameters(String[] args) {
> if (args.length > 0) {
> if (args.length == 3) {
> hostname = args[0];
> port = Integer.parseInt(args[1]);
> topic = args[2];
> } else {
> System.err.println("Usage: KafkaDataProcessor <hostname> <Port> <topic>");
> return false;
> }
> } else {
> System.out.println("Executing KafkaDataProcessor example with built-in default data.");
> System.out.println(" Provide Hostname and Port to read input data from.");
> System.out.println(" Usage: KafkaDataProcessor <Hostname> <Port> <topic>");
> return false;
> }
> return true;
> }
> }
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)