You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Devendra Vishwakarma <vv...@gmail.com> on 2017/10/06 06:14:44 UTC

Issue of Twitter streaming with CustomEndpoint

Hi,

I am following Apache Flink Twitter streaming example given in
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
to get the tweets using hashtag, for which I have created a custom
endpoint. But my custom endpoint is not working (When I use default
endpoint of the TwitterSource then it is working fine).

*My CustomEndpoint is -*

public class CustomEndpoint implements EndpointInitializer, Serializable {

  private static final long serialVersionUID = 1L;

  @Override
  public StreamingEndpoint createEndpoint() {
    AppleSampleEndpoint endpoint = new AppleSampleEndpoint();
    return endpoint;
  }}


*AppleSampleEndpoint.java-*

public class AppleSampleEndpoint extends RawEndpoint  {

  public static final String PATH =
"https://api.twitter.com/1.1/search/tweets.json?q=%23bigdata";

    public AppleSampleEndpoint() {
      super(PATH, HttpConstants.HTTP_GET);
    }}



I want to fetch the tweets using hashtag in Apache Flink, for which I have
created a custom endpoint. But my custom endpoint is not working (When I
use apache flink default endpoint it is working).

*CustomEndpoint-*

public class CustomEndpoint implements EndpointInitializer, Serializable {

  private static final long serialVersionUID = 1L;

  @Override
  public StreamingEndpoint createEndpoint() {
    // this default endpoint initializer returns the sample endpoint:
    // Returning a sample from the firehose (all tweets)
    AppleSampleEndpoint endpoint = new AppleSampleEndpoint();
    return endpoint;
  }}

*AppleSampleEndpoint.java-*

public class AppleSampleEndpoint extends RawEndpoint  {

  public static final String PATH =
"https://api.twitter.com/1.1/search/tweets.json?q=%23bigdata";

    public AppleSampleEndpoint() {
      super(PATH, HttpConstants.HTTP_GET);
    }}

And this is how I am using CustomEndpoint in my Main class-

DataStream<String> streamSource;
    if (params.has(TwitterSource.CONSUMER_KEY) &&
            params.has(TwitterSource.CONSUMER_SECRET) &&
            params.has(TwitterSource.TOKEN) &&
            params.has(TwitterSource.TOKEN_SECRET)
            ) {
        TwitterSource source = new TwitterSource(params.getProperties());
        source.setCustomEndpointInitializer(new CustomEndpoint());
        streamSource = env.addSource(source);

But I am not able to fetch tweets with hashtag apple here.

Thanks

Devendra Vishwakarma

Re: Issue of Twitter streaming with CustomEndpoint

Posted by Till Rohrmann <tr...@apache.org>.
Hi Devendra,

thanks for reaching out to the Flink dev mailing list. I think your
question will be better answered on the user mailing list.

Cheers,
Till

On Fri, Oct 6, 2017 at 8:14 AM, Devendra Vishwakarma <vv...@gmail.com>
wrote:

> Hi,
>
> I am following Apache Flink Twitter streaming example given in
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-
> streaming/src/main/java/org/apache/flink/streaming/examples/twitter/
> TwitterExample.java
> to get the tweets using hashtag, for which I have created a custom
> endpoint. But my custom endpoint is not working (When I use default
> endpoint of the TwitterSource then it is working fine).
>
> *My CustomEndpoint is -*
>
> public class CustomEndpoint implements EndpointInitializer, Serializable {
>
>   private static final long serialVersionUID = 1L;
>
>   @Override
>   public StreamingEndpoint createEndpoint() {
>     AppleSampleEndpoint endpoint = new AppleSampleEndpoint();
>     return endpoint;
>   }}
>
>
> *AppleSampleEndpoint.java-*
>
> public class AppleSampleEndpoint extends RawEndpoint  {
>
>   public static final String PATH =
> "https://api.twitter.com/1.1/search/tweets.json?q=%23bigdata";
>
>     public AppleSampleEndpoint() {
>       super(PATH, HttpConstants.HTTP_GET);
>     }}
>
>
>
> I want to fetch the tweets using hashtag in Apache Flink, for which I have
> created a custom endpoint. But my custom endpoint is not working (When I
> use apache flink default endpoint it is working).
>
> *CustomEndpoint-*
>
> public class CustomEndpoint implements EndpointInitializer, Serializable {
>
>   private static final long serialVersionUID = 1L;
>
>   @Override
>   public StreamingEndpoint createEndpoint() {
>     // this default endpoint initializer returns the sample endpoint:
>     // Returning a sample from the firehose (all tweets)
>     AppleSampleEndpoint endpoint = new AppleSampleEndpoint();
>     return endpoint;
>   }}
>
> *AppleSampleEndpoint.java-*
>
> public class AppleSampleEndpoint extends RawEndpoint  {
>
>   public static final String PATH =
> "https://api.twitter.com/1.1/search/tweets.json?q=%23bigdata";
>
>     public AppleSampleEndpoint() {
>       super(PATH, HttpConstants.HTTP_GET);
>     }}
>
> And this is how I am using CustomEndpoint in my Main class-
>
> DataStream<String> streamSource;
>     if (params.has(TwitterSource.CONSUMER_KEY) &&
>             params.has(TwitterSource.CONSUMER_SECRET) &&
>             params.has(TwitterSource.TOKEN) &&
>             params.has(TwitterSource.TOKEN_SECRET)
>             ) {
>         TwitterSource source = new TwitterSource(params.getProperties());
>         source.setCustomEndpointInitializer(new CustomEndpoint());
>         streamSource = env.addSource(source);
>
> But I am not able to fetch tweets with hashtag apple here.
>
> Thanks
>
> Devendra Vishwakarma
>