Posted to dev@edgent.apache.org by Otis Gospodnetić <ot...@gmail.com> on 2017/12/29 21:41:23 UTC

Buffering on the edge (network issues)

Hi,

I tried looking through the API and examples to see whether Edgent has any
built-in buffering capabilities for when the sink cannot be reached for
whatever reason.  I couldn't find anything like that.  Is that something one
would have to write for each (new or custom) connector, or is it part of the
framework and I just missed it?

If I missed it, how is buffering implemented?
In-memory or on persistent media or some hybrid?
Can one define the max buffer capacity in terms of either number of items
or bytes?
If so, are the oldest items in the buffer dropped when the max capacity (or
their age) is reached?

Thanks,
Otis
--
Monitoring - Log Management - Alerting - Anomaly Detection
Solr & Elasticsearch Consulting Support Training - http://sematext.com/

Re: Buffering on the edge (network issues)

Posted by Dale LaBossiere <dm...@gmail.com>.
Related: for the MQTT connector, see https://issues.apache.org/jira/browse/EDGENT-384

Also, I forgot to mention PlumbingStreams.isolate().

pressureReliever(), isolate(), events(), and Windows all use in-memory buffering.
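
A minimal sketch of isolate() (I'm assuming the capacity-bounded variant from the PlumbingStreams javadoc; the polled value, the queue capacity of 100, and slowSend() are placeholders, not real APIs):

import java.util.concurrent.TimeUnit;

import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import org.apache.edgent.topology.plumbing.PlumbingStreams;

public class IsolateSample {
    public static void main(String[] args) {
        DirectProvider dp = new DirectProvider();
        Topology t = dp.newTopology("isolate");

        // Placeholder source: poll a value every 100 ms.
        TStream<Double> readings =
            t.poll(() -> Math.random(), 100, TimeUnit.MILLISECONDS);

        // Run everything downstream of isolate() on its own thread, fed by a
        // bounded in-memory queue (the capacity of 100 is illustrative).
        TStream<Double> isolated = PlumbingStreams.isolate(readings, 100);

        // A slow sink now fills that queue before it stalls the poll.
        isolated.sink(reading -> slowSend(reading));

        dp.submit(t);
    }

    // Placeholder for a slow / possibly failing network send.
    private static void slowSend(Double reading) { /* ... */ }
}

Unlike pressureReliever(), I believe isolate() back-pressures (blocks) rather than discarding tuples once its queue is full.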

> On Jan 2, 2018, at 12:38 PM, Dale LaBossiere <dm...@gmail.com> wrote:
> 
> Hi,
> 
> An Edgent stream is very lightweight; it doesn’t have any inherent buffering per se.  The simple model is that the “next tuple” isn’t processed until processing of the current tuple is complete, i.e., until it has been accepted by all of its downstream streams and, ultimately, by a “sink”.  In other words, downstream streams exert back pressure on upstream processing.
> 
> Window streams (used for aggregation) include buffering.
> PlumbingStreams.pressureReliever() [1] can be used to isolate upstream processing from downstream streams (by adding a buffer).
> Use of Topology.events() [2], directly or more typically by a connector, adds a buffer.
> 
> Individual connectors (typically via the underlying 3rd-party client library, e.g., MQTT, Kafka) influence the behavior based on whether or not they provide any internal buffering.  For example, MQTT (and Edgent’s MQTT connector) exposes quality-of-service and persistence-provider controls.
> 
> [1] http://edgent.apache.org/javadoc/latest/org/apache/edgent/topology/plumbing/PlumbingStreams.html#pressureReliever-org.apache.edgent.topology.TStream-org.apache.edgent.function.Function-int-
> [2] http://edgent.apache.org/javadoc/latest/org/apache/edgent/topology/Topology.html#events-org.apache.edgent.function.Consumer-
> 
> Hope that helps,
> — Dale
> 
>> On Dec 29, 2017, at 4:41 PM, Otis Gospodnetić <otis.gospodnetic@gmail.com> wrote:
>> 
>> Hi,
>> 
>> I tried looking through the API and examples to see whether Edgent has any
>> built-in buffering capabilities for when the sink cannot be reached for
>> whatever reason.  I couldn't find anything like that.  Is that something one
>> would have to write for each (new or custom) connector, or is it part of the
>> framework and I just missed it?
>> 
>> If I missed it, how is buffering implemented?
>> In-memory or on persistent media or some hybrid?
>> Can one define the max buffer capacity in terms of either number of items
>> or bytes?
>> If so, are the oldest items in the buffer dropped when the max capacity (or
>> their age) is reached?
>> 
>> Thanks,
>> Otis
>> --
>> Monitoring - Log Management - Alerting - Anomaly Detection
>> Solr & Elasticsearch Consulting Support Training - http://sematext.com/
> 


Re: Buffering on the edge (network issues)

Posted by Dale LaBossiere <dm...@gmail.com>.
Hi,

An Edgent stream is very lightweight; it doesn’t have any inherent buffering per se.  The simple model is that the “next tuple” isn’t processed until processing of the current tuple is complete, i.e., until it has been accepted by all of its downstream streams and, ultimately, by a “sink”.  In other words, downstream streams exert back pressure on upstream processing.
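
For example, in a minimal topology like this sketch (the polled value and sendToServer() are placeholders), a sink that blocks on a network outage stalls the upstream poll:

import java.util.concurrent.TimeUnit;

import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;

public class NoBufferSample {
    public static void main(String[] args) {
        DirectProvider dp = new DirectProvider();
        Topology t = dp.newTopology("noBuffer");

        // Poll a (placeholder) sensor once per second.
        TStream<Double> readings = t.poll(() -> Math.random(), 1, TimeUnit.SECONDS);

        // If this sink blocks (e.g., the network is down), no new tuples are
        // processed upstream: the sink exerts back pressure on the poll.
        readings.sink(reading -> sendToServer(reading));

        dp.submit(t);
    }

    // Placeholder for "send to the remote sink"; may block or fail.
    private static void sendToServer(Double reading) { /* ... */ }
}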

Window streams (used for aggregation) include buffering.
PlumbingStreams.pressureReliever() [1] can be used to isolate upstream processing from downstream streams (by adding a buffer).
Use of Topology.events() [2], directly or more typically by a connector, adds a buffer.
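
For instance, a sketch using pressureReliever() per the javadoc in [1] (the key function, the buffer size of 1000, and sendToServer() are illustrative placeholders):

import java.util.concurrent.TimeUnit;

import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import org.apache.edgent.topology.plumbing.PlumbingStreams;

public class PressureRelieverSample {
    public static void main(String[] args) {
        DirectProvider dp = new DirectProvider();
        Topology t = dp.newTopology("pressureReliever");

        // Placeholder source: poll a value once per second.
        TStream<Double> readings = t.poll(() -> Math.random(), 1, TimeUnit.SECONDS);

        // Buffer up to 1000 tuples (all under a single key here) between the
        // poll and the possibly slow/unreachable sink.
        TStream<Double> relieved =
            PlumbingStreams.pressureReliever(readings, reading -> "all", 1000);

        relieved.sink(reading -> sendToServer(reading));

        dp.submit(t);
    }

    // Placeholder for "send to the remote sink"; may block or fail.
    private static void sendToServer(Double reading) { /* ... */ }
}

As I read the javadoc, that buffer is in-memory only and, when it is full, the oldest tuple is discarded, which also answers the "are the oldest items dropped" part of the question for this particular plumbing.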

Individual connectors (typically via the underlying 3rd-party client library, e.g., MQTT, Kafka) influence the behavior based on whether or not they provide any internal buffering.  For example, MQTT (and Edgent’s MQTT connector) exposes quality-of-service and persistence-provider controls.
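
For instance, a sketch of publishing through the MQTT connector (the broker URL, client id, and topic are placeholders; method signatures are from the connector API as best I recall).  The qos argument selects the MQTT quality of service, and an MqttConfig (not shown) can be used to supply connection and persistence-provider settings:

import java.util.concurrent.TimeUnit;

import org.apache.edgent.connectors.mqtt.MqttStreams;
import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;

public class MqttPublishSample {
    public static void main(String[] args) {
        DirectProvider dp = new DirectProvider();
        Topology t = dp.newTopology("mqttPublish");

        // Placeholder source: poll a reading once per second as a String.
        TStream<String> readings =
            t.poll(() -> Double.toString(Math.random()), 1, TimeUnit.SECONDS);

        // Placeholder broker URL and client id.
        MqttStreams mqtt = new MqttStreams(t, "tcp://localhost:1883", "edgentSample");

        // qos=1 ("at least once"): the client library handles retries and, with
        // a persistence provider configured, can buffer in-flight messages.
        mqtt.publish(readings, "sensors/readings", 1 /*qos*/, false /*retain*/);

        dp.submit(t);
    }
}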

[1] http://edgent.apache.org/javadoc/latest/org/apache/edgent/topology/plumbing/PlumbingStreams.html#pressureReliever-org.apache.edgent.topology.TStream-org.apache.edgent.function.Function-int-
[2] http://edgent.apache.org/javadoc/latest/org/apache/edgent/topology/Topology.html#events-org.apache.edgent.function.Consumer-

Hope that helps,
— Dale

> On Dec 29, 2017, at 4:41 PM, Otis Gospodnetić <ot...@gmail.com> wrote:
> 
> Hi,
> 
> I tried looking through the API and examples to see whether Edgent has any
> built-in buffering capabilities for when the sink cannot be reached for
> whatever reason.  I couldn't find anything like that.  Is that something one
> would have to write for each (new or custom) connector, or is it part of the
> framework and I just missed it?
> 
> If I missed it, how is buffering implemented?
> In-memory or on persistent media or some hybrid?
> Can one define the max buffer capacity in terms of either number of items
> or bytes?
> If so, are the oldest items in the buffer dropped when the max capacity (or
> their age) is reached?
> 
> Thanks,
> Otis
> --
> Monitoring - Log Management - Alerting - Anomaly Detection
> Solr & Elasticsearch Consulting Support Training - http://sematext.com/