You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by boci <bo...@gmail.com> on 2014/06/25 00:42:33 UTC

ElasticSearch enrich

Hi guys,

I have a small question. I want to create a "Worker" class which using
ElasticClient to make query to elasticsearch. (I want to enrich my data
with geo search result).

How can I do that? I try to create a worker instance with ES host/port
parameter but spark throw an exceptino (my class not serializable).

Any idea?

Thanks
b0c1

RE: ElasticSearch enrich

Posted by Adrian Mocanu <am...@verticalscope.com>.
b0c1<http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=user_nodes&user=1215>, could you post your code? I am interested in your solution.

Thanks
Adrian

From: boci [mailto:boci.boci@gmail.com]
Sent: June-26-14 6:17 PM
To: user@spark.apache.org
Subject: Re: ElasticSearch enrich

Wow, thanks your fast answer, it's help a lot...

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com<ma...@gmail.com>

On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <ho...@pigscanfly.ca>> wrote:
Hi b0c1,

I have an example of how to do this in the repo for my talk as well, the specific example is at https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and then call  saveAsHadoopDataset on the RDD that gets passed into the function we provide to foreachRDD.

e.g.

stream.foreachRDD{(data, time) =>
     val jobconf = ...
     data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}

Hope that helps :)

Cheers,

Holden :)

On Thu, Jun 26, 2014 at 2:23 PM, boci <bo...@gmail.com>> wrote:
Thanks. I without local option I can connect with es remote, now I only have one problem. How can I use elasticsearch-hadoop with spark streaming? I mean DStream doesn't have "saveAsHadoopFiles" method, my second problem the output index is depend by the input data.

Thanks

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com<ma...@gmail.com>

On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <ni...@gmail.com>> wrote:
You can just add elasticsearch-hadoop as a dependency to your project to user the ESInputFormat and ESOutputFormat (https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

For testing, yes I think you will need to start ES in local mode (just ./bin/elasticsearch) and use the default config (host = localhost, port = 9200).

On Thu, Jun 26, 2014 at 9:04 AM, boci <bo...@gmail.com>> wrote:
That's okay, but hadoop has ES integration. what happened if I run saveAsHadoopFile without hadoop (or I must need to pull up hadoop programatically? (if I can))

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com<ma...@gmail.com>

On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <ho...@pigscanfly.ca>> wrote:

On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com>> wrote:
Hi guys, thanks the direction now I have some problem/question:
- in local (test) mode I want to use ElasticClient.local to create es connection, but in prodution I want to use ElasticClient.remote, to this I want to pass ElasticClient to mapPartitions, or what is the best practices?
In this case you probably want to make the ElasticClient inside of mapPartitions (since it isn't serializable) and if you want to use a different client in local mode just have a flag that control what type of client you create.
- my stream output is write into elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
- After store the enriched data into ES, I want to generate aggregated data (EsInputFormat) how can I test it in local?
I think the simplest thing to do would be use the same client in mode and just start single node elastic search cluster.

Thanks guys

b0c1



----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com<ma...@gmail.com>

On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <ho...@pigscanfly.ca>> wrote:
So I'm giving a talk at the Spark summit on using Spark & ElasticSearch, but for now if you want to see a simple demo which uses elasticsearch for geo input you can take a look at my quick & dirty implementation with TopTweetsInALocation ( https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala ). This approach uses the ESInputFormat which avoids the difficulty of having to manually create ElasticSearch clients.

This approach might not work for your data, e.g. if you need to create a query for each record in your RDD. If this is the case, you could instead look at using mapPartitions and setting up your Elasticsearch connection inside of that, so you could then re-use the client for all of the queries on each partition. This approach will avoid having to serialize the Elasticsearch connection because it will be local to your function.

Hope this helps!

Cheers,

Holden :)

On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <ma...@gmail.com>> wrote:
Its not used as default serializer for some issues with compatibility & requirement to register the classes..

Which part are you getting as nonserializable... you need to serialize that class if you are sending it to spark workers inside a map, reduce , mappartition or any of the operations on RDD.


Mayur Rustagi
Ph: +1 (760) 203 3257<tel:%2B1%20%28760%29%20203%203257>
http://www.sigmoidanalytics.com
@mayur_rustagi<https://twitter.com/mayur_rustagi>


On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>> wrote:
I'm afraid persisting connection across two tasks is a dangerous act as they
can't be guaranteed to be executed on the same machine. Your ES server may
think its a man-in-the-middle attack!

I think its possible to invoke a static method that give you a connection in
a local 'pool', so nothing will sneak into your closure, but its too complex
and there should be a better option.

Never use kryo before, if its that good perhaps we should use it as the
default serializer



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




--
Cell : 425-233-8271<tel:425-233-8271>




--
Cell : 425-233-8271<tel:425-233-8271>






--
Cell : 425-233-8271


Re: ElasticSearch enrich

Posted by Holden Karau <ho...@pigscanfly.ca>.
On Friday, June 27, 2014, boci <bo...@gmail.com> wrote:

> Thanks, more local thread solve the problem, it's work like a charm. How
> many thread required?
>
Just more than one so that it can schedule the other task :)

>
> Adrian: it's not public project but ask, and I will answer (if I can)...
> maybe later I will create a demo project based on my solution.
>
> b0c1
>

