Posted to user@spark.apache.org by Ajit Bhingarkar <aj...@capiot.com> on 2015/07/05 17:42:13 UTC
Spark custom streaming receiver not storing data reliably?
Hi,
I am trying to integrate the Drools rules API with Spark so that the
solution could address a few CEP-centric use cases.
When I read data from a local file (a simple FileReader -> readLine()), I
see that all my rules fire reliably and I get the expected results every
time. I have tested with file record counts from 5K to 5M; the results are
as expected, every time.
However, when I try to receive the same data through a stream using a
custom socket receiver (I created a simple ServerSocket and am reading the
file and writing it to the socket line by line), the rules don't fire
reliably and every run gives a different result, even though I can see the
data arriving at the custom receiver's end. Perhaps the store() API has an
issue and the data is not reliably persisted (I am using
StorageLevel.MEMORY_AND_DISK_SER_2() as recommended), or it could be
internal data loss within the Spark engine.
I am using a single Windows-based server and the latest Spark release, 1.4.0.
I have attached the code for my custom receiver and for the socket server
that streams the file data as text.
Can someone please shed more light on this issue? I have read in the
documentation that a reliable receiver needs to implement
*store(multi-records)*, but I couldn't find any example.
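Something along these lines is what I have in mind, based on the
documentation's socket receiver example; this is only a sketch on my part,
and the host, port, and batch size of 100 are placeholders:

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class MultiRecordSocketReceiver extends Receiver<String>
{
    private final String host;
    private final int port;

    public MultiRecordSocketReceiver(String host, int port)
    {
        super(StorageLevel.MEMORY_AND_DISK_SER_2());
        this.host = host;
        this.port = port;
    }

    @Override
    public void onStart()
    {
        // Read from the socket on a separate thread so onStart() returns quickly.
        new Thread(new Runnable()
        {
            public void run()
            {
                receive();
            }
        }, "Multi-Record Socket Receiver").start();
    }

    @Override
    public void onStop()
    {
        // Nothing to do: receive() checks isStopped() and exits on its own.
    }

    private void receive()
    {
        try (Socket socket = new Socket(host, port);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), "UTF-8")))
        {
            List<String> batch = new ArrayList<String>();
            String line;
            while (!isStopped() && (line = reader.readLine()) != null)
            {
                batch.add(line);
                if (batch.size() >= 100)
                {
                    // The multi-record store() blocks until Spark has stored
                    // the whole batch, so no line is dropped silently.
                    store(batch.iterator());
                    batch.clear();
                }
            }
            if (!batch.isEmpty())
            {
                store(batch.iterator());
            }
            restart("Socket stream ended, trying to reconnect");
        }
        catch (Exception e)
        {
            restart("Error receiving data", e);
        }
    }
}

My understanding is that the blocking, multi-record store() is what makes a
receiver reliable, as opposed to the single-record store(), which just
buffers data internally.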
Many thanks in advance for any inputs or suggestions.
Regards,
Ajit
Re: Spark custom streaming receiver not storing data reliably?
Posted by Ajit Bhingarkar <aj...@capiot.com>.
The inconsistency is resolved; I can see the rules getting fired
consistently and reliably across a file-based source, a stream (of the same
file data), and a JMS stream. I am still running larger tests, up to 50M
facts/events, but it looks like it is working now.
Regards,
Ajit
Re: Spark custom streaming receiver not storing data reliably?
Posted by Ajit Bhingarkar <aj...@capiot.com>.
Jorn,
Thanks for your response.
I am pasting below a snippet of the code which shows the Drools
integration. When facts/events are picked up after reading through a file
(FileReader -> readLine()), it works as expected, and I have tested it for
a wide range of record volumes in a file.
The same code doesn't work when I run it on streaming data generated from
the same file. I have tried several batch durations, from 1 to 50 seconds.
Every execution shows that the rules did not fire on some valid
facts/events.
I also thought it might be a multi-threading issue, but I have verified
that it is not.
Hope this provides all the relevant information.
Regards,
Ajit
/*
 * Copyright (c) 2015. Capiot Software India Pvt Ltd.
 * Author: ajit@capiot.com
 */
package com.capiot.analytics.spark.file;

import com.capiot.analytics.spark.Person;
import com.capiot.analytics.spark.util.KnowledgeBaseHelperUtil;
import com.capiot.analytics.spark.util.TrackingAgendaEventListener;
import org.apache.spark.api.java.function.VoidFunction;
import org.drools.runtime.StatefulKnowledgeSession;

import java.io.BufferedWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RuleExceutionFunction implements VoidFunction<Person>
{
    static StatefulKnowledgeSession knowledgeSession;
    static List<Person> customersWithOffers =
            Collections.synchronizedList(new ArrayList<Person>());
    //static Map<Integer, String> map = Collections.synchronizedMap(new HashMap());
    static TrackingAgendaEventListener agendaEventListener =
            new TrackingAgendaEventListener();

    static AtomicInteger count = new AtomicInteger(0);

    //private static final File f =
    //        new File("C:\\Users\\bajit\\Documents\\customerOffers_5k_file.csv");
    private static PrintWriter pw = null;
    private static PrintWriter pwp = null;

    private static final long serialVersionUID = 2370;

    public RuleExceutionFunction() throws Exception
    {
        if (knowledgeSession == null)
        {
            knowledgeSession =
                    KnowledgeBaseHelperUtil.getStatefulKnowledgeSession("offers.drl");
            knowledgeSession.addEventListener(agendaEventListener);

            pw = new PrintWriter(new BufferedWriter(new java.io.FileWriter(
                    "C:\\Users\\bajit\\Documents\\customerOffers_file_5k.csv")), true);

            pwp = new PrintWriter(new BufferedWriter(new java.io.FileWriter(
                    "C:\\Users\\bajit\\Documents\\processed_customers_file_5k.csv")), true);
        }
    }

    @Override
    public void call(Person person) throws Exception
    {
        //List<Person> facts = rdd.collect();
        //Apply rules on facts here
        //synchronized (this)
        {
            knowledgeSession.insert(person);
            int i = knowledgeSession.fireAllRules();
        }

        //System.out.println("++++++ '" + agendaEventListener.activationsToString());

        if (person.hasOffer())
        {
            customersWithOffers.add(person);
            //Files.append(person.toString() + System.getProperty("line.separator"),
            //        f, Charset.defaultCharset());
            pw.println(person.toString());
        }

        pwp.println(person.toString());

        count.getAndIncrement();
    }

    public StatefulKnowledgeSession getSession()
    {
        return knowledgeSession;
    }

    public List<Person> getCustomersWithOffers()
    {
        return customersWithOffers;
    }
}
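For context, the call site isn't in the snippet, but the function is
applied per record roughly like this (a sketch; personStream stands in for
the JavaDStream<Person> of parsed facts, with the usual Spark Java imports):

personStream.foreachRDD(new Function<JavaRDD<Person>, Void>()
{
    @Override
    public Void call(JavaRDD<Person> rdd) throws Exception
    {
        // RuleExceutionFunction is constructed on the driver, then serialized
        // to the executors, where call() runs once per Person.
        rdd.foreach(new RuleExceutionFunction());
        return null;
    }
});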
Re: Spark custom streaming receiver not storing data reliably?
Posted by Jörn Franke <jo...@gmail.com>.
Can you provide the data set you are using and specify how you integrated
the Drools engine?
Drools is basically built around a large shared working memory. Hence, if
you have several tasks in Spark, they end up using different, disjoint
memory areas.
A full integration of Drools requires some sophisticated design and
probably a rewrite of the rules evaluation algorithm, so you would probably
have to rewrite that engine from scratch.
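For illustration, a per-partition variant would look roughly like this;
only a sketch, reusing the KnowledgeBaseHelperUtil helper from the snippet
above and assuming a JavaDStream<Person> named personStream with the usual
Spark Java imports. Each task builds and disposes of its own session, so
facts inserted in one partition are invisible to rules firing in any other:

personStream.foreachRDD(new Function<JavaRDD<Person>, Void>()
{
    @Override
    public Void call(JavaRDD<Person> rdd) throws Exception
    {
        rdd.foreachPartition(new VoidFunction<Iterator<Person>>()
        {
            @Override
            public void call(Iterator<Person> people) throws Exception
            {
                // Each partition (task) gets its own, isolated working memory.
                StatefulKnowledgeSession session = KnowledgeBaseHelperUtil
                        .getStatefulKnowledgeSession("offers.drl");
                try
                {
                    while (people.hasNext())
                    {
                        session.insert(people.next());
                    }
                    session.fireAllRules();
                }
                finally
                {
                    session.dispose(); // release this task's working memory
                }
            }
        });
        return null;
    }
});

This is fine for rules that evaluate each fact on its own, but any rule
that correlates facts across the whole stream would need the relevant facts
co-located in one partition, which is where the sophisticated design comes in.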