Posted to user@flink.apache.org by Shankara <sh...@gmail.com> on 2017/10/12 13:49:21 UTC

Beam Application run on cluster setup (Kafka+Flink)

Below is my setup:
        1. Kafka ZooKeeper and server on one machine (192.168.1.116), with the producer (192.168.1.100) and the consumer (192.168.1.117) on other machines.
           --> This works fine, no issues.
        2. Running the standalone Beam application with the Kafka consumer.
           --> This works fine.
        3. Running the Beam application in a Flink cluster with the Kafka consumer.
           --> This does not work: no messages are received from the Kafka producer.

The same program works fine standalone with the Flink runner.
Below is my code snippet.

public static void main(String[] args) {
    Pipeline p = initializePipeline(args);
    Map<String, List<String>> intelliOmIms = new TreeMap<>();

    PTransform<PBegin, PCollection<KV<Integer, byte[]>>> reader;
    reader = KafkaIO.<Integer, byte[]>read()
            .withBootstrapServers("192.168.1.116:9092")   // Kafka ZooKeeper and server run here
            .withTopic("kafkatest")
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(IntelliOmImsKpiDataUtil.class)
            .withoutMetadata();

    PCollection<KV<Integer, byte[]>> output = p.apply(reader);
    output.apply(ParDo.of(new PrintMsg()));

    p.run().waitUntilFinish();
}

  In the IntelliOmImsKpiDataUtil deserializer I just print a message saying that Kafka has received the message.
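
  A minimal sketch of what that deserializer could look like (the class name comes from the snippet above; the body is an assumption: a pass-through Kafka Deserializer<byte[]> that only logs the arrival):

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class IntelliOmImsKpiDataUtil implements Deserializer<byte[]> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure for a pass-through deserializer
    }

    @Override
    public byte[] deserialize(String topic, byte[] data) {
        // log only; the raw bytes are handed through unchanged
        System.out.println("Received in Deserilize..");
        return data;
    }

    @Override
    public void close() {
    }
}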

public static class PrintMsg extends DoFn<KV<Integer, byte[]>, Void> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        System.out.println("Received Message .... from kafkatest Topic ");
    }
}

  Started ZooKeeper on 192.168.1.116 like below:
    bin/zookeeper-server-start.sh config/zookeeper.properties

  Started the Kafka server on 192.168.1.116 like below:
    bin/kafka-server-start.sh config/server.properties

  Started the producer on 192.168.1.100 like below:
    bin/kafka-console-producer.sh --broker-list 192.168.1.116:9092 --topic kafkatest

  Started the consumer on 192.168.1.117 like below:
    bin/kafka-console-consumer.sh --zookeeper 192.168.1.116:2181 --topic kafkatest --from-beginning

   With the standalone Beam application Kafka can receive the message, but in the cluster setup it is not working.

Can you please help me check it?





Re: Beam Application run on cluster setup (Kafka+Flink)

Posted by Shankara <sh...@gmail.com>.
Hi Piotrek,

    I was checking the logs on the JobManager machine and the dashboard, but the output string was actually recorded in the TaskManager machine's log file. I added InfluxDB and verified that the received data is being written into InfluxDB.
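
   For anyone hitting the same thing: System.out.println from a DoFn goes to the .out file of whichever TaskManager runs the task, not to the JobManager. A logger makes the output easier to find; a minimal sketch, assuming SLF4J is on the classpath:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public static class PrintMsg extends DoFn<KV<Integer, byte[]>, Void> {

    private static final Logger LOG = LoggerFactory.getLogger(PrintMsg.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        // goes through the TaskManager's log4j appenders instead of raw stdout
        LOG.info("Received Message .... from kafkatest Topic");
    }
}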

   Thank you very much for your support.


Thanks,
Shankara




Re: Beam Application run on cluster setup (Kafka+Flink)

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

What version of Flink are you using? In earlier 1.3.x releases there were some bugs in the Kafka consumer code.
Could you change the log level in Flink to debug?
Did you check the Kafka logs for some hint, maybe?
I guess that metrics like bytes read/input records of this Flink application are not changing?
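
For the log level: assuming the default log4j setup that ships with Flink, raising the root level in conf/log4j.properties on each node (and restarting the cluster) would look like:

    # conf/log4j.properties: change the root level from INFO to DEBUG
    log4j.rootLogger=DEBUG, file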

Piotrek

> On 13 Oct 2017, at 07:51, Shankara <sh...@gmail.com> wrote:
>
> [...]


Re: Beam Application run on cluster setup (Kafka+Flink)

Posted by Shankara <sh...@gmail.com>.
Hi,

    I mean the same code works fine in the Flink local setup. I can see "Received Message .... from testkafka Topic : " on the console when Kafka receives a message (the Kafka producer is on another machine and frequently sends messages to the testkafka topic).
     
     *Submitted the Beam application to local Flink with the command below:*
         mvn compile exec:java -Dexec.mainClass=org.apache.beam.influxdb.KafkaRead -Pflink-runner

     *Output is:*
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#735957608] with leader session id d97c060d-bdf9-4215-8d7c-138f13cbff1e.
10/13/2017 11:09:09  Job execution switched to status RUNNING.
10/13/2017 11:09:09  Source: Read(UnboundedKafkaSource) -> Flat Map -> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED
10/13/2017 11:09:09  Source: Read(UnboundedKafkaSource) -> Flat Map -> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING
10/13/2017 11:09:09  Source: Read(UnboundedKafkaSource) -> Flat Map -> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING
*Received in Deserilize..
Received Message .... from testkafka Topic : HELLOASA*



    If I run the same code in the Flink cluster I cannot see any message in the log/stdout, but the job keeps running, and the Kafka producer on the other machine is still frequently sending messages to the testkafka topic.

  *I started the Flink cluster with the command below:*
       bin/start-cluster.sh

   *Submitted the Beam application to the Flink cluster with the command below:*
      bin/flink run -c org.apache.beam.influxdb.KafkaRead /home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar --runner=FlinkRunner --flinkMaster=192.168.1.116 --filesToStage=/home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar

   In the dashboard:

  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1169/DashBoard.png> 



    I cannot see any message in the dashboard's stdout view:

     
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1169/Stdout.png> 


   As per the log, the job execution is running:
Cluster configuration: Standalone cluster with JobManager at /192.168.1.116:6123
Using address 192.168.1.116:6123 to connect to JobManager.
JobManager web interface address http://192.168.1.116:8081
Starting execution of program
Submitting job with JobID: 8d731f801d00268f951a98d093f21e0c. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@192.168.1.116:6123/user/jobmanager#422012792] with leader session id 00000000-0000-0000-0000-000000000000.
10/13/2017 11:10:57  Job execution switched to status RUNNING.
10/13/2017 11:10:57  Source: Read(UnboundedKafkaSource) -> Flat Map -> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED
10/13/2017 11:10:57  Source: Read(UnboundedKafkaSource) -> Flat Map -> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING
10/13/2017 11:11:05  Source: Read(UnboundedKafkaSource) -> Flat Map -> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING

   There is no exception in the log. I suspect an issue with the Kafka deployment.

Can you please help me check it?




public static void main(String[] args) {
    Pipeline p = initializePipeline(args);
    Map<String, List<String>> intelliOmIms = new TreeMap<>();

    PTransform<PBegin, PCollection<KV<Integer, byte[]>>> reader;
    reader = KafkaIO.<Integer, byte[]>read()
            .withBootstrapServers("192.168.1.116:9092")   // Kafka ZooKeeper and server run here
            .withTopic("kafkatest")
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(IntelliOmImsKpiDataUtil.class)
            .withoutMetadata();

    PCollection<KV<Integer, byte[]>> output = p.apply(reader);
    output.apply(ParDo.of(new PrintMsg()));

    p.run().waitUntilFinish();
}

public static class PrintMsg extends DoFn<KV<Integer, byte[]>, Void> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            System.out.println("Received Message .... from testkafka Topic : "
                    + new String(c.element().getValue(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}




Re: Beam Application run on cluster setup (Kafka+Flink)

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

What do you mean by:

> With the standalone Beam application Kafka can receive the message, but in the cluster setup it is not working.

In your example you are reading data from Kafka and printing it to the console. There doesn't seem to be anything that writes back to Kafka, so what do you mean by "Kafka can not receive the message"?
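
For contrast, writing back to Kafka from Beam would look roughly like the sketch below (the "results" topic is made up for illustration; the serializers are the stock ones from org.apache.kafka.common.serialization):

    output.apply(KafkaIO.<Integer, byte[]>write()
            .withBootstrapServers("192.168.1.116:9092")
            .withTopic("results")   // hypothetical output topic
            .withKeySerializer(IntegerSerializer.class)
            .withValueSerializer(ByteArraySerializer.class));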

Did you check the output file of your application in the log directory? Did you check the Flink logs for any errors?
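
On a standalone cluster the stdout of a running task lands in the TaskManager's .out file on the worker machine; assuming the default log directory, something like:

    tail -f log/flink-*-taskmanager-*.out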

Piotrek

> On 12 Oct 2017, at 15:49, Shankara <sh...@gmail.com> wrote:
>
> [...]