Posted to issues@flink.apache.org by "Boyang Jerry Peng (JIRA)" <ji...@apache.org> on 2015/08/27 16:56:46 UTC

[jira] [Updated] (FLINK-2585) KafkaSource not working

     [ https://issues.apache.org/jira/browse/FLINK-2585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boyang Jerry Peng updated FLINK-2585:
-------------------------------------
    Description: 
I tried running the KafkaConsumerExample, which subscribes to a Kafka command-line producer, but the KafkaConsumerExample topology was not receiving any data from Kafka. Then I wrote my own topology that uses Kafka as a source, but it didn't work either. The topologies run but receive no data. Can someone help me with this problem?

Kafka console producer I am running:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
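
One way to rule out the producer side is to attach the stock console consumer in a second terminal and confirm that messages actually arrive on the topic (a sketch; the --zookeeper flag assumes a 0.8.x-era Kafka broker with ZooKeeper at localhost:2181):

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning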

The flink code I am running:

// NOTE: import paths assume the 0.9.x-era streaming Kafka connector; adjust for the Flink version in use.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaDataProcessor {
	private static int port;
	private static String hostname;
	private static String topic;
	private static final Logger LOG = LoggerFactory.getLogger(KafkaDataProcessor.class);

	public static void main(String[] args) {
		if (!parseParameters(args)) {
			return;
		}
		System.out.println("Start listening for data on: " + hostname + ":" + port + " for topic: " + topic);
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Tuple2<String, Integer>> dataStream = env
				.addSource(new KafkaSource<String>(hostname + ":" + port, topic, "test-consumer-group", new SimpleStringSchema(), 200L), "Kafka source").setParallelism(2)
				.flatMap(new Splitter()).setParallelism(2)
				.groupBy(0)
				.sum(1).setParallelism(2);

		dataStream.print().setParallelism(2);

		try {
			env.execute("kafka processor");
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
		@Override
		public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
			for (String word : sentence.split(" ")) {
				System.out.println("word: " + word);
				LOG.info("word: {}", word);
				out.collect(new Tuple2<String, Integer>(word, 1));
			}
		}
	}

	private static boolean parseParameters(String[] args) {

		if (args.length > 0) {
			if (args.length == 3) {
				hostname = args[0];
				port = Integer.parseInt(args[1]);
				topic = args[2];
			} else {
				System.err.println("Usage: KafkaDataProcessor <hostname> <Port> <topic>");
				return false;
			}
		} else {
			System.out.println("Executing KafkaDataProcessor example with built-in default data.");
			System.out.println("  Provide Hostname and Port to read input data from.");
			System.out.println("  Usage: KafkaDataProcessor <Hostname> <Port> <topic>");
			return false;
		}
		return true;
	}
}
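
For comparison, below is a minimal sketch of just the source line, under the assumption (not confirmed here) that this is the legacy 0.9.x KafkaSource whose first constructor argument is the ZooKeeper connect string rather than the Kafka broker list:

		// Sketch only: assumes the legacy KafkaSource expects the ZooKeeper address
		// (e.g. localhost:2181), not the broker address (localhost:9092), as its first argument.
		DataStream<String> lines = env.addSource(
				new KafkaSource<String>("localhost:2181", topic, "test-consumer-group",
						new SimpleStringSchema(), 200L), "Kafka source");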

  was:
I tried running the KafkaConsumerExample, which subscribes to a Kafka command-line producer, but the KafkaConsumerExample topology was not receiving any data from Kafka. Then I wrote my own topology that uses Kafka as a source, but it didn't work either. The topologies run but receive no data. Can someone help me with this problem? The code I am running:




> KafkaSource not working
> -----------------------
>
>                 Key: FLINK-2585
>                 URL: https://issues.apache.org/jira/browse/FLINK-2585
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Boyang Jerry Peng
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)