>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.boci@gmail.com
> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>
>
>
> On Fri, Jun 27, 2014 at 11:31 PM, Holden Karau <holden@pigscanfly.ca
> <javascript:_e(%7B%7D,'cvml','holden@pigscanfly.ca');>> wrote:
>
>> Try setting the master to local[4]
>>
>>
>> On Fri, Jun 27, 2014 at 2:17 PM, boci <boci.boci@gmail.com
>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>> wrote:
>>
>>> This is a simply scalatest. I start a SparkConf, set the master to local
>>> (set the serializer etc), pull up kafka and es connection send a message to
>>> kafka and wait 30sec to processing.
>>>
>>> It's run in IDEA no magick trick.
>>>
>>> b0c1
>>>
>>>
>>> ----------------------------------------------------------------------------------------------------------------------------------
>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>
>>>
>>>
>>> On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau <holden@pigscanfly.ca
>>> <javascript:_e(%7B%7D,'cvml','holden@pigscanfly.ca');>> wrote:
>>>
>>>> So a few quick questions:
>>>>
>>>> 1) What cluster are you running this against? Is it just local? Have
>>>> you tried local[4]?
>>>> 2) When you say breakpoint, how are you setting this break point? There
>>>> is a good chance your breakpoint mechanism doesn't work in a distributed
>>>> environment, could you instead cause a side effect (like writing to a file)?
>>>>
>>>> Cheers,
>>>>
>>>> Holden :)
>>>>
>>>>
>>>> On Fri, Jun 27, 2014 at 2:04 PM, boci <boci.boci@gmail.com
>>>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>> wrote:
>>>>
>>>>> Ok I found dynamic resources, but I have a frustrating problem. This
>>>>> is the flow:
>>>>> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>>>>>
>>>>> My problem is: if I do this it's not work, the enrich functions not
>>>>> called, but if I put a print it's does. for example if I do this:
>>>>> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>>>>>
>>>>> The enrich X and enrich Y called but enrich Z not
>>>>> if I put the print after the enrich Z it's will be printed. How can I
>>>>> solve this? (what can I do to call the foreachRDD I put breakpoint inside
>>>>> the map function (where I'm generate the writable) but it's not called)
>>>>>
>>>>> Any idea?
>>>>>
>>>>> b0c1
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>
>>>>>
>>>>>
>>>>> On Fri, Jun 27, 2014 at 4:53 PM, boci <boci.boci@gmail.com
>>>>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>> wrote:
>>>>>
>>>>>> Another question. In the foreachRDD I will initialize the JobConf,
>>>>>> but in this place how can I get information from the items?
>>>>>> I have an identifier in the data which identify the required ES index
>>>>>> (so how can I set dynamic index in the foreachRDD) ?
>>>>>>
>>>>>> b0c1
>>>>>>
>>>>>>
>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <holden@pigscanfly.ca
>>>>>> <javascript:_e(%7B%7D,'cvml','holden@pigscanfly.ca');>> wrote:
>>>>>>
>>>>>>> Just your luck I happened to be working on that very talk today :)
>>>>>>> Let me know how your experiences with Elasticsearch & Spark go :)
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.boci@gmail.com
>>>>>>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>> wrote:
>>>>>>>
>>>>>>>> Wow, thanks your fast answer, it's help a lot...
>>>>>>>>
>>>>>>>> b0c1
>>>>>>>>
>>>>>>>>
>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <
>>>>>>>> holden@pigscanfly.ca
>>>>>>>> <javascript:_e(%7B%7D,'cvml','holden@pigscanfly.ca');>> wrote:
>>>>>>>>
>>>>>>>>> Hi b0c1,
>>>>>>>>>
>>>>>>>>> I have an example of how to do this in the repo for my talk as
>>>>>>>>> well, the specific example is at
>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>>>>>>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
>>>>>>>>> then call  saveAsHadoopDataset on the RDD that gets passed into the
>>>>>>>>> function we provide to foreachRDD.
>>>>>>>>>
>>>>>>>>> e.g.
>>>>>>>>>
>>>>>>>>> stream.foreachRDD{(data, time) =>
>>>>>>>>>      val jobconf = ...
>>>>>>>>>      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Hope that helps :)
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Holden :)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.boci@gmail.com
>>>>>>>>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks. I without local option I can connect with es remote, now
>>>>>>>>>> I only have one problem. How can I use elasticsearch-hadoop with spark
>>>>>>>>>> streaming? I mean DStream doesn't have "saveAsHadoopFiles" method, my
>>>>>>>>>> second problem the output index is depend by the input data.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>>>>>>>> nick.pentreath@gmail.com
>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','nick.pentreath@gmail.com');>>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> You can just add elasticsearch-hadoop as a dependency to your
>>>>>>>>>>> project to user the ESInputFormat and ESOutputFormat (
>>>>>>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some
>>>>>>>>>>> other basics here:
>>>>>>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>>>>>>
>>>>>>>>>>> For testing, yes I think you will need to start ES in local mode
>>>>>>>>>>> (just ./bin/elasticsearch) and use the default config (host = localhost,
>>>>>>>>>>> port = 9200).
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.boci@gmail.com
>>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> That's okay, but hadoop has ES integration. what happened if I
>>>>>>>>>>>> run saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>>>>>>>>>>>> programatically? (if I can))
>>>>>>>>>>>>
>>>>>>>>>>>> b0c1
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <
>>>>>>>>>>>> holden@pigscanfly.ca
>>>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','holden@pigscanfly.ca');>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.boci@gmail.com
>>>>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi guys, thanks the direction now I have some
>>>>>>>>>>>>>> problem/question:
>>>>>>>>>>>>>> - in local (test) mode I want to use ElasticClient.local to
>>>>>>>>>>>>>> create es connection, but in prodution I want to use ElasticClient.remote,
>>>>>>>>>>>>>> to this I want to pass ElasticClient to mapPartitions, or
>>>>>>>>>>>>>> what is the best practices?
>>>>>>>>>>>>>>
>>>>>>>>>>>>> In this case you probably want to make the ElasticClient
>>>>>>>>>>>>> inside of mapPartitions (since it isn't serializable) and if you want to
>>>>>>>>>>>>> use a different client in local mode just have a flag that control what
>>>>>>>>>>>>> type of client you create.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> - my stream output is write into elasticsearch. How can I
>>>>>>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>> - After store the enriched data into ES, I want to generate
>>>>>>>>>>>>>> aggregated data (EsInputFormat) how can I test it in local?
>>>>>>>>>>>>>>
>>>>>>>>>>>>> I think the simplest thing to do would be use the same client
>>>>>>>>>>>>> in mode and just start single node elastic search cluster.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks guys
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> b0c1
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','boci.boci@gmail.com');>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <
>>>>>>>>>>>>>> holden@pigscanfly.ca
>>>>>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','holden@pigscanfly.ca');>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>>>>>>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>>>>>>>>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>>>>>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>>>>>>>>>>>> having to manually create ElasticSearch clients.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This approach might not work for your data, e.g. if you need
>>>>>>>>>>>>>>> to create a query for each record in your RDD. If this is the case, you
>>>>>>>>>>>>>>> could instead look at using mapPartitions and setting up your Elasticsearch
>>>>>>>>>>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>>>>>>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>>>>>>>>>>> the Elasticsearch connection because it will be local to your function.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Holden :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>>>>>>>> mayur.rustagi@gmail.com
>>>>>>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','mayur.rustagi@gmail.com');>>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Its not used as default serializer for some issues with
>>>>>>>>>>>>>>>> compatibility & requirement to register the classes..
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Which part are you getting as nonserializable... you need
>>>>>>>>>>>>>>>> to serialize that class if you are sending it to spark workers inside a
>>>>>>>>>>>>>>>> map, reduce , mappartition or any of the operations on RDD.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <
>>>>>>>>>>>>>>>> pc175@uow.edu.au
>>>>>>>>>>>>>>>> <javascript:_e(%7B%7D,'cvml','pc175@uow.edu.au');>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm afraid persisting connection across two tasks is a
>>>>>>>>>>>>>>>>> dangerous act as they
>>>>>>>>>>>>>>>>> can't be guaranteed to be executed on the same machine.
>>>>>>>>>>>>>>>>> Your ES server may
>>>>>>>>>>>>>>>>> think its a man-in-the-middle attack!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think its possible to invoke a static method that give
>>>>>>>>>>>>>>>>> you a connection in
>>>>>>>>>>>>>>>>> a local 'pool', so nothing will sneak into your closure,
>>>>>>>>>>>>>>>>> but its too complex
>>>>>>>>>>>>>>>>> and there should be a better option.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Never use kryo before, if its that good perhaps we should
>>>>>>>>>>>>>>>>> use it as the
>>>>>>>>>>>>>>>>> default serializer
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive
>>>>>>>>>>>>>>>>> at Nabble.com.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Cell : 425-233-8271
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Cell : 425-233-8271
>>>>
>>>
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>>
>
>

-- 
Cell : 425-233-8271

Re: ElasticSearch enrich

Posted by boci <bo...@gmail.com>.
Thanks, more local thread solve the problem, it's work like a charm. How
many thread required?
Adrian: it's not public project but ask, and I will answer (if I can)...
maybe later I will create a demo project based on my solution.

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com


On Fri, Jun 27, 2014 at 11:31 PM, Holden Karau <ho...@pigscanfly.ca> wrote:

> Try setting the master to local[4]
>
>
> On Fri, Jun 27, 2014 at 2:17 PM, boci <bo...@gmail.com> wrote:
>
>> This is a simply scalatest. I start a SparkConf, set the master to local
>> (set the serializer etc), pull up kafka and es connection send a message to
>> kafka and wait 30sec to processing.
>>
>> It's run in IDEA no magick trick.
>>
>> b0c1
>>
>>
>> ----------------------------------------------------------------------------------------------------------------------------------
>> Skype: boci13, Hangout: boci.boci@gmail.com
>>
>>
>> On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau <ho...@pigscanfly.ca>
>> wrote:
>>
>>> So a few quick questions:
>>>
>>> 1) What cluster are you running this against? Is it just local? Have you
>>> tried local[4]?
>>> 2) When you say breakpoint, how are you setting this break point? There
>>> is a good chance your breakpoint mechanism doesn't work in a distributed
>>> environment, could you instead cause a side effect (like writing to a file)?
>>>
>>> Cheers,
>>>
>>> Holden :)
>>>
>>>
>>> On Fri, Jun 27, 2014 at 2:04 PM, boci <bo...@gmail.com> wrote:
>>>
>>>> Ok I found dynamic resources, but I have a frustrating problem. This is
>>>> the flow:
>>>> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>>>>
>>>> My problem is: if I do this it's not work, the enrich functions not
>>>> called, but if I put a print it's does. for example if I do this:
>>>> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>>>>
>>>> The enrich X and enrich Y called but enrich Z not
>>>> if I put the print after the enrich Z it's will be printed. How can I
>>>> solve this? (what can I do to call the foreachRDD I put breakpoint inside
>>>> the map function (where I'm generate the writable) but it's not called)
>>>>
>>>> Any idea?
>>>>
>>>> b0c1
>>>>
>>>>
>>>>
>>>>
>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>
>>>>
>>>> On Fri, Jun 27, 2014 at 4:53 PM, boci <bo...@gmail.com> wrote:
>>>>
>>>>> Another question. In the foreachRDD I will initialize the JobConf, but
>>>>> in this place how can I get information from the items?
>>>>> I have an identifier in the data which identify the required ES index
>>>>> (so how can I set dynamic index in the foreachRDD) ?
>>>>>
>>>>> b0c1
>>>>>
>>>>>
>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>
>>>>>
>>>>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <ho...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> Just your luck I happened to be working on that very talk today :)
>>>>>> Let me know how your experiences with Elasticsearch & Spark go :)
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 3:17 PM, boci <bo...@gmail.com> wrote:
>>>>>>
>>>>>>> Wow, thanks your fast answer, it's help a lot...
>>>>>>>
>>>>>>> b0c1
>>>>>>>
>>>>>>>
>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <holden@pigscanfly.ca
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Hi b0c1,
>>>>>>>>
>>>>>>>> I have an example of how to do this in the repo for my talk as
>>>>>>>> well, the specific example is at
>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>>>>>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
>>>>>>>> then call  saveAsHadoopDataset on the RDD that gets passed into the
>>>>>>>> function we provide to foreachRDD.
>>>>>>>>
>>>>>>>> e.g.
>>>>>>>>
>>>>>>>> stream.foreachRDD{(data, time) =>
>>>>>>>>      val jobconf = ...
>>>>>>>>      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>>>>> }
>>>>>>>>
>>>>>>>> Hope that helps :)
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Holden :)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <bo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks. I without local option I can connect with es remote, now I
>>>>>>>>> only have one problem. How can I use elasticsearch-hadoop with spark
>>>>>>>>> streaming? I mean DStream doesn't have "saveAsHadoopFiles" method, my
>>>>>>>>> second problem the output index is depend by the input data.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>>>>>>> nick.pentreath@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> You can just add elasticsearch-hadoop as a dependency to your
>>>>>>>>>> project to user the ESInputFormat and ESOutputFormat (
>>>>>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some
>>>>>>>>>> other basics here:
>>>>>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>>>>>
>>>>>>>>>> For testing, yes I think you will need to start ES in local mode
>>>>>>>>>> (just ./bin/elasticsearch) and use the default config (host = localhost,
>>>>>>>>>> port = 9200).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <bo...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> That's okay, but hadoop has ES integration. what happened if I
>>>>>>>>>>> run saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>>>>>>>>>>> programatically? (if I can))
>>>>>>>>>>>
>>>>>>>>>>> b0c1
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <
>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi guys, thanks the direction now I have some problem/question:
>>>>>>>>>>>>> - in local (test) mode I want to use ElasticClient.local to
>>>>>>>>>>>>> create es connection, but in prodution I want to use ElasticClient.remote,
>>>>>>>>>>>>> to this I want to pass ElasticClient to mapPartitions, or
>>>>>>>>>>>>> what is the best practices?
>>>>>>>>>>>>>
>>>>>>>>>>>> In this case you probably want to make the ElasticClient inside
>>>>>>>>>>>> of mapPartitions (since it isn't serializable) and if you want to use a
>>>>>>>>>>>> different client in local mode just have a flag that control what type of
>>>>>>>>>>>> client you create.
>>>>>>>>>>>>
>>>>>>>>>>>>> - my stream output is write into elasticsearch. How can I
>>>>>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>> - After store the enriched data into ES, I want to generate
>>>>>>>>>>>>> aggregated data (EsInputFormat) how can I test it in local?
>>>>>>>>>>>>>
>>>>>>>>>>>> I think the simplest thing to do would be use the same client
>>>>>>>>>>>> in mode and just start single node elastic search cluster.
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks guys
>>>>>>>>>>>>>
>>>>>>>>>>>>> b0c1
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <
>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>>>>>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>>>>>>>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>>>>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>>>>>>>>>>> having to manually create ElasticSearch clients.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This approach might not work for your data, e.g. if you need
>>>>>>>>>>>>>> to create a query for each record in your RDD. If this is the case, you
>>>>>>>>>>>>>> could instead look at using mapPartitions and setting up your Elasticsearch
>>>>>>>>>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>>>>>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>>>>>>>>>> the Elasticsearch connection because it will be local to your function.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Holden :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>>>>>>> mayur.rustagi@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Its not used as default serializer for some issues with
>>>>>>>>>>>>>>> compatibility & requirement to register the classes..
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Which part are you getting as nonserializable... you need to
>>>>>>>>>>>>>>> serialize that class if you are sending it to spark workers inside a map,
>>>>>>>>>>>>>>> reduce , mappartition or any of the operations on RDD.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <
>>>>>>>>>>>>>>> pc175@uow.edu.au> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm afraid persisting connection across two tasks is a
>>>>>>>>>>>>>>>> dangerous act as they
>>>>>>>>>>>>>>>> can't be guaranteed to be executed on the same machine.
>>>>>>>>>>>>>>>> Your ES server may
>>>>>>>>>>>>>>>> think its a man-in-the-middle attack!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think its possible to invoke a static method that give
>>>>>>>>>>>>>>>> you a connection in
>>>>>>>>>>>>>>>> a local 'pool', so nothing will sneak into your closure,
>>>>>>>>>>>>>>>> but its too complex
>>>>>>>>>>>>>>>> and there should be a better option.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Never use kryo before, if its that good perhaps we should
>>>>>>>>>>>>>>>> use it as the
>>>>>>>>>>>>>>>> default serializer
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive
>>>>>>>>>>>>>>>> at Nabble.com.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Cell : 425-233-8271
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cell : 425-233-8271
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>>
>>
>>
>
>
> --
> Cell : 425-233-8271
>

Re: ElasticSearch enrich

Posted by Holden Karau <ho...@pigscanfly.ca>.
Try setting the master to local[4]


On Fri, Jun 27, 2014 at 2:17 PM, boci <bo...@gmail.com> wrote:

> This is a simply scalatest. I start a SparkConf, set the master to local
> (set the serializer etc), pull up kafka and es connection send a message to
> kafka and wait 30sec to processing.
>
> It's run in IDEA no magick trick.
>
> b0c1
>
>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.boci@gmail.com
>
>
> On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau <ho...@pigscanfly.ca>
> wrote:
>
>> So a few quick questions:
>>
>> 1) What cluster are you running this against? Is it just local? Have you
>> tried local[4]?
>> 2) When you say breakpoint, how are you setting this break point? There
>> is a good chance your breakpoint mechanism doesn't work in a distributed
>> environment, could you instead cause a side effect (like writing to a file)?
>>
>> Cheers,
>>
>> Holden :)
>>
>>
>> On Fri, Jun 27, 2014 at 2:04 PM, boci <bo...@gmail.com> wrote:
>>
>>> Ok I found dynamic resources, but I have a frustrating problem. This is
>>> the flow:
>>> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>>>
>>> My problem is: if I do this it's not work, the enrich functions not
>>> called, but if I put a print it's does. for example if I do this:
>>> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>>>
>>> The enrich X and enrich Y called but enrich Z not
>>> if I put the print after the enrich Z it's will be printed. How can I
>>> solve this? (what can I do to call the foreachRDD I put breakpoint inside
>>> the map function (where I'm generate the writable) but it's not called)
>>>
>>> Any idea?
>>>
>>> b0c1
>>>
>>>
>>>
>>>
>>> ----------------------------------------------------------------------------------------------------------------------------------
>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>
>>>
>>> On Fri, Jun 27, 2014 at 4:53 PM, boci <bo...@gmail.com> wrote:
>>>
>>>> Another question. In the foreachRDD I will initialize the JobConf, but
>>>> in this place how can I get information from the items?
>>>> I have an identifier in the data which identify the required ES index
>>>> (so how can I set dynamic index in the foreachRDD) ?
>>>>
>>>> b0c1
>>>>
>>>>
>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>
>>>>
>>>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <ho...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> Just your luck I happened to be working on that very talk today :) Let
>>>>> me know how your experiences with Elasticsearch & Spark go :)
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 3:17 PM, boci <bo...@gmail.com> wrote:
>>>>>
>>>>>> Wow, thanks your fast answer, it's help a lot...
>>>>>>
>>>>>> b0c1
>>>>>>
>>>>>>
>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <ho...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi b0c1,
>>>>>>>
>>>>>>> I have an example of how to do this in the repo for my talk as well,
>>>>>>> the specific example is at
>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>>>>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
>>>>>>> then call  saveAsHadoopDataset on the RDD that gets passed into the
>>>>>>> function we provide to foreachRDD.
>>>>>>>
>>>>>>> e.g.
>>>>>>>
>>>>>>> stream.foreachRDD{(data, time) =>
>>>>>>>      val jobconf = ...
>>>>>>>      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>>>> }
>>>>>>>
>>>>>>> Hope that helps :)
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Holden :)
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <bo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks. I without local option I can connect with es remote, now I
>>>>>>>> only have one problem. How can I use elasticsearch-hadoop with spark
>>>>>>>> streaming? I mean DStream doesn't have "saveAsHadoopFiles" method, my
>>>>>>>> second problem the output index is depend by the input data.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>>>>>> nick.pentreath@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> You can just add elasticsearch-hadoop as a dependency to your
>>>>>>>>> project to user the ESInputFormat and ESOutputFormat (
>>>>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some
>>>>>>>>> other basics here:
>>>>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>>>>
>>>>>>>>> For testing, yes I think you will need to start ES in local mode
>>>>>>>>> (just ./bin/elasticsearch) and use the default config (host = localhost,
>>>>>>>>> port = 9200).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <bo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> That's okay, but hadoop has ES integration. what happened if I
>>>>>>>>>> run saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>>>>>>>>>> programatically? (if I can))
>>>>>>>>>>
>>>>>>>>>> b0c1
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <
>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi guys, thanks the direction now I have some problem/question:
>>>>>>>>>>>> - in local (test) mode I want to use ElasticClient.local to
>>>>>>>>>>>> create es connection, but in prodution I want to use ElasticClient.remote,
>>>>>>>>>>>> to this I want to pass ElasticClient to mapPartitions, or what
>>>>>>>>>>>> is the best practices?
>>>>>>>>>>>>
>>>>>>>>>>> In this case you probably want to make the ElasticClient inside
>>>>>>>>>>> of mapPartitions (since it isn't serializable) and if you want to use a
>>>>>>>>>>> different client in local mode just have a flag that control what type of
>>>>>>>>>>> client you create.
>>>>>>>>>>>
>>>>>>>>>>>> - my stream output is write into elasticsearch. How can I
>>>>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>> - After store the enriched data into ES, I want to generate
>>>>>>>>>>>> aggregated data (EsInputFormat) how can I test it in local?
>>>>>>>>>>>>
>>>>>>>>>>> I think the simplest thing to do would be use the same client in
>>>>>>>>>>> mode and just start single node elastic search cluster.
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks guys
>>>>>>>>>>>>
>>>>>>>>>>>> b0c1
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <
>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>>>>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>>>>>>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>>>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>>>>>>>>>> having to manually create ElasticSearch clients.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This approach might not work for your data, e.g. if you need
>>>>>>>>>>>>> to create a query for each record in your RDD. If this is the case, you
>>>>>>>>>>>>> could instead look at using mapPartitions and setting up your Elasticsearch
>>>>>>>>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>>>>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>>>>>>>>> the Elasticsearch connection because it will be local to your function.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Holden :)
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>>>>>> mayur.rustagi@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Its not used as default serializer for some issues with
>>>>>>>>>>>>>> compatibility & requirement to register the classes..
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Which part are you getting as nonserializable... you need to
>>>>>>>>>>>>>> serialize that class if you are sending it to spark workers inside a map,
>>>>>>>>>>>>>> reduce , mappartition or any of the operations on RDD.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc175@uow.edu.au
>>>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm afraid persisting connection across two tasks is a
>>>>>>>>>>>>>>> dangerous act as they
>>>>>>>>>>>>>>> can't be guaranteed to be executed on the same machine. Your
>>>>>>>>>>>>>>> ES server may
>>>>>>>>>>>>>>> think its a man-in-the-middle attack!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think its possible to invoke a static method that give you
>>>>>>>>>>>>>>> a connection in
>>>>>>>>>>>>>>> a local 'pool', so nothing will sneak into your closure, but
>>>>>>>>>>>>>>> its too complex
>>>>>>>>>>>>>>> and there should be a better option.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Never use kryo before, if its that good perhaps we should
>>>>>>>>>>>>>>> use it as the
>>>>>>>>>>>>>>> default serializer
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Cell : 425-233-8271
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Cell : 425-233-8271
>>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>>
>
>


