You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Puneet Kinra <pu...@customercentria.com> on 2018/03/14 12:49:38 UTC

activemq connector not working..

Hi

I used apache bahir connector  below is the code.the job is getting finished
and not generated the output as well ,ideal it should keep on running below
the code.


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.activemq.AMQSource;
import org.apache.flink.streaming.connectors.activemq.AMQSourceConfig;
import org.apache.flink.streaming.connectors.activemq.DestinationType;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author puneet
 *
 */
public class TestAMQ {


public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
AMQSourceConfig<String> sourceConfig = new
AMQSourceConfig.AMQSourceConfigBuilder<String>()
.setConnectionFactory(new
ActiveMQConnectionFactory("tcp://localhost:61616"))
.setDestinationName("test")
.setDeserializationSchema(new SimpleStringSchema())
.setDestinationType(DestinationType.QUEUE)
.build();
DataStream < String > messageStream = env.addSource(new
AMQSource<String>(sourceConfig));
messageStream.print();
env.execute();
}

}


-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
<pu...@customercentria.com>*

*e-mail :puneet.kinra@customercentria.com
<pu...@customercentria.com>*

Re: activemq connector not working..

Posted by Puneet Kinra <pu...@customercentria.com>.
I tried getting this in logs..


2018-03-15 20:59:38,154 INFO
org.apache.flink.streaming.runtime.tasks.StreamTask
          - No state backend has been configured, using default state
backend (Memory / JobManager)

2018-03-15 20:59:38,296 INFO
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase
- No state to restore for the AMQSource.

2018-03-15 20:59:39,488 WARN
org.apache.flink.streaming.connectors.activemq.AMQSource      - Active MQ
source received non bytes message: null


On Thu, Mar 15, 2018 at 9:00 PM, Puneet Kinra <
puneet.kinra@customercentria.com> wrote:

> I tried in cluster as well .
>
> On Wed, Mar 14, 2018 at 10:01 PM, Timo Walther <tw...@apache.org> wrote:
>
>> Hi Puneet,
>>
>> are you running this job on the cluster or locally in your IDE?
>>
>> Regards,
>> Timo
>>
>>
>> Am 14.03.18 um 13:49 schrieb Puneet Kinra:
>>
>> Hi
>>
>> I used apache bahir connector  below is the code.the job is getting
>> finished
>> and not generated the output as well ,ideal it should keep on running
>> below the code.
>>
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEn
>> vironment;
>> import org.apache.flink.streaming.connectors.activemq.AMQSource;
>> import org.apache.flink.streaming.connectors.activemq.AMQSourceConfig;
>> import org.apache.flink.streaming.connectors.activemq.DestinationType;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>>
>> /**
>>  * @author puneet
>>  *
>>  */
>> public class TestAMQ {
>>
>>
>> public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.get
>> ExecutionEnvironment();
>> AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfi
>> gBuilder<String>()
>> .setConnectionFactory(new ActiveMQConnectionFactory("tcp
>> ://localhost:61616"))
>> .setDestinationName("test")
>> .setDeserializationSchema(new SimpleStringSchema())
>> .setDestinationType(DestinationType.QUEUE)
>> .build();
>> DataStream < String > messageStream = env.addSource(new
>> AMQSource<String>(sourceConfig));
>> messageStream.print();
>> env.execute();
>> }
>>
>> }
>>
>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
>> <pu...@customercentria.com>*
>>
>> *e-mail :puneet.kinra@customercentria.com
>> <pu...@customercentria.com>*
>>
>>
>>
>>
>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
> <pu...@customercentria.com>*
>
> *e-mail :puneet.kinra@customercentria.com
> <pu...@customercentria.com>*
>
>
>


-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
<pu...@customercentria.com>*

*e-mail :puneet.kinra@customercentria.com
<pu...@customercentria.com>*

Re: activemq connector not working..

Posted by Puneet Kinra <pu...@customercentria.com>.
I tried in cluster as well .

On Wed, Mar 14, 2018 at 10:01 PM, Timo Walther <tw...@apache.org> wrote:

> Hi Puneet,
>
> are you running this job on the cluster or locally in your IDE?
>
> Regards,
> Timo
>
>
> Am 14.03.18 um 13:49 schrieb Puneet Kinra:
>
> Hi
>
> I used apache bahir connector  below is the code.the job is getting
> finished
> and not generated the output as well ,ideal it should keep on running
> below the code.
>
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.
> StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.activemq.AMQSource;
> import org.apache.flink.streaming.connectors.activemq.AMQSourceConfig;
> import org.apache.flink.streaming.connectors.activemq.DestinationType;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>
> /**
>  * @author puneet
>  *
>  */
> public class TestAMQ {
>
>
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
> AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.
> AMQSourceConfigBuilder<String>()
> .setConnectionFactory(new ActiveMQConnectionFactory("
> tcp://localhost:61616"))
> .setDestinationName("test")
> .setDeserializationSchema(new SimpleStringSchema())
> .setDestinationType(DestinationType.QUEUE)
> .build();
> DataStream < String > messageStream = env.addSource(new AMQSource<String>(
> sourceConfig));
> messageStream.print();
> env.execute();
> }
>
> }
>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
> <pu...@customercentria.com>*
>
> *e-mail :puneet.kinra@customercentria.com
> <pu...@customercentria.com>*
>
>
>
>


-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
<pu...@customercentria.com>*

*e-mail :puneet.kinra@customercentria.com
<pu...@customercentria.com>*

Re: activemq connector not working..

Posted by Timo Walther <tw...@apache.org>.
Hi Puneet,

are you running this job on the cluster or locally in your IDE?

Regards,
Timo


Am 14.03.18 um 13:49 schrieb Puneet Kinra:
> Hi
>
> I used apache bahir connector  below is the code.the job is getting 
> finished
> and not generated the output as well ,ideal it should keep on running 
> below the code.
>
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.activemq.AMQSource;
> import org.apache.flink.streaming.connectors.activemq.AMQSourceConfig;
> import org.apache.flink.streaming.connectors.activemq.DestinationType;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>
> /**
>  * @author puneet
>  *
>  */
> public class TestAMQ {
>
>
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> AMQSourceConfig<String> sourceConfig = new 
> AMQSourceConfig.AMQSourceConfigBuilder<String>()
> .setConnectionFactory(new 
> ActiveMQConnectionFactory("tcp://localhost:61616"))
> .setDestinationName("test")
> .setDeserializationSchema(new SimpleStringSchema())
> .setDestinationType(DestinationType.QUEUE)
> .build();
> DataStream < String > messageStream = env.addSource(new 
> AMQSource<String>(sourceConfig));
> messageStream.print();
> env.execute();
> }
>
> }
>
>
> -- 
> *Cheers *
> *
> *
> *Puneet Kinra*
> *
> *
>
> *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com 
> <ma...@customercentria.com>*
>
> *e-mail :puneet.kinra@customercentria.com 
> <ma...@customercentria.com>*
>
>