Posted to user@spark.apache.org by d34th4ck3r <ga...@gmail.com> on 2015/03/12 08:48:12 UTC

Using Neo4j with Apache Spark

I'm trying to use Neo4j with Apache Spark Streaming, but I am running into a
serializability issue.

Basically, I want Apache Spark to parse and bundle my data in real time.
After the data has been bundled, it should be stored in the database, Neo4j.
However, I am getting this error:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
    at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
    at twoGrams.Main$4.call(Main.java:102)
    at twoGrams.Main$4.call(Main.java:1)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 17 more

Here is my code:

output is a stream of type JavaPairDStream<String, ArrayList<String>>

output.foreachRDD(
    new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {

        @Override
        public Void call(
                JavaPairRDD<String, ArrayList<String>> rdd,
                Time time) throws Exception {

            rdd.foreach(
                new VoidFunction<Tuple2<String, ArrayList<String>>>() {

                    @Override
                    public void call(
                            Tuple2<String, ArrayList<String>> pair)
                            throws Exception {
                        // graphDB is declared outside this anonymous class
                        try (Transaction tx = graphDB.beginTx()) {
                            if (Neo4jOperations.getHMacFromValue(graphDB, pair._1) != null)
                                System.out.println("Already in Database:" + pair._1);
                            else {
                                Neo4jOperations.createHMac(graphDB, pair._1);
                            }
                            tx.success();
                        }
                    }
                });
            return null;
        }
    });

Neo4jOperations Class:

public class Neo4jOperations {

    public static Node getHMacFromValue(GraphDatabaseService graphDB, String value) {
        try (ResourceIterator<Node> HMacs =
                graphDB.findNodesByLabelAndProperty(DynamicLabel.label("HMac"), "value", value).iterator()) {
            // Return null when no node matches, so the caller's null check works
            return HMacs.hasNext() ? HMacs.next() : null;
        }
    }

    public static void createHMac(GraphDatabaseService graphDB, String value) {
        Node HMac = graphDB.createNode(DynamicLabel.label("HMac"));
        HMac.setProperty("value", value);
        HMac.setProperty("time",
                new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
    }
}

I know that I have to serialize the class Neo4jOperations, but I'm unable to
figure out how. Or is there any other way to achieve this?

Also, how can I store the output of Spark Streaming in a database?
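
A minimal sketch of why the exception above appears, using only the JDK so it
runs without Spark or Neo4j. All names here are hypothetical: FakeGraphDb
stands in for the non-serializable EmbeddedGraphDatabase, and Task stands in
for a serializable function interface such as Spark's VoidFunction. An
anonymous class that uses a handle created outside it keeps that handle as a
hidden field, so serializing the task also tries to serialize the handle.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ClosureCaptureDemo {

    /** Hypothetical stand-in for a non-serializable database handle. */
    static class FakeGraphDb {
        void store(String value) {
            // a real handle would write to the database here
        }
    }

    /** Hypothetical serializable task interface, similar in spirit to VoidFunction. */
    interface Task extends Serializable {
        void call(String value);
    }

    /** Returns true if Java serialization accepts the object, false if it is rejected. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** The handle is created outside the task, so the task captures it. */
    static Task capturing() {
        final FakeGraphDb db = new FakeGraphDb();
        return new Task() {
            @Override
            public void call(String value) {
                db.store(value);
            }
        };
    }

    /** The handle is created inside the task, so nothing is captured. */
    static Task creatingInside() {
        return new Task() {
            @Override
            public void call(String value) {
                FakeGraphDb db = new FakeGraphDb();
                db.store(value);
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(capturing()));       // prints false
        System.out.println(isSerializable(creatingInside()));  // prints true
    }
}
```

The second task serializes because it holds no reference to the handle; it
only creates one once it is already running.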




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Neo4j-with-Apache-Spark-tp22012.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Using Neo4j with Apache Spark

Posted by Gautam Bajaj <ga...@gmail.com>.
I have been trying to do the same, but where exactly do you suggest
creating the static object? Creating it inside foreachRDD will
ultimately result in the same problem, and not creating it inside will result in
a serializability issue.
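
One possible answer, sketched under assumptions: keep the handle in a static
field that is filled lazily on first use. Static fields are not part of a
serialized closure, so each JVM that runs tasks opens its own handle once and
reuses it across partitions. GraphDbHolder and FakeGraphDb are hypothetical
names; FakeGraphDb stands in for GraphDatabaseService and only counts how
often it is opened.

```java
import java.util.concurrent.atomic.AtomicInteger;

class GraphDbHolder {

    /** Hypothetical stand-in for GraphDatabaseService; counts how often it is opened. */
    static class FakeGraphDb {
        static final AtomicInteger OPENED = new AtomicInteger();

        FakeGraphDb() {
            OPENED.incrementAndGet();
        }

        void store(String value) {
            // a real handle would write to the database here
        }
    }

    // Static state lives in the JVM, not in any serialized task.
    private static FakeGraphDb instance;

    /** Opens the handle on first use in this JVM and reuses it afterwards. */
    static synchronized FakeGraphDb get() {
        if (instance == null) {
            instance = new FakeGraphDb();
        }
        return instance;
    }

    public static void main(String[] args) {
        get().store("a");
        get().store("b");
        System.out.println(FakeGraphDb.OPENED.get()); // prints 1
    }
}
```

In a Spark job, the function passed to rdd.foreachPartition would call
GraphDbHolder.get() instead of referring to a graphDB variable created on the
driver.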

On Fri, Mar 13, 2015 at 11:47 AM, Tathagata Das <td...@databricks.com> wrote:

> Well, that's why I had also suggested using a pool of GraphDatabaseService
> objects :)
> That is also covered in the programming guide link I had given.
>
> TD
>
>
> On Thu, Mar 12, 2015 at 7:38 PM, Gautam Bajaj <ga...@gmail.com>
> wrote:
>
>> Thanks a ton! That worked.
>>
>> However, this may have a performance issue: for each partition, I'd need
>> to restart the server. That was the basic reason I was creating the graphDb
>> object outside this loop.
>>
>> On Fri, Mar 13, 2015 at 5:34 AM, Tathagata Das <td...@databricks.com>
>> wrote:
>>
>>> (Putting user@spark back in the to list)
>>>
>>> In the gist, you are creating the graphDB object way outside
>>> RDD.foreachPartition. As I said last time, create the graphDB object inside
>>> RDD.foreachPartition. You are creating it outside DStream.foreachRDD
>>> and then using it from inside rdd.foreachPartition. That brings
>>> the graphDB object into the task closure, and hence the system tries to
>>> serialize the graphDB object when it's serializing the closure. If you
>>> create the graphDB object inside RDD.foreachPartition, then the closure
>>> will not refer to any prior graphDB object and therefore will not need to
>>> serialize it.
>>>
>>> On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj <ga...@gmail.com>
>>> wrote:
>>>
>>>> Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab
>>>>
>>>> I'll add the flag and send you the stack trace; I have meetings now.
>>>>
>>>> On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das <td...@databricks.com>
>>>> wrote:
>>>>
>>>>> Could you show us that version of the code?
>>>>>
>>>>> It also helps to turn on the Java flag for extended debug info
>>>>> (-Dsun.io.serialization.extendedDebugInfo=true). That will show
>>>>> the lineage of objects leading to the non-serializable one.
>>>>> On Mar 12, 2015 1:32 AM, "Gautam Bajaj" <ga...@gmail.com> wrote:
>>>>>
>>>>>> I tried that too. It results in the same serializability issue.
>>>>>>
>>>>>> The GraphDatabaseService that I'm using comes from GraphDatabaseFactory():
>>>>>> http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html
>>>>>>
>>>>>> On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das <td...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> What is the GraphDatabaseService object that you are using? Instead of
>>>>>>> creating it on the driver (outside foreachRDD), can you create it
>>>>>>> inside RDD.foreach?
>>>>>>>
>>>>>>> In general, the right pattern for doing this is in the programming guide:
>>>>>>>
>>>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>>>>>>>
>>>>>>> So you should be doing (sorry for writing in Scala)
>>>>>>>
>>>>>>> dstream.foreachRDD((rdd: RDD[_], time: Time) => {
>>>>>>>     rdd.foreachPartition(iterator => {
>>>>>>>         // Create a GraphDatabaseService object, or fetch it from a
>>>>>>>         // pool of GraphDatabaseService objects
>>>>>>>         // Use it to send the whole partition to Neo4j
>>>>>>>         // Destroy the object or release it to the pool
>>>>>>>     })
>>>>>>> })
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj <ga...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Neo4j is running externally. It has nothing to do with Spark
>>>>>>>> processes.
>>>>>>>>
>>>>>>>> Basically, the problem is that I'm unable to figure out a way to store
>>>>>>>> the output of Spark in the database, as Spark Streaming requires the
>>>>>>>> Neo4j Core Java API to be serializable as well.
>>>>>>>>
>>>>>>>> The answer points to using the REST API, but its performance is
>>>>>>>> really poor compared to the Core Java API:
>>>>>>>> http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/
>>>>>>>>
>>>>>>>> On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das <tdas@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> Well, the answers you got there are correct as well.
>>>>>>>>> Unfortunately, I am not familiar enough with Neo4j to comment any
>>>>>>>>> further. Is the Neo4j graph database running externally (outside the Spark
>>>>>>>>> cluster), or within the driver process, or on all the executors? Can you
>>>>>>>>> clarify that?
>>>>>>>>>
>>>>>>>>> TD
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj <
>>>>>>>>> gautam1237@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Alright, I have also asked this question on Stack Overflow:
>>>>>>>>>> http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark
>>>>>>>>>>
>>>>>>>>>> The code there is pretty neat.
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das <
>>>>>>>>>> tdas@databricks.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I am not sure if you realized, but the code snippet is pretty
>>>>>>>>>>> mangled up in the email we received. It might be a good idea to put the
>>>>>>>>>>> code in pastebin or a gist; that is much easier for everyone to read.
>>>>>>>>>>>
>>>>>>>>>>>


-- 
Gautam

Re: Using Neo4j with Apache Spark

Posted by Tathagata Das <td...@databricks.com>.
Well, that's why I had also suggested using a pool of GraphDatabaseService
objects :)
That is also covered in the programming guide link I had given.

TD
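
A rough sketch of what such a pool could look like, assuming a small fixed
number of handles per JVM. GraphDbPool, FakeGraphDb, borrow, release and the
pool size of 2 are all hypothetical; FakeGraphDb stands in for
GraphDatabaseService. The pool lives in static state, so it is built when the
class is first used in each JVM and is never shipped inside a task closure.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class GraphDbPool {

    /** Hypothetical stand-in for GraphDatabaseService. */
    static class FakeGraphDb {
        void store(String value) {
            // a real handle would write to the database here
        }
    }

    private static final int SIZE = 2; // assumed pool size, for illustration only

    private static final BlockingQueue<FakeGraphDb> IDLE = new LinkedBlockingQueue<>();

    static {
        // Runs once per JVM, when the class is first used.
        for (int i = 0; i < SIZE; i++) {
            IDLE.add(new FakeGraphDb());
        }
    }

    /** Takes an idle handle, waiting until one is released if all are in use. */
    static FakeGraphDb borrow() {
        try {
            return IDLE.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a handle", e);
        }
    }

    /** Returns a handle to the pool so another task can use it. */
    static void release(FakeGraphDb db) {
        IDLE.add(db);
    }

    static int idleCount() {
        return IDLE.size();
    }

    public static void main(String[] args) {
        FakeGraphDb db = borrow();
        try {
            db.store("example");
        } finally {
            release(db); // always hand the handle back, even on failure
        }
        System.out.println(idleCount()); // prints 2
    }
}
```

Each partition would borrow a handle at the start of foreachPartition, write
its records, and release the handle in a finally block.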


On Thu, Mar 12, 2015 at 7:38 PM, Gautam Bajaj <ga...@gmail.com> wrote:

> Thanks a ton! That worked.
>
> However, this may have performance issue. As for each partition, I'd need
> to restart the server, that was the basic reason I was creating graphDb
> object outside this loop.
>
> On Fri, Mar 13, 2015 at 5:34 AM, Tathagata Das <td...@databricks.com>
> wrote:
>
>> (Putting user@spark back in the to list)
>>
>> In the gist, you are creating graphDB object way outside the
>> RDD.foreachPartition. I said last time, create the graphDB object inside
>> the RDD.foreachPartition. You are creating it outside DStream.foreachRDD,
>> and then using it from inside the rdd.foreachPartition. That is bringing
>> the graphDB object in the task closure, and hence the system is trying to
>> serialize the graphDB object when its serializing the closure. If you
>> create the graphDB object inside the RDD.foreachPartition, then the closure
>> will not refer to any prior graphDB object and therefore not serialize
>> anything.
>>
>> On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj <ga...@gmail.com>
>> wrote:
>>
>>> Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab
>>>
>>> I'll add the flag and send you stack trace, I have meetings now.
>>>
>>> On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das <td...@databricks.com>
>>> wrote:
>>>
>>>> Could you show us that version of the code?
>>>>
>>>> Also helps to turn on java flag of extended debug info. That will show
>>>> the lineage of objects leading to the nonserilaizable one.
>>>> On Mar 12, 2015 1:32 AM, "Gautam Bajaj" <ga...@gmail.com> wrote:
>>>>
>>>>> I tried that too. It result in same serializability issue.
>>>>>
>>>>> GraphDatabaseSerive that I'm using is : GraphDatabaseFactory() :
>>>>> http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html
>>>>>
>>>>> On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das <td...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> What is GraphDatabaseService object that you are using? Instead of
>>>>>> creating them on the driver (outside foreachRDD), can you create them
>>>>>> inside the RDD.foreach?
>>>>>>
>>>>>> In general, the right pattern for doing this in the programming guide
>>>>>>
>>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>>>>>>
>>>>>> So you should be doing (sorry for writing in scala)
>>>>>>
>>>>>> dstream.foreachRDD ((rdd: RDD, time: Time) => {
>>>>>>     rdd.foreachPartition(iterator =>
>>>>>>         // Create GraphDatabaseService object, or fetch it from a
>>>>>> pool of GraphDatabaseService objects
>>>>>>         // Use it to send the whole partition to Neo4j
>>>>>>         // Destroy the object or release it to the pool
>>>>>> })
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj <ga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Neo4j is running externally. It has nothing to do with Spark
>>>>>>> processes.
>>>>>>>
>>>>>>> Basically, the problem is, I'm unable to figure out a way to store
>>>>>>> output of Spark on the database. As Spark Streaming requires Neo4j Core
>>>>>>> Java API to be serializable as well.
>>>>>>>
>>>>>>> The answer points out to using REST API but their performance is
>>>>>>> really poor when compared to Core Java API :
>>>>>>> http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/
>>>>>>>
>>>>>>> On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das <td...@databricks.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Well the answers you got there are correct as well.
>>>>>>>> Unfortunately I am not familiar with Neo4j enough to comment any
>>>>>>>> more. Is the Neo4j graph database running externally (outside Spark
>>>>>>>> cluster), or within the driver process, or on all the executors? Can you
>>>>>>>> clarify that?
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj <
>>>>>>>> gautam1237@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Alright, I have also asked this question in StackOverflow:
>>>>>>>>> http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark
>>>>>>>>>
>>>>>>>>> The code there is pretty neat.
>>>>>>>>>
>>>>>>>>> On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das <
>>>>>>>>> tdas@databricks.com> wrote:
>>>>>>>>>
>>>>>>>>>> I am not sure if you realized but the code snipper it pretty
>>>>>>>>>> mangled up in the email we received. It might be a good idea to put the
>>>>>>>>>> code in pastebin or gist, much much easier for everyone to read.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r <
>>>>>>>>>> gautam1237@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm trying to use Neo4j with Apache Spark Streaming but I am
>>>>>>>>>>> finding
>>>>>>>>>>> serializability as an issue.
>>>>>>>>>>>
>>>>>>>>>>> Basically, I want Apache Spark to parse and bundle my data in
>>>>>>>>>>> real time.
>>>>>>>>>>> After, the data has been bundled it should be stored in the
>>>>>>>>>>> database, Neo4j.
>>>>>>>>>>> However, I am getting this error:
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.SparkException: Task not serializable
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>>>>>>>>>>>     at twoGrams.Main$4.call(Main.java:102)
>>>>>>>>>>>     at twoGrams.Main$4.call(Main.java:1)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>>>>>     at scala.util.Try$.apply(Try.scala:161)
>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>> Caused by: java.io.NotSerializableException:
>>>>>>>>>>> org.neo4j.kernel.EmbeddedGraphDatabase
>>>>>>>>>>>     at
>>>>>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>>     at
>>>>>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>>     at
>>>>>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>>     at
>>>>>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>>     at
>>>>>>>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>>>>>>>>>>     at
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>>>>>>>>>>     ... 17 more

Re: Using Neo4j with Apache Spark

Posted by Gautam Bajaj <ga...@gmail.com>.
Thanks a ton! That worked.

However, this may cause a performance issue: for each partition, I'd need
to restart the server. That was the basic reason I was creating the graphDb
object outside this loop.
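
One common way to avoid reopening the handle for every partition is to keep it in a static, lazily initialized holder, so that each JVM builds it once on first use. The following is only a minimal sketch in plain Java without Spark or Neo4j: ExpensiveDb, DbHolder and PerPartitionDemo are hypothetical names invented for illustration, and the loop in main merely imitates what rdd.foreachPartition would do on one executor.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an expensive, non-serializable database handle.
final class ExpensiveDb {
    static final AtomicInteger OPENED = new AtomicInteger();
    final AtomicInteger written = new AtomicInteger();

    ExpensiveDb() {
        OPENED.incrementAndGet(); // count how often a handle is opened
    }

    void write(String value) {
        written.incrementAndGet(); // pretend to store the value
    }
}

// Holds one handle per JVM. A static field is not part of any serialized
// closure, so each JVM that runs this code creates its own instance lazily.
final class DbHolder {
    private static ExpensiveDb instance;

    private DbHolder() {}

    static synchronized ExpensiveDb get() {
        if (instance == null) {
            instance = new ExpensiveDb();
        }
        return instance;
    }
}

final class PerPartitionDemo {

    // Imitates the body one would pass to foreachPartition:
    // fetch the shared handle, then write every record of the partition.
    static void processPartition(Iterator<String> records) {
        ExpensiveDb db = DbHolder.get();
        while (records.hasNext()) {
            db.write(records.next());
        }
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"),
                Arrays.asList("d", "e", "f"));
        for (List<String> partition : partitions) {
            processPartition(partition.iterator());
        }
        System.out.println("handles opened: " + ExpensiveDb.OPENED.get());
        System.out.println("records written: " + DbHolder.get().written.get());
    }
}
```

In this sketch the handle is opened once even though three partitions are processed. Whether a real GraphDatabaseService can be shared this way depends on the Neo4j deployment, which the sketch does not model.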


-- 
Gautam

Re: Using Neo4j with Apache Spark

Posted by Tathagata Das <td...@databricks.com>.
(Putting user@spark back in the to list)

In the gist, you are creating the graphDB object way outside
RDD.foreachPartition. As I said last time, create the graphDB object inside
RDD.foreachPartition. You are creating it outside DStream.foreachRDD and
then using it from inside rdd.foreachPartition. That brings the graphDB
object into the task closure, so the system tries to serialize the graphDB
object when it serializes the closure. If you create the graphDB object
inside RDD.foreachPartition, the closure will not refer to any prior graphDB
object and therefore will not need to serialize it.
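
The difference between capturing the object and creating it inside the function can be reproduced with plain Java serialization. This is a minimal sketch without Spark: FakeGraphDb and SerializableTask are hypothetical stand-ins for the Neo4j handle and for Spark's serializable function interfaces, and isSerializable only imitates the check that fails with "Task not serializable".

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a non-serializable handle such as an embedded graph database.
final class FakeGraphDb {
    void write(String value) {
        // pretend to store the value
    }
}

// Hypothetical stand-in for a serializable function interface.
interface SerializableTask extends Serializable {
    void call(String record) throws Exception;
}

final class ClosureCaptureDemo {

    // Returns true if Java serialization accepts the task,
    // false if it runs into a non-serializable object.
    static boolean isSerializable(SerializableTask task) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(task);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    // Handle created outside the task: the lambda captures it,
    // so it becomes part of the closure that has to be serialized.
    static SerializableTask capturingTask() {
        FakeGraphDb graphDb = new FakeGraphDb();
        return record -> graphDb.write(record);
    }

    // Handle created inside the task body: nothing from the
    // enclosing scope is captured, so only the function itself is serialized.
    static SerializableTask selfContainedTask() {
        return record -> {
            FakeGraphDb graphDb = new FakeGraphDb();
            graphDb.write(record);
        };
    }

    public static void main(String[] args) throws Exception {
        System.out.println("capturing task serializable: " + isSerializable(capturingTask()));
        System.out.println("self-contained task serializable: " + isSerializable(selfContainedTask()));
    }
}
```

The capturing variant is rejected because the captured FakeGraphDb is not Serializable, while the self-contained variant serializes without touching any handle. Anonymous inner classes behave in a similar way for any outer variables or fields they reference.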

On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj <ga...@gmail.com> wrote:

> Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab
>
> I'll add the flag and send you stack trace, I have meetings now.
>
> On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das <td...@databricks.com>
> wrote:
>
>> Could you show us that version of the code?
>>
>> Also, it helps to turn on the Java flag for extended debug info
>> (-Dsun.io.serialization.extendedDebugInfo=true). That will show
>> the lineage of objects leading to the non-serializable one.

Re: Using Neo4j with Apache Spark

Posted by Gautam Bajaj <ga...@gmail.com>.
I tried that too. It results in the same serializability issue.

The GraphDatabaseService that I'm using is created with GraphDatabaseFactory():
http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html

On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das <td...@databricks.com> wrote:

> What is the GraphDatabaseService object that you are using? Instead of
> creating it on the driver (outside foreachRDD), can you create it
> inside the RDD.foreach?
>
> In general, the right pattern for doing this is described in the programming guide:
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
> So you should be doing (sorry for writing in Scala)
>
> dstream.foreachRDD((rdd: RDD[_], time: Time) => {
>     rdd.foreachPartition(iterator => {
>         // Create a GraphDatabaseService object, or fetch it from a pool of
>         // GraphDatabaseService objects
>         // Use it to send the whole partition to Neo4j
>         // Destroy the object or release it to the pool
>     })
> })
>
>
> On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj <ga...@gmail.com>
> wrote:
>
>> Neo4j is running externally. It has nothing to do with Spark processes.
>>
>> Basically, the problem is that I'm unable to figure out a way to store the
>> output of Spark in the database, because Spark Streaming requires the Neo4j
>> Core Java API objects to be serializable as well.
>>
>> The answer there suggests using the REST API, but its performance is really
>> poor compared to the Core Java API:
>> http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/
>>
>> On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das <td...@databricks.com>
>> wrote:
>>
>>> Well the answers you got there are correct as well.
>>> Unfortunately I am not familiar with Neo4j enough to comment any more.
>>> Is the Neo4j graph database running externally (outside Spark cluster), or
>>> within the driver process, or on all the executors? Can you clarify that?
>>>
>>> TD
>>>
>>>
>>> On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj <ga...@gmail.com>
>>> wrote:
>>>
>>>> Alright, I have also asked this question in StackOverflow:
>>>> http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark
>>>>
>>>> The code there is pretty neat.
>>>>
>>>> On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das <td...@databricks.com>
>>>> wrote:
>>>>
>>>>> I am not sure if you realized, but the code snippet is pretty mangled
>>>>> in the email we received. It might be a good idea to put the code in
>>>>> pastebin or a gist, which is much easier for everyone to read.
>>>>>
>>>>>
>>>>> On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I'm trying to use Neo4j with Apache Spark Streaming, but I am running
>>>>>> into a serializability issue.
>>>>>>
>>>>>> Basically, I want Apache Spark to parse and bundle my data in real
>>>>>> time. After the data has been bundled, it should be stored in the
>>>>>> database, Neo4j. However, I am getting this error:
>>>>>>
>>>>>> org.apache.spark.SparkException: Task not serializable
>>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>>>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>>>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>>>>>>     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>>>>>>     at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>>>>>>     at twoGrams.Main$4.call(Main.java:102)
>>>>>>     at twoGrams.Main$4.call(Main.java:1)
>>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>     at scala.util.Try$.apply(Try.scala:161)
>>>>>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>>>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>>>>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>>>>>     ... 17 more
>>>>>> Here is my code:
>>>>>>
>>>>>> output a stream of type: JavaPairDStream<String, ArrayList&lt;String>>
>>>>>>
>>>>>> output.foreachRDD(
>>>>>>                 new
>>>>>> Function2<JavaPairRDD&lt;String,ArrayList&lt;String>>,Time,Void>(){
>>>>>>
>>>>>>                     @Override
>>>>>>                     public Void call(
>>>>>>                             JavaPairRDD<String, ArrayList&lt;String>>
>>>>>> arg0,
>>>>>>                             Time arg1) throws Exception {
>>>>>>                         // TODO Auto-generated method stub
>>>>>>
>>>>>>                         arg0.foreach(
>>>>>>                                 new
>>>>>> VoidFunction<Tuple2&lt;String,ArrayList&lt;String>>>(){
>>>>>>
>>>>>>                                     @Override
>>>>>>                                     public void call(
>>>>>>                                             Tuple2<String,
>>>>>> ArrayList&lt;String>> arg0)
>>>>>>                                             throws Exception {
>>>>>>                                         // TODO Auto-generated method
>>>>>> stub
>>>>>>                                         try( Transaction tx =
>>>>>> graphDB.beginTx()){
>>>>>>
>>>>>> if(Neo4jOperations.getHMacFromValue(graphDB, arg0._1)!=null)
>>>>>>
>>>>>> System.out.println("Alread
>>>>>> in Database:" + arg0._1);
>>>>>>                                             else{
>>>>>>
>>>>>> Neo4jOperations.createHMac(graphDB, arg0._1);
>>>>>>                                             }
>>>>>>                                             tx.success();
>>>>>>                                         }
>>>>>>                                     }
>>>>>>
>>>>>>                         });
>>>>>>                         return null;
>>>>>>                     }
>>>>>>
>>>>>>
>>>>>>
>>>>>>                 });
>>>>>> Neo4jOperations Class:
>>>>>>
>>>>>> public class Neo4jOperations{
>>>>>>
>>>>>> public static Node getHMacFromValue(GraphDatabaseService
>>>>>> graphDB,String
>>>>>> value){
>>>>>>         try(ResourceIterator<Node>
>>>>>> HMacs=graphDB.findNodesByLabelAndProperty(DynamicLabel.label("HMac"),
>>>>>> "value", value).iterator()){
>>>>>>             return HMacs.next();
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     public static void createHMac(GraphDatabaseService graphDB,String
>>>>>> value){
>>>>>>         Node HMac=graphDB.createNode(DynamicLabel.label("HMac"));
>>>>>>         HMac.setProperty("value", value);
>>>>>>         HMac.setProperty("time", new
>>>>>>
>>>>>> SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
>>>>>>     }
>>>>>> }
>>>>>> I know that I have to Serialize the class Neo4jOperations, but I'm
>>>>>> able to
>>>>>> figure out how. Or is there any other way to achieve this?
>>>>>>
>>>>>> Also, how can I store output of Spark Streaming in a database ?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Neo4j-with-Apache-Spark-tp22012.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Gautam
>>>>
>>>
>>>
>>
>>
>> --
>> Gautam
>>
>
>


-- 
Gautam

Re: Using Neo4j with Apache Spark

Posted by Tathagata Das <td...@databricks.com>.
Which GraphDatabaseService object are you using? Instead of creating it on
the driver (outside foreachRDD), can you create it inside RDD.foreach?
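To illustrate why the location matters, here is a minimal sketch in plain Java, with no Spark or Neo4j dependency. FakeGraphDb, CapturingTask, SelfContainedTask and SerializationCheck are hypothetical names invented for this example; FakeGraphDb stands in for a non-serializable handle such as EmbeddedGraphDatabase. The canSerialize helper only approximates what Spark does when it checks a closure, by running the object through Java serialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a non-serializable database handle.
class FakeGraphDb {
    void store(String value) {
        // A real handle would write to the database here.
    }
}

// Carries the handle as a field, much like an anonymous class that
// references a graphDB variable created on the driver.
class CapturingTask implements Serializable {
    private final FakeGraphDb db;

    CapturingTask(FakeGraphDb db) {
        this.db = db;
    }

    void call(String value) {
        db.store(value);
    }
}

// Carries no handle; it creates one in the place where it runs.
class SelfContainedTask implements Serializable {
    void call(String value) {
        FakeGraphDb db = new FakeGraphDb();
        db.store(value);
    }
}

class SerializationCheck {
    // Returns true if Java serialization accepts the object.
    static boolean canSerialize(Object task) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(task);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints false: the captured FakeGraphDb field is not serializable
        System.out.println(canSerialize(new CapturingTask(new FakeGraphDb())));
        // prints true: nothing non-serializable is carried along
        System.out.println(canSerialize(new SelfContainedTask()));
    }
}
```

The point of the sketch is that the database API itself does not need to be serializable; only the objects that the function carries with it do.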

In general, the right pattern for doing this is described in the programming guide:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

So you should be doing something like this (sorry for writing in Scala):

dstream.foreachRDD((rdd: RDD[_], time: Time) => {
    rdd.foreachPartition(iterator => {
        // Create a GraphDatabaseService object, or fetch one from a pool
        // of GraphDatabaseService objects
        // Use it to send the whole partition to Neo4j
        // Destroy the object or release it back to the pool
    })
})
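The per-partition pattern can also be sketched in plain Java, without Spark, to show its shape. This is only an illustration under stated assumptions: RecordingDb and PartitionWriter are hypothetical names, RecordingDb stands in for a real Neo4j handle and merely records writes in memory, and the Iterator plays the role of the iterator that foreachPartition would pass in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a database handle; it records writes in memory.
class RecordingDb implements AutoCloseable {
    final List<String> written = new ArrayList<>();
    boolean closed = false;

    void store(String value) {
        written.add(value);
    }

    @Override
    public void close() {
        closed = true;
    }
}

class PartitionWriter {
    // Counts opened handles, to show that it is one per partition.
    static int handlesOpened = 0;

    // What the body of foreachPartition would do: open, write all, release.
    static RecordingDb writePartition(Iterator<String> partition) {
        RecordingDb db = new RecordingDb();
        handlesOpened++;
        try {
            while (partition.hasNext()) {
                db.store(partition.next());
            }
        } finally {
            db.close(); // a pooled setup would return the handle to the pool instead
        }
        return db; // returned only so the caller can inspect what was written
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("a", "b", "c");
        RecordingDb db = writePartition(partition.iterator());
        System.out.println(db.written.size() + " records, " + handlesOpened + " handle");
    }
}
```

Opening the handle once per partition rather than once per record keeps the setup cost proportional to the number of partitions, which is the reason the guide recommends foreachPartition over a plain foreach for database writes.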



Re: Using Neo4j with Apache Spark

Posted by Gautam Bajaj <ga...@gmail.com>.
Neo4j is running externally. It has nothing to do with the Spark processes.

Basically, the problem is that I'm unable to figure out a way to store the
output of Spark in the database, as Spark Streaming seems to require the
Neo4j Core Java API objects to be serializable as well.

The answer suggests using the REST API, but its performance is really poor
compared to the Core Java API:
http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/



-- 
Gautam

Re: Using Neo4j with Apache Spark

Posted by Tathagata Das <td...@databricks.com>.
Well, the answers you got there are correct as well.
Unfortunately, I am not familiar enough with Neo4j to comment further. Is
the Neo4j graph database running externally (outside the Spark cluster),
within the driver process, or on all the executors? Can you clarify that?

TD



Re: Using Neo4j with Apache Spark

Posted by Gautam Bajaj <ga...@gmail.com>.
Alright, I have also asked this question on Stack Overflow:
http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

The code is formatted properly there.

On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das <td...@databricks.com> wrote:

> I am not sure if you realized, but the code snippet is pretty mangled up in
> the email we received. It might be a good idea to put the code in pastebin
> or a gist; that is much easier for everyone to read.


-- 
Gautam

Re: Using Neo4j with Apache Spark

Posted by Tathagata Das <td...@databricks.com>.
I am not sure if you realized, but the code snippet is pretty mangled in
the email we received. It might be a good idea to put the code in Pastebin
or a gist; that is much easier for everyone to read.



Re: Using Neo4j with Apache Spark

Posted by Marcin Cylke <ma...@ext.allegro.pl>.
On Thu, 12 Mar 2015 00:48:12 -0700
d34th4ck3r <ga...@gmail.com> wrote:

> I'm trying to use Neo4j with Apache Spark Streaming but I am finding
> serializability as an issue.
> 
> Basically, I want Apache Spark to parse and bundle my data in real
> time. After, the data has been bundled it should be stored in the
> database, Neo4j. However, I am getting this error:

Hi

It seems some things in your task aren't serializable. A quick look at
the code suggests graphDB as a potential problem. 
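
To see why a captured graphDB triggers the NotSerializableException in the
stack trace, here is a minimal plain-Java sketch with no Spark or Neo4j
involved. FakeDb and Task are hypothetical stand-ins (for the embedded
database handle and for a Spark function interface), so treat it as an
illustration of the mechanism rather than of your exact code:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {
    // Hypothetical stand-in for a non-serializable handle such as an embedded database.
    static class FakeDb {
        void write(String value) { System.out.println("stored " + value); }
    }

    // Hypothetical stand-in for a Spark function interface; those are Serializable too.
    interface Task extends Serializable {
        void call(String value);
    }

    // Returns true if plain Java serialization accepts the object.
    static boolean isSerializable(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    // The anonymous class keeps a hidden field for the captured local 'db',
    // so serializing the task also tries to serialize the FakeDb.
    static Task capturing() {
        final FakeDb db = new FakeDb();
        return new Task() {
            @Override public void call(String value) { db.write(value); }
        };
    }

    // Nothing is captured here; the handle is created inside call().
    static Task selfContained() {
        return new Task() {
            @Override public void call(String value) { new FakeDb().write(value); }
        };
    }

    public static void main(String[] args) throws IOException {
        System.out.println(isSerializable(capturing()));     // false
        System.out.println(isSerializable(selfContained())); // true
    }
}
```

The second variant serializes because the task no longer carries the handle
with it. Opening a real database inside every call would be far too
expensive, though, so in practice you want to create it once per JVM.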

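If you want to declare graphDB in one place (the driver) and use it later in
a task, you can do something like this:

- create a container class that you will broadcast

class LazyGraphDB extends Serializable {
  // GraphDatabase() stands in for however you open your Neo4j instance.
  // @transient keeps the handle out of the serialized object; lazy
  // creates it on first access in each JVM that uses it.
  @transient lazy val graphDB = new GraphDatabase()
}

- then in the driver code: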

val graphDbBc = sc.broadcast(new LazyGraphDB)

- and in the task you'd like to use it, just write:

graphDbBc.value.graphDB...

Just remember to keep both the "transient" and "lazy" modifiers.
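
Since your code is in Java, here is a rough Java equivalent of the same
holder idea, again as a self-contained sketch. FakeDb and LazyDb are
hypothetical names, and the round trip through a byte array only imitates
what happens when a task or broadcast value is shipped to an executor:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyHolderDemo {
    // Hypothetical stand-in for a non-serializable database handle.
    static class FakeDb {
        void write(String value) { System.out.println("stored " + value); }
    }

    // Serializable holder: the handle itself is transient, so it is never
    // written out, and get() re-creates it on first use after deserialization.
    static class LazyDb implements Serializable {
        private transient FakeDb db;

        synchronized FakeDb get() {
            if (db == null) {
                db = new FakeDb();
            }
            return db;
        }
    }

    // Serializes and deserializes the holder, imitating shipping it to another JVM.
    static LazyDb roundTrip(LazyDb holder) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(holder);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyDb) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LazyDb onDriver = new LazyDb();
        FakeDb driverHandle = onDriver.get();      // initialized on the "driver"
        LazyDb onExecutor = roundTrip(onDriver);   // serializes fine, handle is skipped
        System.out.println(onExecutor.get() != driverHandle); // true: a fresh handle
    }
}
```

The point is that only the small holder travels over the wire, while each
JVM builds its own handle the first time get() is called.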

Regards 
Marcin

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org