-- 
Cell : 425-233-8271

Re: ElasticSearch enrich

Posted by boci <bo...@gmail.com>.
This is a simply scalatest. I start a SparkConf, set the master to local
(set the serializer etc), pull up kafka and es connection send a message to
kafka and wait 30sec to processing.

It's run in IDEA no magick trick.

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com


On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau <ho...@pigscanfly.ca> wrote:

> So a few quick questions:
>
> 1) What cluster are you running this against? Is it just local? Have you
> tried local[4]?
> 2) When you say breakpoint, how are you setting this break point? There is
> a good chance your breakpoint mechanism doesn't work in a distributed
> environment, could you instead cause a side effect (like writing to a file)?
>
> Cheers,
>
> Holden :)
>
>
> On Fri, Jun 27, 2014 at 2:04 PM, boci <bo...@gmail.com> wrote:
>
>> Ok I found dynamic resources, but I have a frustrating problem. This is
>> the flow:
>> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>>
>> My problem is: if I do this it's not work, the enrich functions not
>> called, but if I put a print it's does. for example if I do this:
>> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>>
>> The enrich X and enrich Y called but enrich Z not
>> if I put the print after the enrich Z it's will be printed. How can I
>> solve this? (what can I do to call the foreachRDD I put breakpoint inside
>> the map function (where I'm generate the writable) but it's not called)
>>
>> Any idea?
>>
>> b0c1
>>
>>
>>
>>
>> ----------------------------------------------------------------------------------------------------------------------------------
>> Skype: boci13, Hangout: boci.boci@gmail.com
>>
>>
>> On Fri, Jun 27, 2014 at 4:53 PM, boci <bo...@gmail.com> wrote:
>>
>>> Another question. In the foreachRDD I will initialize the JobConf, but
>>> in this place how can I get information from the items?
>>> I have an identifier in the data which identify the required ES index
>>> (so how can I set dynamic index in the foreachRDD) ?
>>>
>>> b0c1
>>>
>>>
>>> ----------------------------------------------------------------------------------------------------------------------------------
>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>
>>>
>>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <ho...@pigscanfly.ca>
>>> wrote:
>>>
>>>> Just your luck I happened to be working on that very talk today :) Let
>>>> me know how your experiences with Elasticsearch & Spark go :)
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 3:17 PM, boci <bo...@gmail.com> wrote:
>>>>
>>>>> Wow, thanks your fast answer, it's help a lot...
>>>>>
>>>>> b0c1
>>>>>
>>>>>
>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <ho...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> Hi b0c1,
>>>>>>
>>>>>> I have an example of how to do this in the repo for my talk as well,
>>>>>> the specific example is at
>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>>>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
>>>>>> then call  saveAsHadoopDataset on the RDD that gets passed into the
>>>>>> function we provide to foreachRDD.
>>>>>>
>>>>>> e.g.
>>>>>>
>>>>>> stream.foreachRDD{(data, time) =>
>>>>>>      val jobconf = ...
>>>>>>      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>>> }
>>>>>>
>>>>>> Hope that helps :)
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Holden :)
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <bo...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks. I without local option I can connect with es remote, now I
>>>>>>> only have one problem. How can I use elasticsearch-hadoop with spark
>>>>>>> streaming? I mean DStream doesn't have "saveAsHadoopFiles" method, my
>>>>>>> second problem the output index is depend by the input data.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>>>>> nick.pentreath@gmail.com> wrote:
>>>>>>>
>>>>>>>> You can just add elasticsearch-hadoop as a dependency to your
>>>>>>>> project to user the ESInputFormat and ESOutputFormat (
>>>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>>>>>>> basics here:
>>>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>>>
>>>>>>>> For testing, yes I think you will need to start ES in local mode
>>>>>>>> (just ./bin/elasticsearch) and use the default config (host = localhost,
>>>>>>>> port = 9200).
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <bo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> That's okay, but hadoop has ES integration. what happened if I run
>>>>>>>>> saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>>>>>>>>> programatically? (if I can))
>>>>>>>>>
>>>>>>>>> b0c1
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <
>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi guys, thanks the direction now I have some problem/question:
>>>>>>>>>>> - in local (test) mode I want to use ElasticClient.local to
>>>>>>>>>>> create es connection, but in prodution I want to use ElasticClient.remote,
>>>>>>>>>>> to this I want to pass ElasticClient to mapPartitions, or what
>>>>>>>>>>> is the best practices?
>>>>>>>>>>>
>>>>>>>>>> In this case you probably want to make the ElasticClient inside
>>>>>>>>>> of mapPartitions (since it isn't serializable) and if you want to use a
>>>>>>>>>> different client in local mode just have a flag that control what type of
>>>>>>>>>> client you create.
>>>>>>>>>>
>>>>>>>>>>> - my stream output is write into elasticsearch. How can I
>>>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> - After store the enriched data into ES, I want to generate
>>>>>>>>>>> aggregated data (EsInputFormat) how can I test it in local?
>>>>>>>>>>>
>>>>>>>>>> I think the simplest thing to do would be use the same client in
>>>>>>>>>> mode and just start single node elastic search cluster.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks guys
>>>>>>>>>>>
>>>>>>>>>>> b0c1
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <
>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>>>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>>>>>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>>>>>>>>> having to manually create ElasticSearch clients.
>>>>>>>>>>>>
>>>>>>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>>>>>>> create a query for each record in your RDD. If this is the case, you could
>>>>>>>>>>>> instead look at using mapPartitions and setting up your Elasticsearch
>>>>>>>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>>>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>>>>>>>> the Elasticsearch connection because it will be local to your function.
>>>>>>>>>>>>
>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>
>>>>>>>>>>>> Holden :)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>>>>> mayur.rustagi@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Its not used as default serializer for some issues with
>>>>>>>>>>>>> compatibility & requirement to register the classes..
>>>>>>>>>>>>>
>>>>>>>>>>>>> Which part are you getting as nonserializable... you need to
>>>>>>>>>>>>> serialize that class if you are sending it to spark workers inside a map,
>>>>>>>>>>>>> reduce , mappartition or any of the operations on RDD.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm afraid persisting connection across two tasks is a
>>>>>>>>>>>>>> dangerous act as they
>>>>>>>>>>>>>> can't be guaranteed to be executed on the same machine. Your
>>>>>>>>>>>>>> ES server may
>>>>>>>>>>>>>> think its a man-in-the-middle attack!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think its possible to invoke a static method that give you
>>>>>>>>>>>>>> a connection in
>>>>>>>>>>>>>> a local 'pool', so nothing will sneak into your closure, but
>>>>>>>>>>>>>> its too complex
>>>>>>>>>>>>>> and there should be a better option.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Never use kryo before, if its that good perhaps we should use
>>>>>>>>>>>>>> it as the
>>>>>>>>>>>>>> default serializer
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cell : 425-233-8271
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Cell : 425-233-8271
>>>>
>>>
>>>
>>
>
>
> --
> Cell : 425-233-8271
>

