Posted to issues@flink.apache.org by "shiv kumar (JIRA)" <ji...@apache.org> on 2019/04/03 07:24:00 UTC

[jira] [Created] (FLINK-12093) Apache Flink:Active MQ consumer job is getting finished after first message consume.

shiv kumar created FLINK-12093:
----------------------------------

             Summary: Apache Flink:Active MQ consumer job is getting finished after first message consume.
                 Key: FLINK-12093
                 URL: https://issues.apache.org/jira/browse/FLINK-12093
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.7.2
         Environment: Working in my local IDE (Eclipse).
            Reporter: shiv kumar


Hi Team,

Below is the code that sets up the execution environment and runs the Apache Flink job that consumes messages from an ActiveMQ topic:

StreamExecutionEnvironment env = createExecutionEnvironment();

connectionFactory = new ActiveMQConnectionFactory("******", "******.",
        "failover:(tcp://amq-master-01:61668)?timeout=3000");

LOG.info("exceptionListener----{}", new AMQExceptionListLocal(LOG, true));

RunningChecker runningChecker = new RunningChecker();
runningChecker.setIsRunning(true);

AMQSourceConfig<String> config = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
        .setConnectionFactory(connectionFactory)
        .setDestinationName("test_flink")
        .setDeserializationSchema(deserializationSchema)
        .setRunningChecker(runningChecker)
        .setDestinationType(DestinationType.TOPIC)
        .build();

amqSource = new AMQSourceLocal<>(config);

LOG.info("Check whether ctx is null ::;;;;{}", amqSource);

DataStream<String> dataMessage = env.addSource(amqSource);

dataMessage.writeAsText("C:/Users/shivkumar/Desktop/flinksjar/output.txt", WriteMode.OVERWRITE);
System.out.println("Step 1");

env.execute("Check ACTIVE_MQ");

When we start the job, the topic is created and a message is dequeued from it.

But after that first message is consumed, the job finishes. What can be done to keep the job running?


