Posted to user@flink.apache.org by Ulf Thomas <ul...@relayr.de> on 2017/03/02 12:58:08 UTC

Http Requests from Flink

Hello,

I've been trying to perform HTTP requests from a Flink program, but I haven't
been successful :-(.

Has anybody here done this before and can point me to a working
library?

I've attached a small demo project in case someone wants to try to solve
this.

Best,

-- 
--
Ulf Thomas
Software Developer
relayr

Re: Http Requests from Flink

Posted by Yassine MARZOUGUI <y....@mindlytix.com>.
Hi Ulf,

I've done HTTP requests in Flink using the OkHttp library
<http://square.github.io/okhttp/>. I found it practical and easy to use.
Here is how I used it to make a POST request for each incoming element in
the stream and output the response:

DataStream<String> stream = ....

stream.map(new RichMapFunction<String, String>() {

    // Built in open(), i.e. on the worker, rather than shipped with the function
    private transient OkHttpClient client;

    @Override
    public void open(Configuration config) throws IOException {
        client = new OkHttpClient();
    }

    @Override
    public String map(String in) throws Exception {
        okhttp3.Request request = new okhttp3.Request.Builder()
                .url("http://localhost:8080")
                .post(RequestBody.create(MediaType.parse("text/plain; charset=utf-8"), in))
                .build();
        // try-with-resources closes the response even when the status check throws
        try (Response response = client.newCall(request).execute()) {
            if (response.code() != 200) {
                throw new Exception("Failed request");
            }
            return response.body().string();
        }
    }
})
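
For anyone who wants to try the same request/response pattern without a Flink cluster or the OkHttp dependency, here is a minimal sketch that uses only the JDK (Java 11 or later). It swaps OkHttp for java.net.http.HttpClient, so it is not the code I run in production. The class name PostPerElementDemo, the helper names and the upper-casing echo server are all made up for illustration and are not part of Flink or OkHttp.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class PostPerElementDemo {

    /** Hypothetical test server: echoes the request body back in upper case. */
    static HttpServer startEchoServer() throws IOException {
        // Port 0 lets the OS pick a free port.
        HttpServer server = HttpServer.create(new InetSocketAddress("localhost", 0), 0);
        server.createContext("/", exchange -> {
            byte[] body = exchange.getRequestBody().readAllBytes();
            byte[] reply = new String(body, StandardCharsets.UTF_8)
                    .toUpperCase()
                    .getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, reply.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(reply);
            }
        });
        server.start();
        return server;
    }

    /** Same contract as the map() above: POST one element, fail on non-200, return the body. */
    static String post(HttpClient client, URI uri, String in)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Content-Type", "text/plain; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(in, StandardCharsets.UTF_8))
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Failed request: HTTP " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        HttpServer server = startEchoServer();
        try {
            URI uri = URI.create("http://localhost:" + server.getAddress().getPort() + "/");
            // One client reused for every element, like the client created in open().
            HttpClient client = HttpClient.newHttpClient();
            for (String element : new String[] {"a", "b"}) {
                System.out.println(post(client, uri, element));
            }
        } finally {
            server.stop(0);
        }
    }
}
```

Running main sends each element to the local server and prints what comes back. Inside a Flink job, the body of post() would sit in map() and the client would be created once in open(), as in the snippet above.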

I hope this helps.

Best,
Yassine


2017-03-02 14:17 GMT+01:00 Alex De Castro <al...@lab49.com>:

> Hi Ulf,
>
> I’ve had a similar problem before, but from a sink perspective: I had to
> create an HTTP sink for a Kafka REST API. I used scalaj-http
> https://github.com/scalaj/scalaj-http, which is a wrapper for the
> corresponding Java lib.
>
> For example:
>
> class HttpSink extends SinkFunction[Message] {
>   private val secretkey = new GetToken().token
>
>   def sendMessage(message: Message): String = Http("http://XXX.XXX.XX.XXX:5000/api/message") // <-- GLOBAL var
>     .header("Content-Type", "application/json")
>     .header("Authorization", s"Bearer $secretkey")
>     .postData(message.data).asString.body
>
>   @throws[Exception]
>   override def invoke(message: Message): Unit = {
>     log.info(sendMessage(message))
>   }
> }
>
>
>
> I imagine that for an HTTP source, you could send a request to the REST API
> periodically and convert the micro-batches into a stream. I’d love to know
> about other alternatives.
>
>
>
> Cheers,
>
> Alex
>

Re: Http Requests from Flink

Posted by Alex De Castro <al...@lab49.com>.
Hi Ulf,
I’ve had a similar problem before, but from a sink perspective: I had to create an HTTP sink for a Kafka REST API. I used scalaj-http https://github.com/scalaj/scalaj-http, which is a wrapper for the corresponding Java lib.

For example:

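import org.apache.flink.streaming.api.functions.sink.SinkFunction
import scalaj.http.Http

// Message, GetToken and log are defined elsewhere in the project (not shown).
class HttpSink extends SinkFunction[Message] {
  // Bearer token, fetched once when the sink is constructed
  private val secretkey = new GetToken().token

  // POSTs the message payload as JSON and returns the response body
  def sendMessage(message: Message): String = Http("http://XXX.XXX.XX.XXX:5000/api/message") // <-- GLOBAL var
    .header("Content-Type", "application/json")
    .header("Authorization", s"Bearer $secretkey")
    .postData(message.data).asString.body

  @throws[Exception]
  override def invoke(message: Message): Unit = {
    log.info(sendMessage(message))
  }
}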

I imagine that for an HTTP source, you could send a request to the REST API periodically and convert the micro-batches into a stream. I’d love to know about other alternatives.
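
To make the polling idea a bit more concrete, here is a rough sketch in plain Java (11 or later), not Flink code. It assumes the REST API returns one record per line, which is purely an assumption for the example. PollingSourceSketch, startBatchServer and the "batch-N-x" payloads are hypothetical names invented here. In a real job the loop would presumably live in a source function's run method, with the emit callback replaced by the source context's collect call and cancel() flipping the same kind of flag.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class PollingSourceSketch {

    // Flipped by cancel(); volatile so the polling thread sees the change.
    private volatile boolean running = true;

    /** Polls the endpoint until cancel() is called and emits each line of each batch. */
    void run(HttpClient client, URI uri, Duration interval, Consumer<String> emit)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        while (running) {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() == 200) {
                // Flatten the micro-batch: one emitted record per non-blank line.
                response.body().lines().filter(line -> !line.isBlank()).forEach(emit);
            }
            Thread.sleep(interval.toMillis());
        }
    }

    void cancel() {
        running = false;
    }

    /** Hypothetical REST API: every GET returns a new two-record batch. */
    static HttpServer startBatchServer() throws IOException {
        AtomicInteger batch = new AtomicInteger();
        HttpServer server = HttpServer.create(new InetSocketAddress("localhost", 0), 0);
        server.createContext("/", exchange -> {
            int n = batch.incrementAndGet();
            byte[] reply = ("batch-" + n + "-a\nbatch-" + n + "-b\n")
                    .getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, reply.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(reply);
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws Exception {
        HttpServer server = startBatchServer();
        try {
            URI uri = URI.create("http://localhost:" + server.getAddress().getPort() + "/");
            PollingSourceSketch source = new PollingSourceSketch();
            List<String> records = new ArrayList<>();
            // Stop after two batches so the demo terminates.
            source.run(HttpClient.newHttpClient(), uri, Duration.ofMillis(50), record -> {
                records.add(record);
                if (records.size() >= 4) {
                    source.cancel();
                }
            });
            System.out.println(records);
        } finally {
            server.stop(0);
        }
    }
}
```

The sketch polls on a fixed interval and does nothing about retries, back-pressure or remembering which records were already read, all of which a production source would need.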

Cheers,
Alex