Posted to user@spark.apache.org by Ian Brooks <i....@sensewhere.com> on 2016/07/13 10:13:26 UTC

Flume integration

Hi,

I'm currently trying to implement a prototype Spark application that gets data from Flume and processes it. I'm using the pull based method mentioned in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html 

This is initially working fine for getting data from Flume; however, the Spark client doesn't appear to be letting Flume know that the data has been received, so Flume doesn't remove it from the batch. 

After 100 requests Flume stops allowing any new data and logs

08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error while processing transaction. 
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)


My code to pull the data from Flume is

        SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
        Duration batchInterval = new Duration(10000);
        
        final String checkpointDir = "/tmp/";

        final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
        ssc.checkpoint(checkpointDir);
        
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(ssc, host, port);

        
        // Transform each flume avro event to a process-able format
        JavaDStream<String> transformedEvents = flumeStream.map(new Function<SparkFlumeEvent, String>() {

                @Override
                public String call(SparkFlumeEvent flumeEvent) throws Exception {
                    String flumeEventStr = flumeEvent.event().toString();
                    avroData avroData = new avroData();
                    
                    Gson gson = new GsonBuilder().create();
                    avroData = gson.fromJson(flumeEventStr, avroData.class);     
                    HashMap<String,String> body = avroData.getBody();
                    String data = body.get("bytes");
                     
                    return data;
                }
        });
        

	...

        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
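
(As an aside, the Avro event body can also be read directly as bytes rather than via toString() and Gson; a minimal sketch of the same map step under that assumption, treating the payload as UTF-8 text:)

        JavaDStream<String> transformedEvents = flumeStream.map(new Function<SparkFlumeEvent, String>() {

                @Override
                public String call(SparkFlumeEvent flumeEvent) throws Exception {
                    // The Avro event body is a java.nio.ByteBuffer; decode it straight to a String.
                    return java.nio.charset.StandardCharsets.UTF_8
                            .decode(flumeEvent.event().getBody()).toString();
                }
        });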
    

Is there something specific I should be doing to let the Flume server know the batch has been received and processed?


*Ian Brooks*


Re: Flume integration

Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks Ian.

Was your Flume source IBM/MQ by any chance?



Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 20 November 2016 at 16:40, Ian Brooks <i....@sensewhere.com> wrote:

> Hi Mich,
>
>
>
> Yes, I managed to resolve this one. The issue was that the approach
> described in the docs doesn't work properly: in order for the Flume part
> to be notified, you need to set the storageLevel on the polling stream, like
>
>
>
> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(ssc,
> addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 10);
>
>
>
>
>
> After setting this, the data is correctly marked as processed by the Spark
> receiver and the Flume sink is notified.
>
>
>
> -Ian
>
>
>
>
>
> > Hi Ian,
>
> >
>
> > Has this been resolved?
>
> >
>
> > How about data to Flume and then Kafka and Kafka streaming into Spark?
>
> >
>
> > Thanks
>
> >
>
> > Dr Mich Talebzadeh
>
> >
>
> >
>
> >
>
> > LinkedIn *
>
> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCd
> OABU
>
> > rV8Pw
>
> > <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCd
> OAB
>
> > UrV8Pw>*
>
> >
>
> >
>
> >
>
> > http://talebzadehmich.wordpress.com
>
> >
>
> >
>
> > *Disclaimer:* Use it at your own risk. Any and all responsibility for any
>
> > loss, damage or destruction of data or any other property which may arise
>
> > from relying on this email's technical content is explicitly disclaimed.
>
> > The author will in no case be liable for any monetary damages arising
> from
>
> > such loss, damage or destruction.
>
> >
>
> > On 13 July 2016 at 11:13, Ian Brooks <i....@sensewhere.com> wrote:
>
> > > Hi,
>
> > >
>
> > >
>
> > >
>
> > > I'm currently trying to implement a prototype Spark application that
> gets
>
> > > data from Flume and processes it. I'm using the pull based method
>
> > > mentioned
>
> > > in https://spark.apache.org/docs/1.6.1/streaming-flume-
> integration.html
>
> > >
>
> > >
>
> > >
>
> > > The is initially working fine for getting data from Flume, however the
>
> > > Spark client doesn't appear to be letting Flume know that the data has
>
> > > been
>
> > > received, so Flume doesn't remove it from the batch.
>
> > >
>
> > >
>
> > >
>
> > > After 100 requests Flume stops allowing any new data and logs
>
> > >
>
> > >
>
> > >
>
> > > 08 Jul 2016 14:59:00,265 WARN [Spark Sink Processor Thread - 5]
>
> > > (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80) -
>
> > > Error while processing transaction.
>
> > > org.apache.flume.ChannelException: Take list for MemoryTransaction,
>
> > > capacity 100 full, consider committing more frequently, increasing
>
> > > capacity, or increasing thread count
>
> > >
>
> > > at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(
>
> > >
>
> > > MemoryChannel.java:96)
>
> > >
>
> > >
>
> > >
>
> > > My code to pull the data from Flume is
>
> > >
>
> > >
>
> > >
>
> > > SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>
> > >
>
> > > Duration batchInterval = new Duration(10000);
>
> > >
>
> > > final String checkpointDir = "/tmp/";
>
> > >
>
> > >
>
> > >
>
> > > final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>
> > > batchInterval);
>
> > >
>
> > > ssc.checkpoint(checkpointDir);
>
> > >
>
> > > JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
>
> > > FlumeUtils.createPollingStream(ssc, host, port);
>
> > >
>
> > >
>
> > >
>
> > > // Transform each flume avro event to a process-able format
>
> > >
>
> > > JavaDStream<String> transformedEvents = flumeStream.map(new
>
> > > Function<SparkFlumeEvent, String>() {
>
> > >
>
> > >
>
> > >
>
> > > @Override
>
> > >
>
> > > public String call(SparkFlumeEvent flumeEvent) throws Exception {
>
> > >
>
> > > String flumeEventStr = flumeEvent.event().toString();
>
> > >
>
> > > avroData avroData = new avroData();
>
> > >
>
> > > Gson gson = new GsonBuilder().create();
>
> > >
>
> > > avroData = gson.fromJson(flumeEventStr, avroData.class);
>
> > >
>
> > > HashMap<String,String> body = avroData.getBody();
>
> > >
>
> > > String data = body.get("bytes");
>
> > >
>
> > > return data;
>
> > >
>
> > > }
>
> > >
>
> > > });
>
> > >
>
> > >
>
> > >
>
> > > ...
>
> > >
>
> > >
>
> > >
>
> > > ssc.start();
>
> > >
>
> > > ssc.awaitTermination();
>
> > >
>
> > > ssc.close();
>
> > >
>
> > > }
>
> > >
>
> > >
>
> > >
>
> > > Is there something specific I should be doing to let the Flume server
> know
>
> > > the batch has been received and processed?
>
> > >
>
> > >
>
> > > --
>
> > >
>
> > > Ian Brooks
>
>
>
>
> --
>
> Ian Brooks
>
> Lead Cloud Systems Engineer
>
>
>
> Mobile: +44 7900987187
>
> UK Office: +44 131 629 5155
>
> US Office: +1 650 943 2403
>
> Skype: ijbrooks
>
>
>
> E-mail: i.brooks@sensewhere.com
>
> Web: www.sensewhere.com
>
>
>
> sensewhere Ltd. 4th Floor, 108 Princes Street, Edinburgh EH2 3AA
>
> Company Number: SC357036
>
> sensewhere USA 800 West El Camino Real, Suite 180, Mountain View,
> California, 94040
>
> sensewhere China Room748, 7/F, Tower A, SCC, No.88 Haide 1st Avenue,
> Nanshan District, Shenzhen 51806
>
>
>
>

Re: Flume integration

Posted by Ian Brooks <i....@sensewhere.com>.
Hi Mich,

Yes, I managed to resolve this one. The issue was that the approach described in the docs 
doesn't work properly: in order for the Flume part to be notified, you need to set the 
storageLevel on the polling stream, like

        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createPollingStream(ssc, addresses,
                        StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 10);


After setting this, the data is correctly marked as processed by the Spark receiver and the 
Flume sink is notified.
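
For completeness, a minimal sketch of how that call might be wired up end to end (the host and port here are placeholders, not taken from the original code):

        // Address(es) of the Flume agent(s) running the Spark sink (placeholder host/port).
        InetSocketAddress[] addresses = new InetSocketAddress[] {
                new InetSocketAddress("flume-host", 9988)
        };

        // 100 = maxBatchSize (events pulled per request), 10 = parallelism of requests to the sink.
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createPollingStream(ssc, addresses,
                        StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 10);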

-Ian


> Hi Ian,
> 
> Has this been resolved?
> 
> How about data to Flume and then Kafka and Kafka streaming into Spark?
> 
> Thanks
> 
> Dr Mich Talebzadeh
> 
> 
> 
> LinkedIn *
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABU
> rV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOAB
> UrV8Pw>*
> 
> 
> 
> http://talebzadehmich.wordpress.com
> 
> 
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
> 

Re: Flume integration

Posted by Mich Talebzadeh <mi...@gmail.com>.
Correct Ayan.

However, I have these thoughts


   1. I have never ingested from IBM/MQ to Spark Streaming.
   2. One way is to use Flume with IBM/MQ as the source and HDFS as the sink.
   However, anything on HDFS becomes data at rest.
   3. If you want streaming data, then hook IBM/MQ to Flume, use Kafka as the
   source, then use a Kafka DStream in Spark (see the sketch below).
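
For item 3, a minimal sketch of what the Kafka side might look like in Spark Streaming (assuming the Kafka 0.8 direct API bundled with Spark 1.6; the broker and topic names are placeholders):

        // Imports assumed: kafka.serializer.StringDecoder, org.apache.spark.streaming.kafka.KafkaUtils,
        // org.apache.spark.streaming.api.java.JavaPairInputDStream, scala.Tuple2, java.util.*
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "kafka-host:9092");   // placeholder broker list
        Set<String> topics = Collections.singleton("mq-events");      // placeholder topic fed by Flume

        JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(
                ssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParams, topics);

        // The values are the raw messages; process them much as in Ian's Flume example.
        JavaDStream<String> messages = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
                @Override
                public String call(Tuple2<String, String> record) throws Exception {
                    return record._2();
                }
        });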


The diagram below has this:

 [image: Inline images 1 (not included in the plain-text archive)]

It points to Flume as well, but I have not really seen any example of it.
Kafka and Twitter I have tried. However, I don't see the use case for
streaming data from HDFS into Spark Streaming. Does that mean data at rest
becomes data in motion? Sure, one can use Spark to read from an HDFS
directory (Spark CSV); however, that is not really Spark Streaming.

Thanks

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 20 November 2016 at 22:06, ayan guha <gu...@gmail.com> wrote:

> Hi Mitch
>
> I see your point now. So you are thinking Kafka as a central data exchange
> platform and using flume to ingest one particular source to Kafka, am I
> right?
>
> In that case it makes a lot of sense.
> On 21 Nov 2016 08:29, "Mich Talebzadeh" <mi...@gmail.com> wrote:
>
>> Agreed but imagine a situation where your input stream is IBM/MQ and you
>> want to use Spark to do some form of near real time calculation (Speed
>> layer).
>>
>> Ignoring ingesting directly from IBM/MQ you can possibly use Flume to get
>> data from IBM/MQ on HDFS. However, that is by definition a batch layer.
>>
>> Other alternative is to use Flume to feed into Kafka and do calculation
>> in Spark itself on the raw data.
>>
>>
>> I did search in Spark forum to see how many examples of using Flume
>> integration with Spark and so found this example from Ian. So I suppose you
>> can argue that there is no need for Kafka. However, Kafka integration into
>> Spark streaming is well established and at least I have used in in anger
>> and feel more comfortable that it is tried and tested. The issue is not
>> necessarily flume to Spark streaming is not doable but more risk aversion.
>>
>> Let me know your thoughts.
>>
>> HTH
>>
>>
>> ​​
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 20 November 2016 at 20:59, ayan guha <gu...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> While I am following this discussion with interest, I am trying to
>>> comprehend any architectural benefit of a spark sink.
>>>
>>> Is there any feature in flume makes it more suitable to ingest stream
>>> data than sppark streaming, so that we should chain them? For example does
>>> it help durability or reliability of the source?
>>>
>>> Or, it is a more tactical choice based on connector availability or such?
>>>
>>> To me, flume is important component to ingest streams to hdfs or hive
>>> directly ie it plays on the batch side of lambda architecture pattern.
>>> On 20 Nov 2016 22:30, "Mich Talebzadeh" <mi...@gmail.com>
>>> wrote:
>>>
>>>> Hi Ian,
>>>>
>>>> Has this been resolved?
>>>>
>>>> How about data to Flume and then Kafka and Kafka streaming into Spark?
>>>>
>>>> Thanks
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>> On 13 July 2016 at 11:13, Ian Brooks <i....@sensewhere.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>>
>>>>> I'm currently trying to implement a prototype Spark application that
>>>>> gets data from Flume and processes it. I'm using the pull based method
>>>>> mentioned in https://spark.apache.org/docs/
>>>>> 1.6.1/streaming-flume-integration.html
>>>>>
>>>>>
>>>>>
>>>>> The is initially working fine for getting data from Flume, however the
>>>>> Spark client doesn't appear to be letting Flume know that the data has been
>>>>> received, so Flume doesn't remove it from the batch.
>>>>>
>>>>>
>>>>>
>>>>> After 100 requests Flume stops allowing any new data and logs
>>>>>
>>>>>
>>>>>
>>>>> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
>>>>> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)
>>>>>  - Error while processing transaction.
>>>>> org.apache.flume.ChannelException: Take list for MemoryTransaction,
>>>>> capacity 100 full, consider committing more frequently, increasing
>>>>> capacity, or increasing thread count
>>>>>        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doT
>>>>> ake(MemoryChannel.java:96)
>>>>>
>>>>>
>>>>>
>>>>> My code to pull the data from Flume is
>>>>>
>>>>>
>>>>>
>>>>> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>>>>>
>>>>> Duration batchInterval = new Duration(10000);
>>>>>
>>>>> final String checkpointDir = "/tmp/";
>>>>>
>>>>>
>>>>>
>>>>> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>>>>> batchInterval);
>>>>>
>>>>> ssc.checkpoint(checkpointDir);
>>>>>
>>>>> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
>>>>> FlumeUtils.createPollingStream(ssc, host, port);
>>>>>
>>>>>
>>>>>
>>>>> // Transform each flume avro event to a process-able format
>>>>>
>>>>> JavaDStream<String> transformedEvents = flumeStream.map(new
>>>>> Function<SparkFlumeEvent, String>() {
>>>>>
>>>>>
>>>>>
>>>>> @Override
>>>>>
>>>>> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>>>>>
>>>>> String flumeEventStr = flumeEvent.event().toString();
>>>>>
>>>>> avroData avroData = new avroData();
>>>>>
>>>>> Gson gson = new GsonBuilder().create();
>>>>>
>>>>> avroData = gson.fromJson(flumeEventStr, avroData.class);
>>>>>
>>>>> HashMap<String,String> body = avroData.getBody();
>>>>>
>>>>> String data = body.get("bytes");
>>>>>
>>>>> return data;
>>>>>
>>>>> }
>>>>>
>>>>> });
>>>>>
>>>>>
>>>>>
>>>>> ...
>>>>>
>>>>>
>>>>>
>>>>> ssc.start();
>>>>>
>>>>> ssc.awaitTermination();
>>>>>
>>>>> ssc.close();
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> Is there something specific I should be doing to let the Flume server
>>>>> know the batch has been received and processed?
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Ian Brooks
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>

Re: Flume integration

Posted by ayan guha <gu...@gmail.com>.
Hi Mich

I see your point now. So you are thinking of Kafka as a central data exchange
platform and using Flume to ingest one particular source into Kafka, am I
right?

In that case it makes a lot of sense.
On 21 Nov 2016 08:29, "Mich Talebzadeh" <mi...@gmail.com> wrote:

> Agreed but imagine a situation where your input stream is IBM/MQ and you
> want to use Spark to do some form of near real time calculation (Speed
> layer).
>
> Ignoring ingesting directly from IBM/MQ you can possibly use Flume to get
> data from IBM/MQ on HDFS. However, that is by definition a batch layer.
>
> Other alternative is to use Flume to feed into Kafka and do calculation in
> Spark itself on the raw data.
>
>
> I did search in Spark forum to see how many examples of using Flume
> integration with Spark and so found this example from Ian. So I suppose you
> can argue that there is no need for Kafka. However, Kafka integration into
> Spark streaming is well established and at least I have used in in anger
> and feel more comfortable that it is tried and tested. The issue is not
> necessarily flume to Spark streaming is not doable but more risk aversion.
>
> Let me know your thoughts.
>
> HTH
>
>
> ​​
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 20 November 2016 at 20:59, ayan guha <gu...@gmail.com> wrote:
>
>> Hi
>>
>> While I am following this discussion with interest, I am trying to
>> comprehend any architectural benefit of a spark sink.
>>
>> Is there any feature in flume makes it more suitable to ingest stream
>> data than sppark streaming, so that we should chain them? For example does
>> it help durability or reliability of the source?
>>
>> Or, it is a more tactical choice based on connector availability or such?
>>
>> To me, flume is important component to ingest streams to hdfs or hive
>> directly ie it plays on the batch side of lambda architecture pattern.
>> On 20 Nov 2016 22:30, "Mich Talebzadeh" <mi...@gmail.com>
>> wrote:
>>
>>> Hi Ian,
>>>
>>> Has this been resolved?
>>>
>>> How about data to Flume and then Kafka and Kafka streaming into Spark?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 13 July 2016 at 11:13, Ian Brooks <i....@sensewhere.com> wrote:
>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> I'm currently trying to implement a prototype Spark application that
>>>> gets data from Flume and processes it. I'm using the pull based method
>>>> mentioned in https://spark.apache.org/docs/
>>>> 1.6.1/streaming-flume-integration.html
>>>>
>>>>
>>>>
>>>> The is initially working fine for getting data from Flume, however the
>>>> Spark client doesn't appear to be letting Flume know that the data has been
>>>> received, so Flume doesn't remove it from the batch.
>>>>
>>>>
>>>>
>>>> After 100 requests Flume stops allowing any new data and logs
>>>>
>>>>
>>>>
>>>> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
>>>> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
>>>> Error while processing transaction.
>>>> org.apache.flume.ChannelException: Take list for MemoryTransaction,
>>>> capacity 100 full, consider committing more frequently, increasing
>>>> capacity, or increasing thread count
>>>>        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doT
>>>> ake(MemoryChannel.java:96)
>>>>
>>>>
>>>>
>>>> My code to pull the data from Flume is
>>>>
>>>>
>>>>
>>>> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>>>>
>>>> Duration batchInterval = new Duration(10000);
>>>>
>>>> final String checkpointDir = "/tmp/";
>>>>
>>>>
>>>>
>>>> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>>>> batchInterval);
>>>>
>>>> ssc.checkpoint(checkpointDir);
>>>>
>>>> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
>>>> FlumeUtils.createPollingStream(ssc, host, port);
>>>>
>>>>
>>>>
>>>> // Transform each flume avro event to a process-able format
>>>>
>>>> JavaDStream<String> transformedEvents = flumeStream.map(new
>>>> Function<SparkFlumeEvent, String>() {
>>>>
>>>>
>>>>
>>>> @Override
>>>>
>>>> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>>>>
>>>> String flumeEventStr = flumeEvent.event().toString();
>>>>
>>>> avroData avroData = new avroData();
>>>>
>>>> Gson gson = new GsonBuilder().create();
>>>>
>>>> avroData = gson.fromJson(flumeEventStr, avroData.class);
>>>>
>>>> HashMap<String,String> body = avroData.getBody();
>>>>
>>>> String data = body.get("bytes");
>>>>
>>>> return data;
>>>>
>>>> }
>>>>
>>>> });
>>>>
>>>>
>>>>
>>>> ...
>>>>
>>>>
>>>>
>>>> ssc.start();
>>>>
>>>> ssc.awaitTermination();
>>>>
>>>> ssc.close();
>>>>
>>>> }
>>>>
>>>>
>>>>
>>>> Is there something specific I should be doing to let the Flume server
>>>> know the batch has been received and processed?
>>>>
>>>>
>>>> --
>>>>
>>>> Ian Brooks
>>>>
>>>>
>>>>
>>>
>>>
>

Re: Flume integration

Posted by Mich Talebzadeh <mi...@gmail.com>.
Agreed, but imagine a situation where your input stream is IBM/MQ and you
want to use Spark to do some form of near real-time calculation (speed
layer).

Ignoring ingesting directly from IBM/MQ, you can possibly use Flume to get
data from IBM/MQ onto HDFS. However, that is by definition a batch layer.

The other alternative is to use Flume to feed into Kafka and do the
calculation in Spark itself on the raw data.
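
As an illustration of that kind of speed-layer calculation, a minimal sketch against a DStream of raw messages (the messages stream and the window sizes are placeholders; windowed counting needs checkpointing enabled, as in Ian's code):

        // Count messages over a 5-minute window, sliding every 30 seconds
        // (both must be multiples of the batch interval).
        JavaDStream<Long> counts = messages.countByWindow(
                Durations.minutes(5), Durations.seconds(30));
        counts.print();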


I did search the Spark forum to see how many examples there are of using
Flume integration with Spark, and found this example from Ian. So I suppose
you can argue that there is no need for Kafka. However, Kafka integration
into Spark Streaming is well established, and at least I have used it in
anger and feel more comfortable that it is tried and tested. The issue is not
necessarily that Flume to Spark Streaming is not doable, but more one of risk aversion.

Let me know your thoughts.

HTH


​​

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 20 November 2016 at 20:59, ayan guha <gu...@gmail.com> wrote:

> Hi
>
> While I am following this discussion with interest, I am trying to
> comprehend any architectural benefit of a spark sink.
>
> Is there any feature in flume makes it more suitable to ingest stream data
> than sppark streaming, so that we should chain them? For example does it
> help durability or reliability of the source?
>
> Or, it is a more tactical choice based on connector availability or such?
>
> To me, flume is important component to ingest streams to hdfs or hive
> directly ie it plays on the batch side of lambda architecture pattern.
> On 20 Nov 2016 22:30, "Mich Talebzadeh" <mi...@gmail.com> wrote:
>
>> Hi Ian,
>>
>> Has this been resolved?
>>
>> How about data to Flume and then Kafka and Kafka streaming into Spark?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 13 July 2016 at 11:13, Ian Brooks <i....@sensewhere.com> wrote:
>>
>>> Hi,
>>>
>>>
>>>
>>> I'm currently trying to implement a prototype Spark application that
>>> gets data from Flume and processes it. I'm using the pull based method
>>> mentioned in https://spark.apache.org/docs/
>>> 1.6.1/streaming-flume-integration.html
>>>
>>>
>>>
>>> The is initially working fine for getting data from Flume, however the
>>> Spark client doesn't appear to be letting Flume know that the data has been
>>> received, so Flume doesn't remove it from the batch.
>>>
>>>
>>>
>>> After 100 requests Flume stops allowing any new data and logs
>>>
>>>
>>>
>>> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
>>> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
>>> Error while processing transaction.
>>> org.apache.flume.ChannelException: Take list for MemoryTransaction,
>>> capacity 100 full, consider committing more frequently, increasing
>>> capacity, or increasing thread count
>>>        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doT
>>> ake(MemoryChannel.java:96)
>>>
>>>
>>>
>>> My code to pull the data from Flume is
>>>
>>>
>>>
>>> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>>>
>>> Duration batchInterval = new Duration(10000);
>>>
>>> final String checkpointDir = "/tmp/";
>>>
>>>
>>>
>>> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>>> batchInterval);
>>>
>>> ssc.checkpoint(checkpointDir);
>>>
>>> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
>>> FlumeUtils.createPollingStream(ssc, host, port);
>>>
>>>
>>>
>>> // Transform each flume avro event to a process-able format
>>>
>>> JavaDStream<String> transformedEvents = flumeStream.map(new
>>> Function<SparkFlumeEvent, String>() {
>>>
>>>
>>>
>>> @Override
>>>
>>> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>>>
>>> String flumeEventStr = flumeEvent.event().toString();
>>>
>>> avroData avroData = new avroData();
>>>
>>> Gson gson = new GsonBuilder().create();
>>>
>>> avroData = gson.fromJson(flumeEventStr, avroData.class);
>>>
>>> HashMap<String,String> body = avroData.getBody();
>>>
>>> String data = body.get("bytes");
>>>
>>> return data;
>>>
>>> }
>>>
>>> });
>>>
>>>
>>>
>>> ...
>>>
>>>
>>>
>>> ssc.start();
>>>
>>> ssc.awaitTermination();
>>>
>>> ssc.close();
>>>
>>> }
>>>
>>>
>>>
>>> Is there something specific I should be doing to let the Flume server
>>> know the batch has been received and processed?
>>>
>>>
>>> --
>>>
>>> Ian Brooks
>>>
>>>
>>>
>>
>>

Re: Flume integration

Posted by Ian Brooks <i....@sensewhere.com>.
Hi Mich,

Thanks. I would prefer not to add another system into the mix, as we currently don't use Kafka at all. We are still in the prototype phase at the moment and it seems to be working well, though it doesn't like you restarting the Flume sink part without restarting the Spark application. That is something we should be able to manage though.



*-Ian *


Hi Ian,


Flume is great for ingesting data into HDFS and Hbase. However, that is part of batch layer.


For real time processing, I would go through Kafka into spark streaming. Except your case, I have not established if anyone else does Flume directly into Spark?


If so how mature is it.


Thanks


Dr Mich Talebzadeh 
  
LinkedIn / https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw[1]/ 
  
http://talebzadehmich.wordpress.com[2]


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. 
  


On 21 November 2016 at 10:27, Ian Brooks <i.brooks@sensewhere.com[3]> wrote:




*-Ian*


Hi
While I am following this discussion with interest, I am trying to comprehend any architectural benefit of a spark sink.
Is there any feature in flume makes it more suitable to ingest stream data than sppark streaming, so that we should chain them? For example does it help durability or reliability of the source?
Or, it is a more tactical choice based on connector availability or such?
To me, flume is important component to ingest streams to hdfs or hive directly ie it plays on the batch side of lambda architecture pattern.
On 20 Nov 2016 22:30, "Mich Talebzadeh" <mich.talebzadeh@gmail.com[4]> wrote:


Hi Ian,


Has this been resolved?


How about data to Flume and then Kafka and Kafka streaming into Spark?


Thanks


Dr Mich Talebzadeh 
  
LinkedIn / https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw[1]/ 
  
http://talebzadehmich.wordpress.com[2]


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. 
  


On 13 July 2016 at 11:13, Ian Brooks <i.brooks@sensewhere.com[3]> wrote:


Hi,
 
I'm currently trying to implement a prototype Spark application that gets data from Flume and processes it. I'm using the pull based method mentioned in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html[5] 
 
The is initially working fine for getting data from Flume, however the Spark client doesn't appear to be letting Flume know that the data has been received, so Flume doesn't remove it from the batch. 
 
After 100 requests Flume stops allowing any new data and logs
 
08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error while processing transaction. 

 
My code to pull the data from Flume is
 
SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
Duration batchInterval = new Duration(10000);
final String checkpointDir = "/tmp/";
 
final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
ssc.checkpoint(checkpointDir);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(ssc, host, port);
 
// Transform each flume avro event to a process-able format
JavaDStream<String> transformedEvents = flumeStream.map(new Function<SparkFlumeEvent, String>() {
 
@Override
public String call(SparkFlumeEvent flumeEvent) throws Exception {
String flumeEventStr = flumeEvent.event().toString();
avroData avroData = new avroData();
Gson gson = new GsonBuilder().create();
avroData = gson.fromJson(flumeEventStr, avroData.class); 
HashMap<String,String> body = avroData.getBody();
String data = body.get("bytes");
return data;
}
});
 
...
 
ssc.start();
ssc.awaitTermination();
ssc.close();
}
 
Is there something specific I should be doing to let the Flume server know the batch has been received and processed?


*Ian Brooks*
 




*Ian Brooks*
Lead Cloud Systems Engineer
 
Mobile: +44 7900987187[6]
UK Office: +44 131 629 5155[7]
US Office: +1 650 943 2403[8]
Skype: ijbrooks
 
E-mail: _i.brooks@sensewhere.com_ 
Web: www.sensewhere.com[9] 
 
*sensewhere Ltd*. 4th Floor, 108 Princes Street, Edinburgh EH2 3AA. 
Company Number: SC357036
*sensewhere USA* 800 West El Camino Real, Suite 180, Mountain View, California, 94040
*sensewhere China* Room748, 7/F, Tower A, SCC, No.88 Haide 1st Avenue, Nanshan District, Shenzhen 51806
 
      




*Ian Brooks*
Lead Cloud Systems Engineer

Mobile: +44 7900987187
UK Office: +44 131 629 5155
US Office: +1 650 943 2403
Skype: ijbrooks

E-mail: i.brooks@sensewhere.com[10] 
Web: www.sensewhere.com[9] 

*sensewhere Ltd*. 4th Floor, 108 Princes Street, Edinburgh EH2 3AA.
Company Number: SC357036
*sensewhere USA* 800 West El Camino Real, Suite 180, Mountain View, California, 94040
*sensewhere China* Room748, 7/F, Tower A, SCC, No.88 Haide 1st Avenue, Nanshan District, Shenzhen 51806

      

--------
[1] https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
[2] http://talebzadehmich.wordpress.com
[3] mailto:i.brooks@sensewhere.com
[4] mailto:mich.talebzadeh@gmail.com
[5] https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
[6] tel:%2B44%207900987187
[7] tel:%2B44%20131%20629%205155
[8] tel:%2B1%20650%20943%202403
[9] http://www.sensewhere.com/
[10] mailto:i.brooks@sensewhere.com

Re: Flume integration

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi Ian,

Flume is great for ingesting data into HDFS and HBase. However, that is
part of the batch layer.

For real-time processing, I would go through Kafka into Spark Streaming.
Apart from your case, I have not established whether anyone else does Flume
directly into Spark.

If so, how mature is it?

Thanks

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 21 November 2016 at 10:27, Ian Brooks <i....@sensewhere.com> wrote:

>
> We use Flume already as our way of getting data from our application into
> HDFS and HBase. We have some new work we are looking at that requires
> realtime processing on data that we don't need to store, so it fits into
> our existing platform more easily to just pass the data through Flume like
> everything else and route this data to Spark.
>
> -Ian
>
>
>
>
> On Monday 21 November 2016 07:59:42 ayan guha wrote:
>
> Hi
>
> While I am following this discussion with interest, I am trying to
> comprehend any architectural benefit of a spark sink.
>
> Is there any feature in flume makes it more suitable to ingest stream data
> than sppark streaming, so that we should chain them? For example does it
> help durability or reliability of the source?
>
> Or, it is a more tactical choice based on connector availability or such?
>
> To me, flume is important component to ingest streams to hdfs or hive
> directly ie it plays on the batch side of lambda architecture pattern.
>
> On 20 Nov 2016 22:30, "Mich Talebzadeh" <mi...@gmail.com> wrote:
>
> Hi Ian,
>
>
> Has this been resolved?
>
>
> How about data to Flume and then Kafka and Kafka streaming into Spark?
>
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  https://www.linkedin.com/profile/view?id=
> AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On 13 July 2016 at 11:13, Ian Brooks <i....@sensewhere.com> wrote:
>
> Hi,
>
>
>
> I'm currently trying to implement a prototype Spark application that gets
> data from Flume and processes it. I'm using the pull based method mentioned
> in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>
>
>
> The is initially working fine for getting data from Flume, however the
> Spark client doesn't appear to be letting Flume know that the data has been
> received, so Flume doesn't remove it from the batch.
>
>
>
> After 100 requests Flume stops allowing any new data and logs
>
>
>
> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
> Error while processing transaction.
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(
> MemoryChannel.java:96)
>
>
>
> My code to pull the data from Flume is
>
>
>
> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>
> Duration batchInterval = new Duration(10000);
>
> final String checkpointDir = "/tmp/";
>
>
>
> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> batchInterval);
>
> ssc.checkpoint(checkpointDir);
>
> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(ssc,
> host, port);
>
>
>
> // Transform each flume avro event to a process-able format
>
> JavaDStream<String> transformedEvents = flumeStream.map(new
> Function<SparkFlumeEvent, String>() {
>
>
>
> @Override
>
> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>
> String flumeEventStr = flumeEvent.event().toString();
>
> avroData avroData = new avroData();
>
> Gson gson = new GsonBuilder().create();
>
> avroData = gson.fromJson(flumeEventStr, avroData.class);
>
> HashMap<String,String> body = avroData.getBody();
>
> String data = body.get("bytes");
>
> return data;
>
> }
>
> });
>
>
>
> ...
>
>
>
> ssc.start();
>
> ssc.awaitTermination();
>
> ssc.close();
>
> }
>
>
>
> Is there something specific I should be doing to let the Flume server know
> the batch has been received and processed?
>
>
> --
>
> Ian Brooks
>
>
>
>
>
>
>
> --
>
> Ian Brooks
>
> Lead Cloud Systems Engineer
>
>
>
> Mobile: +44 7900987187
>
> UK Office: +44 131 629 5155
>
> US Office: +1 650 943 2403
>
> Skype: ijbrooks
>
>
>
> E-mail: i.brooks@sensewhere.com
>
> Web: www.sensewhere.com
>
>
>
> sensewhere Ltd. 4th Floor, 108 Princes Street, Edinburgh EH2 3AA.
>
> Company Number: SC357036
>
> sensewhere USA 800 West El Camino Real, Suite 180, Mountain View,
> California, 94040
>
> sensewhere China Room748, 7/F, Tower A, SCC, No.88 Haide 1st Avenue,
> Nanshan District, Shenzhen 51806
>
>
>
>

Re: Flume integration

Posted by Ian Brooks <i....@sensewhere.com>.
*-Ian*


Hi
While I am following this discussion with interest, I am trying to comprehend any architectural benefit of a spark sink.
Is there any feature in flume makes it more suitable to ingest stream data than sppark streaming, so that we should chain them? For example does it help durability or reliability of the source?
Or, it is a more tactical choice based on connector availability or such?
To me, flume is important component to ingest streams to hdfs or hive directly ie it plays on the batch side of lambda architecture pattern.
On 20 Nov 2016 22:30, "Mich Talebzadeh" <mich.talebzadeh@gmail.com[1]> wrote:


Hi Ian,


Has this been resolved?


How about data to Flume and then Kafka and Kafka streaming into Spark?


Thanks


Dr Mich Talebzadeh 
  
LinkedIn / https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw[2]/ 
  
http://talebzadehmich.wordpress.com[3]


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. 
  


On 13 July 2016 at 11:13, Ian Brooks <i.brooks@sensewhere.com[4]> wrote:


Hi,
 
I'm currently trying to implement a prototype Spark application that gets data from Flume and processes it. I'm using the pull based method mentioned in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html[5] 
 
The is initially working fine for getting data from Flume, however the Spark client doesn't appear to be letting Flume know that the data has been received, so Flume doesn't remove it from the batch. 
 
After 100 requests Flume stops allowing any new data and logs
 
08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error while processing transaction. 

 
My code to pull the data from Flume is
 
SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
Duration batchInterval = new Duration(10000);
final String checkpointDir = "/tmp/";
 
final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
ssc.checkpoint(checkpointDir);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(ssc, host, port);
 
// Transform each flume avro event to a process-able format
JavaDStream<String> transformedEvents = flumeStream.map(new Function<SparkFlumeEvent, String>() {
 
@Override
public String call(SparkFlumeEvent flumeEvent) throws Exception {
String flumeEventStr = flumeEvent.event().toString();
avroData avroData = new avroData();
Gson gson = new GsonBuilder().create();
avroData = gson.fromJson(flumeEventStr, avroData.class); 
HashMap<String,String> body = avroData.getBody();
String data = body.get("bytes");
return data;
}
});
 
...
 
ssc.start();
ssc.awaitTermination();
ssc.close();
}
 
Is there something specific I should be doing to let the Flume server know the batch has been received and processed?


*Ian Brooks*
 




*Ian Brooks*
Lead Cloud Systems Engineer

Mobile: +44 7900987187
UK Office: +44 131 629 5155
US Office: +1 650 943 2403
Skype: ijbrooks

E-mail: i.brooks@sensewhere.com[6] 
Web: www.sensewhere.com[7] 

*sensewhere Ltd*. 4th Floor, 108 Princes Street, Edinburgh EH2 3AA.
Company Number: SC357036
*sensewhere USA* 800 West El Camino Real, Suite 180, Mountain View, California, 94040
*sensewhere China* Room748, 7/F, Tower A, SCC, No.88 Haide 1st Avenue, Nanshan District, Shenzhen 51806

      

--------
[1] mailto:mich.talebzadeh@gmail.com
[2] https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
[3] http://talebzadehmich.wordpress.com
[4] mailto:i.brooks@sensewhere.com
[5] https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
[6] mailto:i.brooks@sensewhere.com
[7] http://www.sensewhere.com/

Re: Flume integration

Posted by ayan guha <gu...@gmail.com>.
Hi

While I am following this discussion with interest, I am trying to
comprehend any architectural benefit of a Spark sink.

Is there any feature in Flume that makes it more suitable for ingesting
stream data than Spark Streaming, so that we should chain them? For example,
does it help durability or reliability of the source?

Or is it a more tactical choice based on connector availability or such?

To me, Flume is an important component for ingesting streams into HDFS or
Hive directly, i.e. it plays on the batch side of the lambda architecture
pattern.
On 20 Nov 2016 22:30, "Mich Talebzadeh" <mi...@gmail.com> wrote:

> Hi Ian,
>
> Has this been resolved?
>
> How about data to Flume and then Kafka and Kafka streaming into Spark?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 July 2016 at 11:13, Ian Brooks <i....@sensewhere.com> wrote:
>
>> Hi,
>>
>>
>>
>> I'm currently trying to implement a prototype Spark application that gets
>> data from Flume and processes it. I'm using the pull based method mentioned
>> in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>>
>>
>>
>> The is initially working fine for getting data from Flume, however the
>> Spark client doesn't appear to be letting Flume know that the data has been
>> received, so Flume doesn't remove it from the batch.
>>
>>
>>
>> After 100 requests Flume stops allowing any new data and logs
>>
>>
>>
>> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
>> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
>> Error while processing transaction.
>> org.apache.flume.ChannelException: Take list for MemoryTransaction,
>> capacity 100 full, consider committing more frequently, increasing
>> capacity, or increasing thread count
>>        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.
>> doTake(MemoryChannel.java:96)
>>
>>
>>
>> My code to pull the data from Flume is
>>
>>
>>
>> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>>
>> Duration batchInterval = new Duration(10000);
>>
>> final String checkpointDir = "/tmp/";
>>
>>
>>
>> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>> batchInterval);
>>
>> ssc.checkpoint(checkpointDir);
>>
>> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
>> FlumeUtils.createPollingStream(ssc, host, port);
>>
>>
>>
>> // Transform each flume avro event to a process-able format
>>
>> JavaDStream<String> transformedEvents = flumeStream.map(new
>> Function<SparkFlumeEvent, String>() {
>>
>>
>>
>> @Override
>>
>> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>>
>> String flumeEventStr = flumeEvent.event().toString();
>>
>> avroData avroData = new avroData();
>>
>> Gson gson = new GsonBuilder().create();
>>
>> avroData = gson.fromJson(flumeEventStr, avroData.class);
>>
>> HashMap<String,String> body = avroData.getBody();
>>
>> String data = body.get("bytes");
>>
>> return data;
>>
>> }
>>
>> });
>>
>>
>>
>> ...
>>
>>
>>
>> ssc.start();
>>
>> ssc.awaitTermination();
>>
>> ssc.close();
>>
>> }
>>
>>
>>
>> Is there something specific I should be doing to let the Flume server
>> know the batch has been received and processed?
>>
>>
>> --
>>
>> Ian Brooks
>>
>>
>>
>
>

Re: Flume integration

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi Ian,

Has this been resolved?

How about data to Flume and then Kafka and Kafka streaming into Spark?

Thanks

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 July 2016 at 11:13, Ian Brooks <i....@sensewhere.com> wrote:

> Hi,
>
>
>
> I'm currently trying to implement a prototype Spark application that gets
> data from Flume and processes it. I'm using the pull based method mentioned
> in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>
>
>
> The is initially working fine for getting data from Flume, however the
> Spark client doesn't appear to be letting Flume know that the data has been
> received, so Flume doesn't remove it from the batch.
>
>
>
> After 100 requests Flume stops allowing any new data and logs
>
>
>
> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
> Error while processing transaction.
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(
> MemoryChannel.java:96)
>
>
>
> My code to pull the data from Flume is
>
>
>
> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>
> Duration batchInterval = new Duration(10000);
>
> final String checkpointDir = "/tmp/";
>
>
>
> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> batchInterval);
>
> ssc.checkpoint(checkpointDir);
>
> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(ssc,
> host, port);
>
>
>
> // Transform each flume avro event to a process-able format
>
> JavaDStream<String> transformedEvents = flumeStream.map(new
> Function<SparkFlumeEvent, String>() {
>
>
>
> @Override
>
> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>
> String flumeEventStr = flumeEvent.event().toString();
>
> avroData avroData = new avroData();
>
> Gson gson = new GsonBuilder().create();
>
> avroData = gson.fromJson(flumeEventStr, avroData.class);
>
> HashMap<String,String> body = avroData.getBody();
>
> String data = body.get("bytes");
>
> return data;
>
> }
>
> });
>
>
>
> ...
>
>
>
> ssc.start();
>
> ssc.awaitTermination();
>
> ssc.close();
>
> }
>
>
>
> Is there something specific I should be doing to let the Flume server know
> the batch has been received and processed?
>
>
> --
>
> Ian Brooks
>
>
>