Posted to user@spark.apache.org by Ajit Bhingarkar <aj...@capiot.com> on 2015/07/05 17:42:13 UTC

Spark custom streaming receiver not storing data reliably?

Hi,

I am trying to integrate the Drools rules API with Spark so that the
solution can handle a few CEP-centric use cases.

When I read data from a local file (a simple FileReader -> readLine()), all
my rules fire reliably and I get the expected results every time. I have
tested with file sizes from 5K to 5M records; the results are as expected
every time.

However, when I receive the same data through a stream (I created a simple
ServerSocket that reads the file and writes it to the socket line by line)
using a custom socket receiver, the outcome changes. I can see that the data
arrives at the custom receiver's end, so perhaps the store() API has an issue
and the data is not reliably persisted (I am using
StorageLevel.MEMORY_AND_DISK_SER_2() as recommended).

The result is that my rules don't fire reliably, and I get a different
result on every run. It could also be internal data loss within the Spark
engine.

I am using a single Windows-based server and the latest Spark release, 1.4.0.

I have attached code for custom receiver, and my socket server which
streams file data as text.

Can someone please shed more light on this issue? I have read in the
documentation that a reliable receiver needs to implement
*store(multi-records)*, but I couldn't find an example.

Many thanks in advance for any input or suggestions to try out.

Regards,
Ajit
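
For the *store(multi-records)* question above, a minimal sketch of the
batching side may help. It assumes the receiver thread reads lines from the
socket, buffers them, and hands each full buffer to a single multi-record
store call. The BatchSink interface is a hypothetical stand-in so that the
sketch runs without Spark on the classpath. In a real receiver the sink would
be the inherited Receiver.store(java.util.Iterator<T>) overload, which the
custom receiver guide describes as returning only after the records have been
stored. The batch size and class names are illustrative.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchingReceiverSketch {

    /** Hypothetical stand-in for Receiver.store(java.util.Iterator<T>). */
    interface BatchSink {
        void store(Iterator<String> records);
    }

    /**
     * Reads lines until end of stream and hands them to the sink in batches
     * of at most batchSize. Returns the total number of lines forwarded.
     */
    static int forward(BufferedReader in, int batchSize, BatchSink sink)
            throws IOException {
        List<String> batch = new ArrayList<>();
        int total = 0;
        String line;
        while ((line = in.readLine()) != null) {
            batch.add(line);
            if (batch.size() == batchSize) {
                total += flush(batch, sink);
            }
        }
        total += flush(batch, sink); // do not drop the final partial batch
        return total;
    }

    private static int flush(List<String> batch, BatchSink sink) {
        if (batch.isEmpty()) {
            return 0;
        }
        int n = batch.size();
        // Pass a copy so the sink never sees later changes to the buffer.
        sink.store(new ArrayList<>(batch).iterator());
        batch.clear();
        return n;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader in =
                new BufferedReader(new StringReader("a\nb\nc\nd\ne\n"));
        List<Integer> batchSizes = new ArrayList<>();
        int total = forward(in, 2, records -> {
            int n = 0;
            while (records.hasNext()) {
                records.next();
                n++;
            }
            batchSizes.add(n);
        });
        // Prints: 5 records in batches [2, 2, 1]
        System.out.println(total + " records in batches " + batchSizes);
    }
}
```

Comparing the forwarded count with the number of lines the socket server
sent is a cheap way to tell loss at the receiver apart from facts that
arrived but did not trigger a rule.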

Re: Spark custom streaming receiver not storing data reliably?

Posted by Ajit Bhingarkar <aj...@capiot.com>.
The inconsistency is resolved; I can see rules firing consistently and
reliably across a file-based source, a stream (of file data), and a JMS
stream. I am running more tests with up to 50M facts/events, but it looks
like it is working now.

Regards,
Ajit

Re: Spark custom streaming receiver not storing data reliably?

Posted by Ajit Bhingarkar <aj...@capiot.com>.
Jorn,

Thanks for your response.

I am pasting below a snippet of code that shows the Drools integration when
facts/events are picked up by reading a file (FileReader -> readLine()). It
works as expected, and I have tested it with a wide range of record data in
the file.

The same code doesn't work when I do the same thing on streaming data
generated from the same file. I have tried several batch durations, from
1 to 50 seconds. Every execution shows that rules did not fire on some
valid facts/events.

I also suspected a multi-threading issue, but I have verified that this is
not the case.

I hope this provides all the relevant information.

Regards,
Ajit


/*
 * Copyright (c) 2015. Capiot Software India Pvt Ltd.
 * Author: ajit@capiot.com
 */

package com.capiot.analytics.spark.file;

import com.capiot.analytics.spark.Person;
import com.capiot.analytics.spark.util.KnowledgeBaseHelperUtil;
import com.capiot.analytics.spark.util.TrackingAgendaEventListener;
import org.apache.spark.api.java.function.VoidFunction;
import org.drools.runtime.StatefulKnowledgeSession;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RuleExceutionFunction implements VoidFunction<Person>
{
    // Static state: shared by all instances in one JVM, never serialized.
    static StatefulKnowledgeSession knowledgeSession;
    static List<Person> customersWithOffers =
            Collections.synchronizedList(new ArrayList<Person>());
    //static Map<Integer, String> map = Collections.synchronizedMap(new HashMap());
    static TrackingAgendaEventListener agendaEventListener =
            new TrackingAgendaEventListener();

    // Number of facts processed so far.
    static AtomicInteger count = new AtomicInteger(0);

    //private static final File f = new File("C:\\Users\\bajit\\Documents\\customerOffers_5k_file.csv");
    private static PrintWriter pw = null;   // customers that received an offer
    private static PrintWriter pwp = null;  // every processed customer

    private static final long serialVersionUID = 2370;

    public RuleExceutionFunction() throws Exception
    {
        // Set up the session and the output files only on first construction.
        if (knowledgeSession == null)
        {
            knowledgeSession =
                    KnowledgeBaseHelperUtil.getStatefulKnowledgeSession("offers.drl");
            knowledgeSession.addEventListener(agendaEventListener);

            // The second argument 'true' turns on auto-flush for println().
            pw = new PrintWriter(new BufferedWriter(new FileWriter(
                    "C:\\Users\\bajit\\Documents\\customerOffers_file_5k.csv")), true);

            pwp = new PrintWriter(new BufferedWriter(new FileWriter(
                    "C:\\Users\\bajit\\Documents\\processed_customers_file_5k.csv")), true);
        }
    }

    @Override
    public void call(Person person) throws Exception
    {
        //List<Person> facts = rdd.collect();
        //Apply rules on facts here
        //synchronized (this)
        {
            knowledgeSession.insert(person);
            knowledgeSession.fireAllRules();
        }

        //System.out.println("++++++ '"+ agendaEventListener.activationsToString());

        if (person.hasOffer())
        {
            customersWithOffers.add(person);
            //Files.append(person.toString() + System.getProperty("line.separator"), f, Charset.defaultCharset());
            pw.println(person.toString());
        }

        pwp.println(person.toString());

        count.getAndIncrement();
    }

    public StatefulKnowledgeSession getSession()
    {
        return knowledgeSession;
    }

    public List<Person> getCustomersWithOffers()
    {
        return customersWithOffers;
    }
}
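
One thing worth checking in the class above is where the session gets
created. Static fields are not part of an object's serialized form, and Java
deserialization does not run the constructor of a Serializable class. If the
function is shipped to another JVM by Java serialization, the constructor's
setup would therefore not happen there. The sketch below demonstrates both
points with the standard library only and creates the session lazily on first
use instead. FakeSession and RuleFunction are hypothetical stand-ins, not
Drools or Spark classes, and whether this applies depends on the deployment
(in a single local JVM the static set by the constructor is still visible).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

public class LazySessionSketch {

    /** Hypothetical stand-in for StatefulKnowledgeSession. */
    static final class FakeSession {
        private int facts;

        synchronized int insert(Object fact) {
            return ++facts;
        }
    }

    /** Hypothetical function object, shaped like the class above. */
    static final class RuleFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        static final AtomicInteger constructorCalls = new AtomicInteger();
        private static FakeSession session; // static, so never serialized

        RuleFunction() {
            constructorCalls.incrementAndGet();
        }

        // Lazy, thread-safe creation in whichever JVM first runs call().
        private static synchronized FakeSession session() {
            if (session == null) {
                session = new FakeSession();
            }
            return session;
        }

        int call(Object fact) {
            return session().insert(fact);
        }
    }

    /** Serializes and deserializes a value, as shipping it elsewhere would. */
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        RuleFunction original = new RuleFunction();
        RuleFunction copy = roundTrip(original);
        // The deserialized copy was created without a constructor call.
        System.out.println("constructor calls: "
                + RuleFunction.constructorCalls.get());
        System.out.println("facts after insert: " + copy.call("fact"));
    }
}
```

Run on its own, this prints a constructor count of 1 even though two
instances exist, and the copy can still insert a fact because the session is
created on demand.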


Re: Spark custom streaming receiver not storing data reliably?

Posted by Jörn Franke <jo...@gmail.com>.
Can you provide the result set you are using and specify how you integrated
the Drools engine?
Drools is basically built on one large shared memory. Hence, if you have
several tasks in Spark, they end up using different shared memory areas.
A full integration of Drools requires some sophisticated design and
probably a rewrite of the rule evaluation algorithm, so you would probably
have to rewrite that engine from scratch.
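
To illustrate the shared-memory point, here is a toy sketch. It is not
Drools: ToySession is a hypothetical stand-in whose only "rule" fires when it
sees a customer id that is already in its memory. Feeding the same facts to
one shared session and to two independent sessions, as separate tasks would
have, gives different firing counts.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SplitMemorySketch {

    /** Hypothetical toy session: fires once for each repeated customer id. */
    static final class ToySession {
        private final Set<String> seen = new HashSet<>();
        private int fired;

        void insert(String customerId) {
            // Set.add returns false when the id is already in this memory.
            if (!seen.add(customerId)) {
                fired++;
            }
        }

        int fired() {
            return fired;
        }
    }

    /** All facts go into one shared memory. */
    static int fireShared(List<String> facts) {
        ToySession session = new ToySession();
        for (String fact : facts) {
            session.insert(fact);
        }
        return session.fired();
    }

    /** Facts are dealt round-robin to independent memories, one per task. */
    static int fireSplit(List<String> facts, int tasks) {
        ToySession[] sessions = new ToySession[tasks];
        for (int i = 0; i < tasks; i++) {
            sessions[i] = new ToySession();
        }
        for (int i = 0; i < facts.size(); i++) {
            sessions[i % tasks].insert(facts.get(i));
        }
        int total = 0;
        for (ToySession session : sessions) {
            total += session.fired();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> facts = Arrays.asList("c1", "c1", "c2", "c2", "c3");
        System.out.println("shared: " + fireShared(facts)); // shared: 2
        System.out.println("split:  " + fireSplit(facts, 2)); // split:  0
    }
}
```

With the round-robin split, each pair of matching facts lands in different
memories, so a rule that needs both never fires. Rules that look at a single
fact in isolation would not be affected in this way.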