Re: ElasticSearch enrich

Posted by Holden Karau <ho...@pigscanfly.ca>.
So a few quick questions:

1) What cluster are you running this against? Is it just local? Have you
tried local[4]?
2) When you say breakpoint, how are you setting this break point? There is
a good chance your breakpoint mechanism doesn't work in a distributed
environment, could you instead cause a side effect (like writing to a file)?

Cheers,

Holden :)


On Fri, Jun 27, 2014 at 2:04 PM, boci <bo...@gmail.com> wrote:

> Ok I found dynamic resources, but I have a frustrating problem. This is
> the flow:
> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>
> My problem is: if I do this it's not work, the enrich functions not
> called, but if I put a print it's does. for example if I do this:
> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>
> The enrich X and enrich Y called but enrich Z not
> if I put the print after the enrich Z it's will be printed. How can I
> solve this? (what can I do to call the foreachRDD I put breakpoint inside
> the map function (where I'm generate the writable) but it's not called)
>
> Any idea?
>
> b0c1
>
>
>
>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.boci@gmail.com
>
>
> On Fri, Jun 27, 2014 at 4:53 PM, boci <bo...@gmail.com> wrote:
>
>> Another question. In the foreachRDD I will initialize the JobConf, but in
>> this place how can I get information from the items?
>> I have an identifier in the data which identify the required ES index (so
>> how can I set dynamic index in the foreachRDD) ?
>>
>> b0c1
>>
>>
>> ----------------------------------------------------------------------------------------------------------------------------------
>> Skype: boci13, Hangout: boci.boci@gmail.com
>>
>>
>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <ho...@pigscanfly.ca>
>> wrote:
>>
>>> Just your luck I happened to be working on that very talk today :) Let
>>> me know how your experiences with Elasticsearch & Spark go :)
>>>
>>>
>>> On Thu, Jun 26, 2014 at 3:17 PM, boci <bo...@gmail.com> wrote:
>>>
>>>> Wow, thanks your fast answer, it's help a lot...
>>>>
>>>> b0c1
>>>>
>>>>
>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <ho...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> Hi b0c1,
>>>>>
>>>>> I have an example of how to do this in the repo for my talk as well,
>>>>> the specific example is at
>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
>>>>> then call  saveAsHadoopDataset on the RDD that gets passed into the
>>>>> function we provide to foreachRDD.
>>>>>
>>>>> e.g.
>>>>>
>>>>> stream.foreachRDD{(data, time) =>
>>>>>      val jobconf = ...
>>>>>      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>> }
>>>>>
>>>>> Hope that helps :)
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Holden :)
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <bo...@gmail.com> wrote:
>>>>>
>>>>>> Thanks. I without local option I can connect with es remote, now I
>>>>>> only have one problem. How can I use elasticsearch-hadoop with spark
>>>>>> streaming? I mean DStream doesn't have "saveAsHadoopFiles" method, my
>>>>>> second problem the output index is depend by the input data.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>>>> nick.pentreath@gmail.com> wrote:
>>>>>>
>>>>>>> You can just add elasticsearch-hadoop as a dependency to your
>>>>>>> project to user the ESInputFormat and ESOutputFormat (
>>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>>>>>> basics here:
>>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>>
>>>>>>> For testing, yes I think you will need to start ES in local mode
>>>>>>> (just ./bin/elasticsearch) and use the default config (host = localhost,
>>>>>>> port = 9200).
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <bo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> That's okay, but hadoop has ES integration. what happened if I run
>>>>>>>> saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>>>>>>>> programatically? (if I can))
>>>>>>>>
>>>>>>>> b0c1
>>>>>>>>
>>>>>>>>
>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <holden@pigscanfly.ca
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi guys, thanks the direction now I have some problem/question:
>>>>>>>>>> - in local (test) mode I want to use ElasticClient.local to
>>>>>>>>>> create es connection, but in prodution I want to use ElasticClient.remote,
>>>>>>>>>> to this I want to pass ElasticClient to mapPartitions, or what
>>>>>>>>>> is the best practices?
>>>>>>>>>>
>>>>>>>>> In this case you probably want to make the ElasticClient inside of
>>>>>>>>> mapPartitions (since it isn't serializable) and if you want to use a
>>>>>>>>> different client in local mode just have a flag that control what type of
>>>>>>>>> client you create.
>>>>>>>>>
>>>>>>>>>> - my stream output is write into elasticsearch. How can I
>>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>> - After store the enriched data into ES, I want to generate
>>>>>>>>>> aggregated data (EsInputFormat) how can I test it in local?
>>>>>>>>>>
>>>>>>>>> I think the simplest thing to do would be use the same client in
>>>>>>>>> mode and just start single node elastic search cluster.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks guys
>>>>>>>>>>
>>>>>>>>>> b0c1
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <
>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>>>>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>>>>>>>> having to manually create ElasticSearch clients.
>>>>>>>>>>>
>>>>>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>>>>>> create a query for each record in your RDD. If this is the case, you could
>>>>>>>>>>> instead look at using mapPartitions and setting up your Elasticsearch
>>>>>>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>>>>>>> the Elasticsearch connection because it will be local to your function.
>>>>>>>>>>>
>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>>
>>>>>>>>>>> Holden :)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>>>> mayur.rustagi@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Its not used as default serializer for some issues with
>>>>>>>>>>>> compatibility & requirement to register the classes..
>>>>>>>>>>>>
>>>>>>>>>>>> Which part are you getting as nonserializable... you need to
>>>>>>>>>>>> serialize that class if you are sending it to spark workers inside a map,
>>>>>>>>>>>> reduce , mappartition or any of the operations on RDD.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I'm afraid persisting connection across two tasks is a
>>>>>>>>>>>>> dangerous act as they
>>>>>>>>>>>>> can't be guaranteed to be executed on the same machine. Your
>>>>>>>>>>>>> ES server may
>>>>>>>>>>>>> think its a man-in-the-middle attack!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think its possible to invoke a static method that give you a
>>>>>>>>>>>>> connection in
>>>>>>>>>>>>> a local 'pool', so nothing will sneak into your closure, but
>>>>>>>>>>>>> its too complex
>>>>>>>>>>>>> and there should be a better option.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Never use kryo before, if its that good perhaps we should use
>>>>>>>>>>>>> it as the
>>>>>>>>>>>>> default serializer
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Cell : 425-233-8271
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>>
>>
>>
>


-- 
Cell : 425-233-8271

Re: ElasticSearch enrich

Posted by boci <bo...@gmail.com>.
Ok I found dynamic resources, but I have a frustrating problem. This is the
flow:
kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save

My problem is: if I do this it's not work, the enrich functions not called,
but if I put a print it's does. for example if I do this:
kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD

