Posted to user@flink.apache.org by mingleizhang <18...@163.com> on 2017/08/15 13:17:00 UTC
How to verify the data to Elasticsearch whether correct or not ?
Hi, Flink experts!
I wrote my data (PB objects) to Elasticsearch through a sink, but I don't know whether the data that was written is correct. The code is shown below; could you help me check it, please? I'm not familiar with ES. I now want to install Kibana to view my data, but I don't know whether the code below is correct. I ran the Flink program and it did not give me an error. I just want to confirm.
// sink the filtered data to ElasticSearch
clickStreamFiltered.addSink(new ElasticsearchSink[ActivityInfo](configElasticSearch, transportAddress, new ElasticsearchSinkFunction[ActivityInfo] {
  def createIndexRequest(element: ActivityInfo): IndexRequest = {
    val json = new java.util.HashMap[String, ActivityInfo]
    json.put("data", element)
    Requests.indexRequest().index("filter-index-s").`type`("my-type").source(json)
  }

  override def process(activityInfo: ActivityInfo, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
    requestIndexer.add(createIndexRequest(activityInfo))
  }
}))
Thanks
mingleizhang
Re:Re:Re:Re:How to verify the data to Elasticsearch whether correct or not ?
Posted by mingleizhang <18...@163.com>.
I solved the issue by adding a dependency that converts the protobuf objects into JSON, and by adding a line of code like the one below, where element is a PB object:
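The line itself did not survive in this archive. As a minimal sketch of the general idea, and not the poster's actual code, the Scala below turns an object into a map with one entry per field before indexing, and rebuilds the object from such a map when reading it back. The case class is a hypothetical stand-in: the real ActivityInfo is generated from a .proto file whose fields are not shown in the thread, so the names userId, action and timestamp are assumptions. With the protobuf-java-util dependency, a call such as JsonFormat.printer().print(element) is one way to get a JSON string from a message; whether that is what was used here is also an assumption.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Hypothetical stand-in for the generated protobuf class; the real
// ActivityInfo fields are not shown in the thread.
final case class ActivityInfo(userId: String, action: String, timestamp: Long)

object ActivityInfoSource {
  // Write side: one map entry per field, using only values that have a
  // direct JSON representation (strings and numbers).
  def toSource(element: ActivityInfo): JMap[String, AnyRef] = {
    val json = new JHashMap[String, AnyRef]()
    json.put("userId", element.userId)
    json.put("action", element.action)
    json.put("timestamp", Long.box(element.timestamp))
    json
  }

  // Read side: rebuild the object from a source map with the same keys.
  def fromSource(source: JMap[String, AnyRef]): ActivityInfo =
    ActivityInfo(
      source.get("userId").asInstanceOf[String],
      source.get("action").asInstanceOf[String],
      source.get("timestamp").asInstanceOf[Number].longValue()
    )
}
```

A map built this way could be passed to source(...) in place of the single "data" entry, so that each field is indexed as its own JSON value rather than as one opaque value.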
Thanks.
zhangminglei
Re:Re:Re:How to verify the data to Elasticsearch whether correct or not ?
Posted by mingleizhang <18...@163.com>.
I looked into the data that was written to Elasticsearch. The good news is that it is really there. However, what I sent to the sink is an object, while Elasticsearch represents it as a string. I put the related code below.
The element type is ActivityInfo. I then wrote some code against the Java API to read the data back, and the value is a string. I want it to be represented as an ActivityInfo object, but that is not what I get.
Can anybody give me some advice on this? Thank you very much!
Thanks
zhangminglei / mingleizhang
Re:Re:How to verify the data to Elasticsearch whether correct or not ?
Posted by mingleizhang <18...@163.com>.
Hi, Gordon.
I am not sure about this. As far as I know, Elasticsearch usually stores JSON data, since that makes it convenient to build its index. In my code, I stored protobuf objects (ActivityInfo, which is built from an activityinfo.proto file) in Elasticsearch, so what ends up stored is binary data. That feels very strange to me. The Flink documentation only gives an example where the data is a JSON string.
Peace,
Zhangminglei
Re:How to verify the data to Elasticsearch whether correct or not ?
Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,
I couldn’t spot anything off in the code snippet you provided. So you should be ok with this :)
Cheers,
Gordon
Re:How to verify the data to Elasticsearch whether correct or not ?
Posted by mingleizhang <18...@163.com>.
BTW, ActivityInfo is a PB object built from xxx.proto, and its values have already been set.