Posted to user@flink.apache.org by Fábio Dias <fa...@gmail.com> on 2017/02/20 16:41:25 UTC

Apache Flink and Elasticsearch send Json Object instead of string

Hi,

I'm using Flink and Elasticsearch, and I want Elasticsearch to receive a
JSON object ({"id":1, "name":"X"}, etc.). I already have a string with
this information, but I don't want to save it as a string.

This is what I currently get:

{
  "_index": "logs",
  "_type": "object",
  "_id": "AVpcARfkfYWqSubr0ZvK",
  "_score": 1,
  "_source": {
    "data": "{\"id\":6,\"name\":\"A green door\",\"price\":12.5,\"tags\":[\"home\",\"green\"]}"
  }
}

And this is what I want to get:

{
  "_index": "logs",
  "_type": "external",
  "_id": "AVpcARfkfYWqSubr0ZvK",
  "_score": 1,
  "_source": {
    "data": {
      "id": 6,
      "name": "A green door",
      "price": 12.5,
      "tags": ["home", "green"]
    }
  }
}

My Java code:

try {
    ArrayList<InetSocketAddress> transports = new ArrayList<>();
    transports.add(new InetSocketAddress("127.0.0.1", 9300));

    ElasticsearchSinkFunction<String> indexLog = new ElasticsearchSinkFunction<String>() {

        private static final long serialVersionUID = 8802869701292023100L;

        public IndexRequest createIndexRequest(String element) {
            HashMap<String, Object> esJson = new HashMap<>();
            esJson.put("data", element);

            return Requests
                    .indexRequest()
                    .index("logs")
                    .type("object")
                    .source(esJson);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    };

    ElasticsearchSink<String> esSink = new ElasticsearchSink<String>(config, transports, indexLog);
    input.addSink(esSink);
}
catch (Exception e) {
    System.out.println(e);
}


Do I need to treat every entry as a map? Can I just send an object with
key-value pairs?

Thanks.

Re: Apache Flink and Elasticsearch send Json Object instead of string

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

The Flink Elasticsearch sink uses the Elasticsearch Java client to send the indexing requests, so anything the client supports can also be done through the `ElasticsearchSinkFunction`.

From a quick look at the Elasticsearch Javadocs, I think you can also just set the document JSON as a String in the created `IndexRequest`. So, instead of:

public IndexRequest createIndexRequest(String element) {
    HashMap<String, Object> esJson = new HashMap<>();
    esJson.put("data", element);

    return Requests
            .indexRequest()
            .index("logs")
            .type("object")
            .source(esJson);
}

Here, if `element` is already a JSON string representing the document, you can just do:

return Requests
        .indexRequest()
        .index("logs")
        .type("object")
        .source(element);

The `.source(…)` method has several overloads for setting the source, and providing a Map is only one of them.
Please refer to the Elasticsearch Javadocs for the full list (https://www.javadoc.io/doc/org.elasticsearch/elasticsearch/5.2.1).
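To illustrate why wrapping `element` in a map produces the escaped string seen in the original post, here is a minimal, stdlib-only sketch. `MiniJson` is a hypothetical stand-in for the client's own serialization, not part of the Elasticsearch API. It assumes, as the observed `_source` suggests, that a `String` map value is written out as a JSON string (with its quotes escaped), while a nested `Map` is written out as a JSON object.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, minimal JSON writer used only to show the difference between
// a String value and a Map value. It is NOT the Elasticsearch serializer and
// supports only String, Number, Boolean, null, List and Map values.
final class MiniJson {
    private MiniJson() {}

    static String write(Object value) {
        StringBuilder sb = new StringBuilder();
        append(sb, value);
        return sb.toString();
    }

    private static void append(StringBuilder sb, Object value) {
        if (value == null) {
            sb.append("null");
        } else if (value instanceof String) {
            // A String is always emitted as a JSON string, even if it contains JSON text.
            appendString(sb, (String) value);
        } else if (value instanceof Number || value instanceof Boolean) {
            sb.append(value);
        } else if (value instanceof Map) {
            // A Map is emitted as a JSON object, so nested maps stay nested.
            sb.append('{');
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                if (!first) sb.append(',');
                first = false;
                appendString(sb, String.valueOf(e.getKey()));
                sb.append(':');
                append(sb, e.getValue());
            }
            sb.append('}');
        } else if (value instanceof List) {
            sb.append('[');
            boolean first = true;
            for (Object item : (List<?>) value) {
                if (!first) sb.append(',');
                first = false;
                append(sb, item);
            }
            sb.append(']');
        } else {
            throw new IllegalArgumentException("unsupported type: " + value.getClass());
        }
    }

    // Quotes a string, escaping backslashes and double quotes
    // (control characters such as newlines are not escaped, for brevity).
    private static void appendString(StringBuilder sb, String s) {
        sb.append('"');
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') sb.append('\\');
            sb.append(c);
        }
        sb.append('"');
    }
}
```

Writing a map that holds `"data"` mapped to the raw JSON text gives a `data` field whose value is one escaped string, matching what ended up in the index. Writing a map that holds `"data"` mapped to another `Map` (with `id`, `name`, `price` and a `List` of tags) gives a nested object, which is the shape of the desired output.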

Hope this helps!

Cheers,
Gordon
On February 21, 2017 at 5:43:36 PM, Fábio Dias (fabiodiogoo@gmail.com) wrote:

Hi, 
thanks for the reply.

Is there no other way to do that?
Using the REST API, you can send JSON like this:

curl -XPOST 'localhost:9200/customer/external?pretty' -H 'Content-Type: application/json' -d'
{
  "name": "Jane Doe"
}
'

In my case, I have JSON like this:

{
  "filters": {
    "id": 1,
    "name": "abc"
  }
}

How can I handle cases like this? Is there no way to send the whole JSON element and index it as in the REST request?

Thanks.

Tzu-Li (Gordon) Tai <tz...@apache.org> wrote on Tuesday, 21/02/2017 at 07:54:
Hi,

I’ll use your code to explain.

public IndexRequest createIndexRequest(String element) {
    HashMap<String, Object> esJson = new HashMap<>();
    esJson.put("data", element);

What you should do here is parse the field values from `element` and simply treat them as key-value pairs of the `esJson` map.

So, the `esJson` should be prepared by doing:

esJson.put("id", 6);
esJson.put("name", "A green door");
esJson.put("price", 12.5);

etc.



Cheers,

Gordon

Re: Apache Flink and Elasticsearch send Json Object instead of string

Posted by Fábio Dias <fa...@gmail.com>.
Hi,
thanks for the reply.

Is there no other way to do that?
Using the REST API, you can send JSON like this:

curl -XPOST 'localhost:9200/customer/external?pretty' -H 'Content-Type: application/json' -d'
{
  "name": "Jane Doe"
}
'

In my case, I have JSON like this:

{
  "filters": {
    "id": 1,
    "name": "abc"
  }
}

How can I handle cases like this? Is there no way to send the whole JSON
element and index it as in the REST request?

Thanks.

Tzu-Li (Gordon) Tai <tz...@apache.org> wrote on Tuesday, 21/02/2017
at 07:54:

> Hi,
>
> I’ll use your code to explain.
>
> public IndexRequest createIndexRequest(String element) {
>     HashMap<String, Object> esJson = new HashMap<>();
>     esJson.put("data", element);
>
> What you should do here is parse the field values from `element` and
> simply treat them as key-value pairs of the `esJson` map.
>
> So, the `esJson` should be prepared by doing:
>
> esJson.put("id", 6);
> esJson.put("name", "A green door");
> esJson.put("price", 12.5);
>
> etc.
>
>
> Cheers,
>
> Gordon

Re: Apache Flink and Elasticsearch send Json Object instead of string

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

I’ll use your code to explain.

public IndexRequest createIndexRequest(String element) {
    HashMap<String, Object> esJson = new HashMap<>();
    esJson.put("data", element);

What you should do here is parse the field values from `element` and simply treat them as key-value pairs of the `esJson` map.

So, the `esJson` should be prepared by doing:

esJson.put("id", 6);
esJson.put("name", "A green door");
esJson.put("price", 12.5);

etc.
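The map-building step described above can be sketched as follows. This is a minimal sketch that assumes the field values have already been parsed out of `element` (the parsing itself is not shown); `DocumentMaps`, `flat` and `nestedUnderData` are hypothetical names. `flat` produces the top-level layout suggested in this reply, and `nestedUnderData` wraps the same fields under a `data` key to match the desired output from the original post.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: builds the document map from field values that were
// already extracted from `element`. Parsing the JSON string is out of scope here.
final class DocumentMaps {
    private DocumentMaps() {}

    // Flat layout: every field becomes a top-level key of the document.
    static Map<String, Object> flat(int id, String name, double price, List<String> tags) {
        Map<String, Object> esJson = new LinkedHashMap<>(); // keeps insertion order
        esJson.put("id", id);       // boxed to Integer
        esJson.put("name", name);
        esJson.put("price", price); // boxed to Double
        esJson.put("tags", tags);   // a List value, intended to become a JSON array
        return esJson;
    }

    // Nested layout: the same fields placed under a single "data" key.
    static Map<String, Object> nestedUnderData(int id, String name, double price, List<String> tags) {
        Map<String, Object> esJson = new LinkedHashMap<>();
        esJson.put("data", flat(id, name, price, tags));
        return esJson;
    }
}
```

Either map could then be passed to `.source(...)` in place of the `esJson` map in the original `createIndexRequest`. A `LinkedHashMap` is used only to make the key order predictable; the order of fields should not matter to Elasticsearch.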



Cheers,

Gordon