The enrich X and enrich Y called but enrich Z not
if I put the print after the enrich Z it's will be printed. How can I solve
this? (what can I do to call the foreachRDD I put breakpoint inside the map
function (where I'm generate the writable) but it's not called)

Any idea?

b0c1



----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com


On Fri, Jun 27, 2014 at 4:53 PM, boci <bo...@gmail.com> wrote:

> Another question. In the foreachRDD I will initialize the JobConf, but in
> this place how can I get information from the items?
> I have an identifier in the data which identify the required ES index (so
> how can I set dynamic index in the foreachRDD) ?
>
> b0c1
>
>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.boci@gmail.com
>
>
> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <ho...@pigscanfly.ca>
> wrote:
>
>> Just your luck I happened to be working on that very talk today :) Let me
>> know how your experiences with Elasticsearch & Spark go :)
>>
>>
>> On Thu, Jun 26, 2014 at 3:17 PM, boci <bo...@gmail.com> wrote:
>>
>>> Wow, thanks your fast answer, it's help a lot...
>>>
>>> b0c1
>>>
>>>
>>> ----------------------------------------------------------------------------------------------------------------------------------
>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>
>>>
>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <ho...@pigscanfly.ca>
>>> wrote:
>>>
>>>> Hi b0c1,
>>>>
>>>> I have an example of how to do this in the repo for my talk as well,
>>>> the specific example is at
>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
>>>> then call  saveAsHadoopDataset on the RDD that gets passed into the
>>>> function we provide to foreachRDD.
>>>>
>>>> e.g.
>>>>
>>>> stream.foreachRDD{(data, time) =>
>>>>      val jobconf = ...
>>>>      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>> }
>>>>
>>>> Hope that helps :)
>>>>
>>>> Cheers,
>>>>
>>>> Holden :)
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <bo...@gmail.com> wrote:
>>>>
>>>>> Thanks. I without local option I can connect with es remote, now I
>>>>> only have one problem. How can I use elasticsearch-hadoop with spark
>>>>> streaming? I mean DStream doesn't have "saveAsHadoopFiles" method, my
>>>>> second problem the output index is depend by the input data.
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>>> nick.pentreath@gmail.com> wrote:
>>>>>
>>>>>> You can just add elasticsearch-hadoop as a dependency to your project
>>>>>> to user the ESInputFormat and ESOutputFormat (
>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>>>>> basics here:
>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>
>>>>>> For testing, yes I think you will need to start ES in local mode
>>>>>> (just ./bin/elasticsearch) and use the default config (host = localhost,
>>>>>> port = 9200).
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <bo...@gmail.com> wrote:
>>>>>>
>>>>>>> That's okay, but hadoop has ES integration. what happened if I run
>>>>>>> saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>>>>>>> programatically? (if I can))
>>>>>>>
>>>>>>> b0c1
>>>>>>>
>>>>>>>
>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <ho...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi guys, thanks the direction now I have some problem/question:
>>>>>>>>> - in local (test) mode I want to use ElasticClient.local to create
>>>>>>>>> es connection, but in prodution I want to use ElasticClient.remote, to this
>>>>>>>>> I want to pass ElasticClient to mapPartitions, or what is the
>>>>>>>>> best practices?
>>>>>>>>>
>>>>>>>> In this case you probably want to make the ElasticClient inside of
>>>>>>>> mapPartitions (since it isn't serializable) and if you want to use a
>>>>>>>> different client in local mode just have a flag that control what type of
>>>>>>>> client you create.
>>>>>>>>
>>>>>>>>> - my stream output is write into elasticsearch. How can I
>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>>>>>>>
>>>>>>>>>
>>>>>>>> - After store the enriched data into ES, I want to generate
>>>>>>>>> aggregated data (EsInputFormat) how can I test it in local?
>>>>>>>>>
>>>>>>>> I think the simplest thing to do would be use the same client in
>>>>>>>> mode and just start single node elastic search cluster.
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks guys
>>>>>>>>>
>>>>>>>>> b0c1
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <
>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>>>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>>>>>>> having to manually create ElasticSearch clients.
>>>>>>>>>>
>>>>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>>>>> create a query for each record in your RDD. If this is the case, you could
>>>>>>>>>> instead look at using mapPartitions and setting up your Elasticsearch
>>>>>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>>>>>> the Elasticsearch connection because it will be local to your function.
>>>>>>>>>>
>>>>>>>>>> Hope this helps!
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>>
>>>>>>>>>> Holden :)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>>> mayur.rustagi@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Its not used as default serializer for some issues with
>>>>>>>>>>> compatibility & requirement to register the classes..
>>>>>>>>>>>
>>>>>>>>>>> Which part are you getting as nonserializable... you need to
>>>>>>>>>>> serialize that class if you are sending it to spark workers inside a map,
>>>>>>>>>>> reduce , mappartition or any of the operations on RDD.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm afraid persisting connection across two tasks is a
>>>>>>>>>>>> dangerous act as they
>>>>>>>>>>>> can't be guaranteed to be executed on the same machine. Your ES
>>>>>>>>>>>> server may
>>>>>>>>>>>> think its a man-in-the-middle attack!
>>>>>>>>>>>>
>>>>>>>>>>>> I think its possible to invoke a static method that give you a
>>>>>>>>>>>> connection in
>>>>>>>>>>>> a local 'pool', so nothing will sneak into your closure, but
>>>>>>>>>>>> its too complex
>>>>>>>>>>>> and there should be a better option.
>>>>>>>>>>>>
>>>>>>>>>>>> Never use kryo before, if its that good perhaps we should use
>>>>>>>>>>>> it as the
>>>>>>>>>>>> default serializer
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Cell : 425-233-8271
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Cell : 425-233-8271
>>>>
>>>
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>>
>
>

Re: ElasticSearch enrich

Posted by boci <bo...@gmail.com>.
Another question. In the foreachRDD I will initialize the JobConf, but in
this place how can I get information from the items?
I have an identifier in the data which identify the required ES index (so
how can I set dynamic index in the foreachRDD) ?

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com


On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <ho...@pigscanfly.ca> wrote:

> Just your luck I happened to be working on that very talk today :) Let me
> know how your experiences with Elasticsearch & Spark go :)
>
>
> On Thu, Jun 26, 2014 at 3:17 PM, boci <bo...@gmail.com> wrote:
>
>> Wow, thanks your fast answer, it's help a lot...
>>
>> b0c1
>>
>>
>> ----------------------------------------------------------------------------------------------------------------------------------
>> Skype: boci13, Hangout: boci.boci@gmail.com
>>
>>
>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <ho...@pigscanfly.ca>
>> wrote:
>>
>>> Hi b0c1,
>>>
>>> I have an example of how to do this in the repo for my talk as well, the
>>> specific example is at
>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
>>> then call  saveAsHadoopDataset on the RDD that gets passed into the
>>> function we provide to foreachRDD.
>>>
>>> e.g.
>>>
>>> stream.foreachRDD{(data, time) =>
>>>      val jobconf = ...
>>>      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>> }
>>>
>>> Hope that helps :)
>>>
>>> Cheers,
>>>
>>> Holden :)
>>>
>>>
>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <bo...@gmail.com> wrote:
>>>
>>>> Thanks. I without local option I can connect with es remote, now I only
>>>> have one problem. How can I use elasticsearch-hadoop with spark streaming?
>>>> I mean DStream doesn't have "saveAsHadoopFiles" method, my second problem
>>>> the output index is depend by the input data.
>>>>
>>>> Thanks
>>>>
>>>>
>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>> nick.pentreath@gmail.com> wrote:
>>>>
>>>>> You can just add elasticsearch-hadoop as a dependency to your project
>>>>> to user the ESInputFormat and ESOutputFormat (
>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>>>> basics here:
>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>
>>>>> For testing, yes I think you will need to start ES in local mode (just
>>>>> ./bin/elasticsearch) and use the default config (host = localhost, port =
>>>>> 9200).
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <bo...@gmail.com> wrote:
>>>>>
>>>>>> That's okay, but hadoop has ES integration. what happened if I run
>>>>>> saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>>>>>> programatically? (if I can))
>>>>>>
>>>>>> b0c1
>>>>>>
>>>>>>
>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <ho...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi guys, thanks the direction now I have some problem/question:
>>>>>>>> - in local (test) mode I want to use ElasticClient.local to create
>>>>>>>> es connection, but in prodution I want to use ElasticClient.remote, to this
>>>>>>>> I want to pass ElasticClient to mapPartitions, or what is the best
>>>>>>>> practices?
>>>>>>>>
>>>>>>> In this case you probably want to make the ElasticClient inside of
>>>>>>> mapPartitions (since it isn't serializable) and if you want to use a
>>>>>>> different client in local mode just have a flag that control what type of
>>>>>>> client you create.
>>>>>>>
>>>>>>>> - my stream output is write into elasticsearch. How can I
>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>>>>>>
>>>>>>>>
>>>>>>> - After store the enriched data into ES, I want to generate
>>>>>>>> aggregated data (EsInputFormat) how can I test it in local?
>>>>>>>>
>>>>>>> I think the simplest thing to do would be use the same client in
>>>>>>> mode and just start single node elastic search cluster.
>>>>>>>
>>>>>>>>
>>>>>>>> Thanks guys
>>>>>>>>
>>>>>>>> b0c1
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <holden@pigscanfly.ca
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>>>>>> having to manually create ElasticSearch clients.
>>>>>>>>>
>>>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>>>> create a query for each record in your RDD. If this is the case, you could
>>>>>>>>> instead look at using mapPartitions and setting up your Elasticsearch
>>>>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>>>>> the Elasticsearch connection because it will be local to your function.
>>>>>>>>>
>>>>>>>>> Hope this helps!
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Holden :)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>> mayur.rustagi@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Its not used as default serializer for some issues with
>>>>>>>>>> compatibility & requirement to register the classes..
>>>>>>>>>>
>>>>>>>>>> Which part are you getting as nonserializable... you need to
>>>>>>>>>> serialize that class if you are sending it to spark workers inside a map,
>>>>>>>>>> reduce , mappartition or any of the operations on RDD.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Mayur Rustagi
>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm afraid persisting connection across two tasks is a dangerous
>>>>>>>>>>> act as they
>>>>>>>>>>> can't be guaranteed to be executed on the same machine. Your ES
>>>>>>>>>>> server may
>>>>>>>>>>> think its a man-in-the-middle attack!
>>>>>>>>>>>
>>>>>>>>>>> I think its possible to invoke a static method that give you a
>>>>>>>>>>> connection in
>>>>>>>>>>> a local 'pool', so nothing will sneak into your closure, but its
>>>>>>>>>>> too complex
>>>>>>>>>>> and there should be a better option.
>>>>>>>>>>>
>>>>>>>>>>> Never use kryo before, if its that good perhaps we should use it
>>>>>>>>>>> as the
>>>>>>>>>>> default serializer
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> View this message in context:
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Cell : 425-233-8271
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>>
>>
>>
>
>
> --
> Cell : 425-233-8271
>

Re: ElasticSearch enrich

Posted by Holden Karau <ho...@pigscanfly.ca>.
Just your luck I happened to be working on that very talk today :) Let me
know how your experiences with Elasticsearch & Spark go :)


On Thu, Jun 26, 2014 at 3:17 PM, boci <bo...@gmail.com> wrote:

> Wow, thanks your fast answer, it's help a lot...
>
> b0c1
>
>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.boci@gmail.com
>
>
> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <ho...@pigscanfly.ca>
> wrote:
>
>> Hi b0c1,
>>
>> I have an example of how to do this in the repo for my talk as well, the
>> specific example is at
>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
>> then call  saveAsHadoopDataset on the RDD that gets passed into the
>> function we provide to foreachRDD.
>>
>> e.g.
>>
>> stream.foreachRDD{(data, time) =>
>>      val jobconf = ...
>>      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>> }
>>
>> Hope that helps :)
>>
>> Cheers,
>>
>> Holden :)
>>
>>
>> On Thu, Jun 26, 2014 at 2:23 PM, boci <bo...@gmail.com> wrote:
>>
>>> Thanks. I without local option I can connect with es remote, now I only
>>> have one problem. How can I use elasticsearch-hadoop with spark streaming?
>>> I mean DStream doesn't have "saveAsHadoopFiles" method, my second problem
>>> the output index is depend by the input data.
>>>
>>> Thanks
>>>
>>>
>>> ----------------------------------------------------------------------------------------------------------------------------------
>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>
>>>
>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>> nick.pentreath@gmail.com> wrote:
>>>
>>>> You can just add elasticsearch-hadoop as a dependency to your project
>>>> to user the ESInputFormat and ESOutputFormat (
>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>>> basics here:
>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>
>>>> For testing, yes I think you will need to start ES in local mode (just
>>>> ./bin/elasticsearch) and use the default config (host = localhost, port =
>>>> 9200).
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <bo...@gmail.com> wrote:
>>>>
>>>>> That's okay, but hadoop has ES integration. what happened if I run
>>>>> saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>>>>> programatically? (if I can))
>>>>>
>>>>> b0c1
>>>>>
>>>>>
>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <ho...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi guys, thanks the direction now I have some problem/question:
>>>>>>> - in local (test) mode I want to use ElasticClient.local to create
>>>>>>> es connection, but in prodution I want to use ElasticClient.remote, to this
>>>>>>> I want to pass ElasticClient to mapPartitions, or what is the best
>>>>>>> practices?
>>>>>>>
>>>>>> In this case you probably want to make the ElasticClient inside of
>>>>>> mapPartitions (since it isn't serializable) and if you want to use a
>>>>>> different client in local mode just have a flag that control what type of
>>>>>> client you create.
>>>>>>
>>>>>>> - my stream output is write into elasticsearch. How can I
>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>>>>>
>>>>>>>
>>>>>> - After store the enriched data into ES, I want to generate
>>>>>>> aggregated data (EsInputFormat) how can I test it in local?
>>>>>>>
>>>>>> I think the simplest thing to do would be use the same client in mode
>>>>>> and just start single node elastic search cluster.
>>>>>>
>>>>>>>
>>>>>>> Thanks guys
>>>>>>>
>>>>>>> b0c1
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <ho...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>>>>> having to manually create ElasticSearch clients.
>>>>>>>>
>>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>>> create a query for each record in your RDD. If this is the case, you could
>>>>>>>> instead look at using mapPartitions and setting up your Elasticsearch
>>>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>>>> the Elasticsearch connection because it will be local to your function.
>>>>>>>>
>>>>>>>> Hope this helps!
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Holden :)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>> mayur.rustagi@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Its not used as default serializer for some issues with
>>>>>>>>> compatibility & requirement to register the classes..
>>>>>>>>>
>>>>>>>>> Which part are you getting as nonserializable... you need to
>>>>>>>>> serialize that class if you are sending it to spark workers inside a map,
>>>>>>>>> reduce , mappartition or any of the operations on RDD.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Mayur Rustagi
>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I'm afraid persisting connection across two tasks is a dangerous
>>>>>>>>>> act as they
>>>>>>>>>> can't be guaranteed to be executed on the same machine. Your ES
>>>>>>>>>> server may
>>>>>>>>>> think its a man-in-the-middle attack!
>>>>>>>>>>
>>>>>>>>>> I think its possible to invoke a static method that give you a
>>>>>>>>>> connection in
>>>>>>>>>> a local 'pool', so nothing will sneak into your closure, but its
>>>>>>>>>> too complex
>>>>>>>>>> and there should be a better option.
>>>>>>>>>>
>>>>>>>>>> Never use kryo before, if its that good perhaps we should use it
>>>>>>>>>> as the
>>>>>>>>>> default serializer
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> View this message in context:
>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>> Nabble.com.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Cell : 425-233-8271
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cell : 425-233-8271
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>>
>
>


-- 
Cell : 425-233-8271

Re: ElasticSearch enrich

Posted by boci <bo...@gmail.com>.
Wow, thanks your fast answer, it's help a lot...

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com


On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <ho...@pigscanfly.ca> wrote:

> Hi b0c1,
>
> I have an example of how to do this in the repo for my talk as well, the
> specific example is at
> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
> then call  saveAsHadoopDataset on the RDD that gets passed into the
> function we provide to foreachRDD.
>
> e.g.
>
> stream.foreachRDD{(data, time) =>
>      val jobconf = ...
>      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
> }
>
> Hope that helps :)
>
> Cheers,
>
> Holden :)
>
>
> On Thu, Jun 26, 2014 at 2:23 PM, boci <bo...@gmail.com> wrote:
>
>> Thanks. I without local option I can connect with es remote, now I only
>> have one problem. How can I use elasticsearch-hadoop with spark streaming?
>> I mean DStream doesn't have "saveAsHadoopFiles" method, my second problem
>> the output index is depend by the input data.
>>
>> Thanks
>>
>>
>> ----------------------------------------------------------------------------------------------------------------------------------
>> Skype: boci13, Hangout: boci.boci@gmail.com
>>
>>
>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>> nick.pentreath@gmail.com> wrote:
>>
>>> You can just add elasticsearch-hadoop as a dependency to your project to
>>> user the ESInputFormat and ESOutputFormat (
>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>> basics here:
>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>
>>> For testing, yes I think you will need to start ES in local mode (just
>>> ./bin/elasticsearch) and use the default config (host = localhost, port =
>>> 9200).
>>>
>>>
>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <bo...@gmail.com> wrote:
>>>
>>>> That's okay, but hadoop has ES integration. what happened if I run
>>>> saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>>>> programatically? (if I can))
>>>>
>>>> b0c1
>>>>
>>>>
>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <ho...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com> wrote:
>>>>>
>>>>>> Hi guys, thanks the direction now I have some problem/question:
>>>>>> - in local (test) mode I want to use ElasticClient.local to create es
>>>>>> connection, but in prodution I want to use ElasticClient.remote, to this I
>>>>>> want to pass ElasticClient to mapPartitions, or what is the best
>>>>>> practices?
>>>>>>
>>>>> In this case you probably want to make the ElasticClient inside of
>>>>> mapPartitions (since it isn't serializable) and if you want to use a
>>>>> different client in local mode just have a flag that control what type of
>>>>> client you create.
>>>>>
>>>>>> - my stream output is write into elasticsearch. How can I
>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>>>>
>>>>>>
>>>>> - After store the enriched data into ES, I want to generate aggregated
>>>>>> data (EsInputFormat) how can I test it in local?
>>>>>>
>>>>> I think the simplest thing to do would be use the same client in mode
>>>>> and just start single node elastic search cluster.
>>>>>
>>>>>>
>>>>>> Thanks guys
>>>>>>
>>>>>> b0c1
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <ho...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>>>> implementation with TopTweetsInALocation (
>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>>>> having to manually create ElasticSearch clients.
>>>>>>>
>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>> create a query for each record in your RDD. If this is the case, you could
>>>>>>> instead look at using mapPartitions and setting up your Elasticsearch
>>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>>> the Elasticsearch connection because it will be local to your function.
>>>>>>>
>>>>>>> Hope this helps!
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Holden :)
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>> mayur.rustagi@gmail.com> wrote:
>>>>>>>
>>>>>>>> Its not used as default serializer for some issues with
>>>>>>>> compatibility & requirement to register the classes..
>>>>>>>>
>>>>>>>> Which part are you getting as nonserializable... you need to
>>>>>>>> serialize that class if you are sending it to spark workers inside a map,
>>>>>>>> reduce , mappartition or any of the operations on RDD.
>>>>>>>>
>>>>>>>>
>>>>>>>> Mayur Rustagi
>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I'm afraid persisting connection across two tasks is a dangerous
>>>>>>>>> act as they
>>>>>>>>> can't be guaranteed to be executed on the same machine. Your ES
>>>>>>>>> server may
>>>>>>>>> think its a man-in-the-middle attack!
>>>>>>>>>
>>>>>>>>> I think its possible to invoke a static method that give you a
>>>>>>>>> connection in
>>>>>>>>> a local 'pool', so nothing will sneak into your closure, but its
>>>>>>>>> too complex
>>>>>>>>> and there should be a better option.
>>>>>>>>>
>>>>>>>>> Never use kryo before, if its that good perhaps we should use it
>>>>>>>>> as the
>>>>>>>>> default serializer
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> View this message in context:
>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>> Nabble.com.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Cell : 425-233-8271
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Cell : 425-233-8271
>>>>>
>>>>
>>>>
>>>
>>
>
>
> --
> Cell : 425-233-8271
>

Re: ElasticSearch enrich

Posted by Holden Karau <ho...@pigscanfly.ca>.
Hi b0c1,

I have an example of how to do this in the repo for my talk as well, the
specific example is at
https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
. Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
then call  saveAsHadoopDataset on the RDD that gets passed into the
function we provide to foreachRDD.

e.g.

stream.foreachRDD{(data, time) =>
     val jobconf = ...
     data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}

Hope that helps :)

Cheers,

Holden :)


On Thu, Jun 26, 2014 at 2:23 PM, boci <bo...@gmail.com> wrote:

> Thanks. I without local option I can connect with es remote, now I only
> have one problem. How can I use elasticsearch-hadoop with spark streaming?
> I mean DStream doesn't have "saveAsHadoopFiles" method, my second problem
> the output index is depend by the input data.
>
> Thanks
>
>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.boci@gmail.com
>
>
> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <nick.pentreath@gmail.com
> > wrote:
>
>> You can just add elasticsearch-hadoop as a dependency to your project to
>> user the ESInputFormat and ESOutputFormat (
>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>> basics here:
>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>
>> For testing, yes I think you will need to start ES in local mode (just
>> ./bin/elasticsearch) and use the default config (host = localhost, port =
>> 9200).
>>
>>
>> On Thu, Jun 26, 2014 at 9:04 AM, boci <bo...@gmail.com> wrote:
>>
>>> That's okay, but hadoop has ES integration. what happened if I run
>>> saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>>> programatically? (if I can))
>>>
>>> b0c1
>>>
>>>
>>> ----------------------------------------------------------------------------------------------------------------------------------
>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>
>>>
>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <ho...@pigscanfly.ca>
>>> wrote:
>>>
>>>>
>>>>
>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com> wrote:
>>>>
>>>>> Hi guys, thanks the direction now I have some problem/question:
>>>>> - in local (test) mode I want to use ElasticClient.local to create es
>>>>> connection, but in prodution I want to use ElasticClient.remote, to this I
>>>>> want to pass ElasticClient to mapPartitions, or what is the best
>>>>> practices?
>>>>>
>>>> In this case you probably want to make the ElasticClient inside of
>>>> mapPartitions (since it isn't serializable) and if you want to use a
>>>> different client in local mode just have a flag that control what type of
>>>> client you create.
>>>>
>>>>> - my stream output is write into elasticsearch. How can I
>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>>>
>>>>>
>>>> - After store the enriched data into ES, I want to generate aggregated
>>>>> data (EsInputFormat) how can I test it in local?
>>>>>
>>>> I think the simplest thing to do would be use the same client in mode
>>>> and just start single node elastic search cluster.
>>>>
>>>>>
>>>>> Thanks guys
>>>>>
>>>>> b0c1
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>>
>>>>>
>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <ho...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>>> implementation with TopTweetsInALocation (
>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>>> having to manually create ElasticSearch clients.
>>>>>>
>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>> create a query for each record in your RDD. If this is the case, you could
>>>>>> instead look at using mapPartitions and setting up your Elasticsearch
>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>> the Elasticsearch connection because it will be local to your function.
>>>>>>
>>>>>> Hope this helps!
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Holden :)
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>> mayur.rustagi@gmail.com> wrote:
>>>>>>
>>>>>>> Its not used as default serializer for some issues with
>>>>>>> compatibility & requirement to register the classes..
>>>>>>>
>>>>>>> Which part are you getting as nonserializable... you need to
>>>>>>> serialize that class if you are sending it to spark workers inside a map,
>>>>>>> reduce , mappartition or any of the operations on RDD.
>>>>>>>
>>>>>>>
>>>>>>> Mayur Rustagi
>>>>>>> Ph: +1 (760) 203 3257
>>>>>>> http://www.sigmoidanalytics.com
>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'm afraid persisting connection across two tasks is a dangerous
>>>>>>>> act as they
>>>>>>>> can't be guaranteed to be executed on the same machine. Your ES
>>>>>>>> server may
>>>>>>>> think its a man-in-the-middle attack!
>>>>>>>>
>>>>>>>> I think its possible to invoke a static method that give you a
>>>>>>>> connection in
>>>>>>>> a local 'pool', so nothing will sneak into your closure, but its
>>>>>>>> too complex
>>>>>>>> and there should be a better option.
>>>>>>>>
>>>>>>>> Never use kryo before, if its that good perhaps we should use it as
>>>>>>>> the
>>>>>>>> default serializer
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> View this message in context:
>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>> Nabble.com.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cell : 425-233-8271
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Cell : 425-233-8271
>>>>
>>>
>>>
>>
>


-- 
Cell : 425-233-8271

Re: ElasticSearch enrich

Posted by boci <bo...@gmail.com>.
Thanks. I without local option I can connect with es remote, now I only
have one problem. How can I use elasticsearch-hadoop with spark streaming?
I mean DStream doesn't have "saveAsHadoopFiles" method, my second problem
the output index is depend by the input data.

Thanks

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com


On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <ni...@gmail.com>
wrote:

> You can just add elasticsearch-hadoop as a dependency to your project to
> user the ESInputFormat and ESOutputFormat (
> https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics
> here:
> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>
> For testing, yes I think you will need to start ES in local mode (just
> ./bin/elasticsearch) and use the default config (host = localhost, port =
> 9200).
>
>
> On Thu, Jun 26, 2014 at 9:04 AM, boci <bo...@gmail.com> wrote:
>
>> That's okay, but hadoop has ES integration. what happened if I run
>> saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>> programatically? (if I can))
>>
>> b0c1
>>
>>
>> ----------------------------------------------------------------------------------------------------------------------------------
>> Skype: boci13, Hangout: boci.boci@gmail.com
>>
>>
>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <ho...@pigscanfly.ca>
>> wrote:
>>
>>>
>>>
>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com> wrote:
>>>
>>>> Hi guys, thanks the direction now I have some problem/question:
>>>> - in local (test) mode I want to use ElasticClient.local to create es
>>>> connection, but in prodution I want to use ElasticClient.remote, to this I
>>>> want to pass ElasticClient to mapPartitions, or what is the best
>>>> practices?
>>>>
>>> In this case you probably want to make the ElasticClient inside of
>>> mapPartitions (since it isn't serializable) and if you want to use a
>>> different client in local mode just have a flag that control what type of
>>> client you create.
>>>
>>>> - my stream output is write into elasticsearch. How can I
>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>>
>>>>
>>> - After store the enriched data into ES, I want to generate aggregated
>>>> data (EsInputFormat) how can I test it in local?
>>>>
>>> I think the simplest thing to do would be use the same client in mode
>>> and just start single node elastic search cluster.
>>>
>>>>
>>>> Thanks guys
>>>>
>>>> b0c1
>>>>
>>>>
>>>>
>>>>
>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>>
>>>>
>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <ho...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>> implementation with TopTweetsInALocation (
>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>> having to manually create ElasticSearch clients.
>>>>>
>>>>> This approach might not work for your data, e.g. if you need to create
>>>>> a query for each record in your RDD. If this is the case, you could instead
>>>>> look at using mapPartitions and setting up your Elasticsearch connection
>>>>> inside of that, so you could then re-use the client for all of the queries
>>>>> on each partition. This approach will avoid having to serialize the
>>>>> Elasticsearch connection because it will be local to your function.
>>>>>
>>>>> Hope this helps!
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Holden :)
>>>>>
>>>>>
>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>> mayur.rustagi@gmail.com> wrote:
>>>>>
>>>>>> Its not used as default serializer for some issues with compatibility
>>>>>> & requirement to register the classes..
>>>>>>
>>>>>> Which part are you getting as nonserializable... you need to
>>>>>> serialize that class if you are sending it to spark workers inside a map,
>>>>>> reduce , mappartition or any of the operations on RDD.
>>>>>>
>>>>>>
>>>>>> Mayur Rustagi
>>>>>> Ph: +1 (760) 203 3257
>>>>>> http://www.sigmoidanalytics.com
>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:
>>>>>>
>>>>>>> I'm afraid persisting connection across two tasks is a dangerous act
>>>>>>> as they
>>>>>>> can't be guaranteed to be executed on the same machine. Your ES
>>>>>>> server may
>>>>>>> think its a man-in-the-middle attack!
>>>>>>>
>>>>>>> I think its possible to invoke a static method that give you a
>>>>>>> connection in
>>>>>>> a local 'pool', so nothing will sneak into your closure, but its too
>>>>>>> complex
>>>>>>> and there should be a better option.
>>>>>>>
>>>>>>> Never use kryo before, if its that good perhaps we should use it as
>>>>>>> the
>>>>>>> default serializer
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Cell : 425-233-8271
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>>
>>
>>
>

Re: ElasticSearch enrich

Posted by Nick Pentreath <ni...@gmail.com>.
You can just add elasticsearch-hadoop as a dependency to your project to
user the ESInputFormat and ESOutputFormat (
https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics
here:
http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

For testing, yes I think you will need to start ES in local mode (just
./bin/elasticsearch) and use the default config (host = localhost, port =
9200).


On Thu, Jun 26, 2014 at 9:04 AM, boci <bo...@gmail.com> wrote:

> That's okay, but hadoop has ES integration. what happened if I run
> saveAsHadoopFile without hadoop (or I must need to pull up hadoop
> programatically? (if I can))
>
> b0c1
>
>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.boci@gmail.com
>
>
> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <ho...@pigscanfly.ca>
> wrote:
>
>>
>>
>> On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com> wrote:
>>
>>> Hi guys, thanks the direction now I have some problem/question:
>>> - in local (test) mode I want to use ElasticClient.local to create es
>>> connection, but in prodution I want to use ElasticClient.remote, to this I
>>> want to pass ElasticClient to mapPartitions, or what is the best
>>> practices?
>>>
>> In this case you probably want to make the ElasticClient inside of
>> mapPartitions (since it isn't serializable) and if you want to use a
>> different client in local mode just have a flag that control what type of
>> client you create.
>>
>>> - my stream output is write into elasticsearch. How can I
>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>
>> - After store the enriched data into ES, I want to generate aggregated
>>> data (EsInputFormat) how can I test it in local?
>>>
>> I think the simplest thing to do would be use the same client in mode and
>> just start single node elastic search cluster.
>>
>>>
>>> Thanks guys
>>>
>>> b0c1
>>>
>>>
>>>
>>>
>>> ----------------------------------------------------------------------------------------------------------------------------------
>>> Skype: boci13, Hangout: boci.boci@gmail.com
>>>
>>>
>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <ho...@pigscanfly.ca>
>>> wrote:
>>>
>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>> implementation with TopTweetsInALocation (
>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>> having to manually create ElasticSearch clients.
>>>>
>>>> This approach might not work for your data, e.g. if you need to create
>>>> a query for each record in your RDD. If this is the case, you could instead
>>>> look at using mapPartitions and setting up your Elasticsearch connection
>>>> inside of that, so you could then re-use the client for all of the queries
>>>> on each partition. This approach will avoid having to serialize the
>>>> Elasticsearch connection because it will be local to your function.
>>>>
>>>> Hope this helps!
>>>>
>>>> Cheers,
>>>>
>>>> Holden :)
>>>>
>>>>
>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <mayur.rustagi@gmail.com
>>>> > wrote:
>>>>
>>>>> Its not used as default serializer for some issues with compatibility
>>>>> & requirement to register the classes..
>>>>>
>>>>> Which part are you getting as nonserializable... you need to serialize
>>>>> that class if you are sending it to spark workers inside a map, reduce ,
>>>>> mappartition or any of the operations on RDD.
>>>>>
>>>>>
>>>>> Mayur Rustagi
>>>>> Ph: +1 (760) 203 3257
>>>>> http://www.sigmoidanalytics.com
>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:
>>>>>
>>>>>> I'm afraid persisting connection across two tasks is a dangerous act
>>>>>> as they
>>>>>> can't be guaranteed to be executed on the same machine. Your ES
>>>>>> server may
>>>>>> think its a man-in-the-middle attack!
>>>>>>
>>>>>> I think its possible to invoke a static method that give you a
>>>>>> connection in
>>>>>> a local 'pool', so nothing will sneak into your closure, but its too
>>>>>> complex
>>>>>> and there should be a better option.
>>>>>>
>>>>>> Never use kryo before, if its that good perhaps we should use it as
>>>>>> the
>>>>>> default serializer
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Cell : 425-233-8271
>>>>
>>>
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>>
>
>

Re: ElasticSearch enrich

Posted by boci <bo...@gmail.com>.
That's okay, but hadoop has ES integration. what happened if I run
saveAsHadoopFile without hadoop (or I must need to pull up hadoop
programatically? (if I can))

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com


On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <ho...@pigscanfly.ca> wrote:

>
>
> On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com> wrote:
>
>> Hi guys, thanks the direction now I have some problem/question:
>> - in local (test) mode I want to use ElasticClient.local to create es
>> connection, but in prodution I want to use ElasticClient.remote, to this I
>> want to pass ElasticClient to mapPartitions, or what is the best
>> practices?
>>
> In this case you probably want to make the ElasticClient inside of
> mapPartitions (since it isn't serializable) and if you want to use a
> different client in local mode just have a flag that control what type of
> client you create.
>
>> - my stream output is write into elasticsearch. How can I
>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>
> - After store the enriched data into ES, I want to generate aggregated
>> data (EsInputFormat) how can I test it in local?
>>
> I think the simplest thing to do would be use the same client in mode and
> just start single node elastic search cluster.
>
>>
>> Thanks guys
>>
>> b0c1
>>
>>
>>
>>
>> ----------------------------------------------------------------------------------------------------------------------------------
>> Skype: boci13, Hangout: boci.boci@gmail.com
>>
>>
>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <ho...@pigscanfly.ca>
>> wrote:
>>
>>> So I'm giving a talk at the Spark summit on using Spark & ElasticSearch,
>>> but for now if you want to see a simple demo which uses elasticsearch for
>>> geo input you can take a look at my quick & dirty implementation with
>>> TopTweetsInALocation (
>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>> having to manually create ElasticSearch clients.
>>>
>>> This approach might not work for your data, e.g. if you need to create a
>>> query for each record in your RDD. If this is the case, you could instead
>>> look at using mapPartitions and setting up your Elasticsearch connection
>>> inside of that, so you could then re-use the client for all of the queries
>>> on each partition. This approach will avoid having to serialize the
>>> Elasticsearch connection because it will be local to your function.
>>>
>>> Hope this helps!
>>>
>>> Cheers,
>>>
>>> Holden :)
>>>
>>>
>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <ma...@gmail.com>
>>> wrote:
>>>
>>>> Its not used as default serializer for some issues with compatibility &
>>>> requirement to register the classes..
>>>>
>>>> Which part are you getting as nonserializable... you need to serialize
>>>> that class if you are sending it to spark workers inside a map, reduce ,
>>>> mappartition or any of the operations on RDD.
>>>>
>>>>
>>>> Mayur Rustagi
>>>> Ph: +1 (760) 203 3257
>>>> http://www.sigmoidanalytics.com
>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>
>>>>
>>>>
>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:
>>>>
>>>>> I'm afraid persisting connection across two tasks is a dangerous act
>>>>> as they
>>>>> can't be guaranteed to be executed on the same machine. Your ES server
>>>>> may
>>>>> think its a man-in-the-middle attack!
>>>>>
>>>>> I think its possible to invoke a static method that give you a
>>>>> connection in
>>>>> a local 'pool', so nothing will sneak into your closure, but its too
>>>>> complex
>>>>> and there should be a better option.
>>>>>
>>>>> Never use kryo before, if its that good perhaps we should use it as the
>>>>> default serializer
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>>
>>
>>
>
>
> --
> Cell : 425-233-8271
>

Re: ElasticSearch enrich

Posted by Holden Karau <ho...@pigscanfly.ca>.
On Wed, Jun 25, 2014 at 4:16 PM, boci <bo...@gmail.com> wrote:

> Hi guys, thanks the direction now I have some problem/question:
> - in local (test) mode I want to use ElasticClient.local to create es
> connection, but in prodution I want to use ElasticClient.remote, to this I
> want to pass ElasticClient to mapPartitions, or what is the best
> practices?
>
In this case you probably want to make the ElasticClient inside of
mapPartitions (since it isn't serializable) and if you want to use a
different client in local mode just have a flag that control what type of
client you create.

> - my stream output is write into elasticsearch. How can I
> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>
- After store the enriched data into ES, I want to generate aggregated data
> (EsInputFormat) how can I test it in local?
>
I think the simplest thing to do would be use the same client in mode and
just start single node elastic search cluster.

>
> Thanks guys
>
> b0c1
>
>
>
>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.boci@gmail.com
>
>
> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <ho...@pigscanfly.ca>
> wrote:
>
>> So I'm giving a talk at the Spark summit on using Spark & ElasticSearch,
>> but for now if you want to see a simple demo which uses elasticsearch for
>> geo input you can take a look at my quick & dirty implementation with
>> TopTweetsInALocation (
>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>> ). This approach uses the ESInputFormat which avoids the difficulty of
>> having to manually create ElasticSearch clients.
>>
>> This approach might not work for your data, e.g. if you need to create a
>> query for each record in your RDD. If this is the case, you could instead
>> look at using mapPartitions and setting up your Elasticsearch connection
>> inside of that, so you could then re-use the client for all of the queries
>> on each partition. This approach will avoid having to serialize the
>> Elasticsearch connection because it will be local to your function.
>>
>> Hope this helps!
>>
>> Cheers,
>>
>> Holden :)
>>
>>
>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <ma...@gmail.com>
>> wrote:
>>
>>> Its not used as default serializer for some issues with compatibility &
>>> requirement to register the classes..
>>>
>>> Which part are you getting as nonserializable... you need to serialize
>>> that class if you are sending it to spark workers inside a map, reduce ,
>>> mappartition or any of the operations on RDD.
>>>
>>>
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>
>>>
>>>
>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:
>>>
>>>> I'm afraid persisting connection across two tasks is a dangerous act as
>>>> they
>>>> can't be guaranteed to be executed on the same machine. Your ES server
>>>> may
>>>> think its a man-in-the-middle attack!
>>>>
>>>> I think its possible to invoke a static method that give you a
>>>> connection in
>>>> a local 'pool', so nothing will sneak into your closure, but its too
>>>> complex
>>>> and there should be a better option.
>>>>
>>>> Never use kryo before, if its that good perhaps we should use it as the
>>>> default serializer
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>>
>
>


-- 
Cell : 425-233-8271

Re: ElasticSearch enrich

Posted by boci <bo...@gmail.com>.
Hi guys, thanks the direction now I have some problem/question:
- in local (test) mode I want to use ElasticClient.local to create es
connection, but in prodution I want to use ElasticClient.remote, to this I
want to pass ElasticClient to mapPartitions, or what is the best practices?
- my stream output is write into elasticsearch. How can I
test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
- After store the enriched data into ES, I want to generate aggregated data
(EsInputFormat) how can I test it in local?

Thanks guys

b0c1



----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com


On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <ho...@pigscanfly.ca> wrote:

> So I'm giving a talk at the Spark summit on using Spark & ElasticSearch,
> but for now if you want to see a simple demo which uses elasticsearch for
> geo input you can take a look at my quick & dirty implementation with
> TopTweetsInALocation (
> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
> ). This approach uses the ESInputFormat which avoids the difficulty of
> having to manually create ElasticSearch clients.
>
> This approach might not work for your data, e.g. if you need to create a
> query for each record in your RDD. If this is the case, you could instead
> look at using mapPartitions and setting up your Elasticsearch connection
> inside of that, so you could then re-use the client for all of the queries
> on each partition. This approach will avoid having to serialize the
> Elasticsearch connection because it will be local to your function.
>
> Hope this helps!
>
> Cheers,
>
> Holden :)
>
>
> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <ma...@gmail.com>
> wrote:
>
>> Its not used as default serializer for some issues with compatibility &
>> requirement to register the classes..
>>
>> Which part are you getting as nonserializable... you need to serialize
>> that class if you are sending it to spark workers inside a map, reduce ,
>> mappartition or any of the operations on RDD.
>>
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>
>>
>>
>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:
>>
>>> I'm afraid persisting connection across two tasks is a dangerous act as
>>> they
>>> can't be guaranteed to be executed on the same machine. Your ES server
>>> may
>>> think its a man-in-the-middle attack!
>>>
>>> I think its possible to invoke a static method that give you a
>>> connection in
>>> a local 'pool', so nothing will sneak into your closure, but its too
>>> complex
>>> and there should be a better option.
>>>
>>> Never use kryo before, if its that good perhaps we should use it as the
>>> default serializer
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>
>>
>
>
> --
> Cell : 425-233-8271
>

Re: ElasticSearch enrich

Posted by Holden Karau <ho...@pigscanfly.ca>.
So I'm giving a talk at the Spark summit on using Spark & ElasticSearch,
but for now if you want to see a simple demo which uses elasticsearch for
geo input you can take a look at my quick & dirty implementation with
TopTweetsInALocation (
https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
). This approach uses the ESInputFormat which avoids the difficulty of
having to manually create ElasticSearch clients.

This approach might not work for your data, e.g. if you need to create a
query for each record in your RDD. If this is the case, you could instead
look at using mapPartitions and setting up your Elasticsearch connection
inside of that, so you could then re-use the client for all of the queries
on each partition. This approach will avoid having to serialize the
Elasticsearch connection because it will be local to your function.

Hope this helps!

Cheers,

Holden :)


On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <ma...@gmail.com>
wrote:

> Its not used as default serializer for some issues with compatibility &
> requirement to register the classes..
>
> Which part are you getting as nonserializable... you need to serialize
> that class if you are sending it to spark workers inside a map, reduce ,
> mappartition or any of the operations on RDD.
>
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
>
> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:
>
>> I'm afraid persisting connection across two tasks is a dangerous act as
>> they
>> can't be guaranteed to be executed on the same machine. Your ES server may
>> think its a man-in-the-middle attack!
>>
>> I think its possible to invoke a static method that give you a connection
>> in
>> a local 'pool', so nothing will sneak into your closure, but its too
>> complex
>> and there should be a better option.
>>
>> Never use kryo before, if its that good perhaps we should use it as the
>> default serializer
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


-- 
Cell : 425-233-8271

Re: ElasticSearch enrich

Posted by Mayur Rustagi <ma...@gmail.com>.
Its not used as default serializer for some issues with compatibility &
requirement to register the classes..

Which part are you getting as nonserializable... you need to serialize that
class if you are sending it to spark workers inside a map, reduce ,
mappartition or any of the operations on RDD.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:

> I'm afraid persisting connection across two tasks is a dangerous act as
> they
> can't be guaranteed to be executed on the same machine. Your ES server may
> think its a man-in-the-middle attack!
>
> I think its possible to invoke a static method that give you a connection
> in
> a local 'pool', so nothing will sneak into your closure, but its too
> complex
> and there should be a better option.
>
> Never use kryo before, if its that good perhaps we should use it as the
> default serializer
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: ElasticSearch enrich

Posted by Peng Cheng <pc...@uow.edu.au>.
I'm afraid persisting connection across two tasks is a dangerous act as they
can't be guaranteed to be executed on the same machine. Your ES server may
think its a man-in-the-middle attack!

I think its possible to invoke a static method that give you a connection in
a local 'pool', so nothing will sneak into your closure, but its too complex
and there should be a better option.

Never use kryo before, if its that good perhaps we should use it as the
default serializer



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: ElasticSearch enrich

Posted by boci <bo...@gmail.com>.
Ok but in this case where can I store the ES connection? Or all document
create new ES connection inside the worker?

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com


On Wed, Jun 25, 2014 at 1:01 AM, Peng Cheng <pc...@uow.edu.au> wrote:

> make sure all queries are called through class methods and wrap your query
> info with a class having only simple properties (strings, collections etc).
> If you can't find such wrapper you can also use SerializableWritable
> wrapper
> out-of-the-box, but its not recommended. (developer-api and make fat
> closures that run slowly)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8214.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: ElasticSearch enrich

Posted by Peng Cheng <pc...@uow.edu.au>.
make sure all queries are called through class methods and wrap your query
info with a class having only simple properties (strings, collections etc).
If you can't find such wrapper you can also use SerializableWritable wrapper
out-of-the-box, but its not recommended. (developer-api and make fat
closures that run slowly)



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8214.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: ElasticSearch enrich

Posted by boci <bo...@gmail.com>.
I using elastic4s inside my ESWorker class. ESWorker now only contain two
field, host:String, port:Int. Now Inside the "findNearestCity" method I
create ElasticClient (elastic4s) connection. What's wrong with my class? I
need to serialize ElasticClient? mappartition is sounds good but I still
got NotSerializableException, or I must mar kit to transient? and where
come the host and port in this case?

my worker:

class ESWorker(val host: String, val port: Int) {
  def findNearestCity(geo: Position): Option[City] = {
     //Here I create ElasticClient connection and execute queries
  }
  def enrichGeoInternal(data:Data):Data = {
     data.location=findNearestCity(data.position)
  }
  def enrichGeo(ds: DStream[Data]): DStream[Data] = {
     ds.map(enrichGeoInternal)
  }
}



----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com


On Wed, Jun 25, 2014 at 1:03 AM, Mayur Rustagi <ma...@gmail.com>
wrote:

> Mostly ES client is not serializable for you. You can do 3 workarounds,
> 1. Switch to kryo serialization, register the client in kryo , might solve
> your serialization issue
> 2. Use mappartition for all your data & initialize your client in the
> mappartition code, this will create client for each partition, reduce some
> parallelism & add some overhead of creation of client but prevent
> serialization of esclient & transfer to workers
> 3. Use serializablewrapper to serialize your ESclient manually & send it
> across & deserialize it manually, this may or may not work depending on
> whether your class is safely serializable.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
>
> On Wed, Jun 25, 2014 at 4:12 AM, boci <bo...@gmail.com> wrote:
>
>> Hi guys,
>>
>> I have a small question. I want to create a "Worker" class which using
>> ElasticClient to make query to elasticsearch. (I want to enrich my data
>> with geo search result).
>>
>> How can I do that? I try to create a worker instance with ES host/port
>> parameter but spark throw an exceptino (my class not serializable).
>>
>> Any idea?
>>
>> Thanks
>> b0c1
>>
>>
>

Re: ElasticSearch enrich

Posted by Mayur Rustagi <ma...@gmail.com>.
Mostly ES client is not serializable for you. You can do 3 workarounds,
1. Switch to kryo serialization, register the client in kryo , might solve
your serialization issue
2. Use mappartition for all your data & initialize your client in the
mappartition code, this will create client for each partition, reduce some
parallelism & add some overhead of creation of client but prevent
serialization of esclient & transfer to workers
3. Use serializablewrapper to serialize your ESclient manually & send it
across & deserialize it manually, this may or may not work depending on
whether your class is safely serializable.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Wed, Jun 25, 2014 at 4:12 AM, boci <bo...@gmail.com> wrote:

> Hi guys,
>
> I have a small question. I want to create a "Worker" class which using
> ElasticClient to make query to elasticsearch. (I want to enrich my data
> with geo search result).
>
> How can I do that? I try to create a worker instance with ES host/port
> parameter but spark throw an exceptino (my class not serializable).
>
> Any idea?
>
> Thanks
> b0c1
>
>