Posted to dev@storm.apache.org by Tech Id <te...@gmail.com> on 2017/05/14 20:49:04 UTC

Confusing documentation for Trident State Management

Hi,

We have been using Storm for more than a year now and are generally very
happy with the product.
However I want to bring up a concern for the documentation of Trident.
A few colleagues and I went over the documentation at
http://storm.apache.org/releases/current/Trident-state.html
But it is still unclear how the micro-batching achieves its goal.

Consider this statement for instance:
"*Suppose your topology computes word count and you want to store the word
counts in a key/value database ... what you can do is store the transaction
id with the count in the database as an atomic value. *"
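
As we read it, the pattern the docs describe amounts to roughly the
following (a minimal sketch; KeyValueClient and StoredValue are
hypothetical stand-ins for the target database, not Storm APIs):

// Store (count, txid) together as one atomic value. On a replayed batch
// the stored txid matches the batch's txid, so the update is skipped and
// the count is not applied twice.
public class TxAwareWordCountStore {
    public static class StoredValue {
        final long count;
        final long txid;
        StoredValue(long count, long txid) { this.count = count; this.txid = txid; }
    }

    // Hypothetical client for the target key/value database.
    public interface KeyValueClient {
        StoredValue get(String key);
        void put(String key, StoredValue value);   // assumed to be an atomic write
    }

    private final KeyValueClient db;

    public TxAwareWordCountStore(KeyValueClient db) { this.db = db; }

    public void applyCount(String word, long delta, long txid) {
        StoredValue current = db.get(word);
        if (current != null && current.txid == txid) {
            return;                                // this batch was already applied; skip on replay
        }
        long base = (current == null) ? 0 : current.count;
        db.put(word, new StoredValue(base + delta, txid));
    }
}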


*Question:*
Is the key/value database the target of the Trident topology, or something
internal to Trident?
- If it is the target of the topology, how can we change the system from
being a key-value store to a key-value-txid store? It seems that we are
making the target itself idempotent rather than providing exactly-once
semantics in Trident in a generic way, because the target's idempotency is
specific to the target and should not be considered an exactly-once
property of the streaming system.
- If the key-value store is something internal to Trident, then how are we
achieving exactly-once for the actual target? It seems that the actual
target and the key-value store are two different systems with no
distributed transaction between them. Clearly, one system can fail while
the other succeeds, thus losing the exactly-once promise.


*Another Question:*
It seems that a micro-batch will be continuously replayed even if just one
word in that batch failed to update its count in the key-value store. If
that is true, the next batch will not even be sent to the target system,
because that next batch might contain the same word, and since the previous
batch failed on that very word, this next batch would also fail. (Rule 3:
"*State updates are ordered among batches. That is, the state updates for
batch 3 won't be applied until the state updates for batch 2 have
succeeded.*").
This means that the whole topology will be stuck at the failing batch.
This is much worse than single-tuple at-least-once delivery, because a
batch may contain thousands to millions of messages, and all of them are
now stuck, with the topology coming to a standstill. If the problem is only
temporary, exactly-once delivery will suffer a giant hit on latency
compared to its at-least-once sibling.
If this is true, then this fact should be added to the documentation so
that users know beforehand what they are signing up for.


*A side note:*
This statement seems to be very old and is no longer true:
"*One side note – once Kafka supports replication, it will be possible to
have transactional spouts that are fault-tolerant to node failure, but that
feature does not exist yet*"
Kafka does support replication now, and we should consider fixing the docs
to reflect that.


Thanks
TI

Re: Confusing documentation for Trident State Management

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
First of all, I am not an expert on Trident.  I understand it at a high level, but I have not done much with its internals.  And yes, we are all in agreement that the documentation for Trident is quite bad.  It was something that Nathan finished just before he left Twitter, so a lot of the deep knowledge about it is still with him.
In all distributed systems there really is no way to get truly exactly-once processing.  All processing needs to be idempotent with retry.  Spark Streaming, Beam, Trident, and Flink all offer some variant of this; they just differ in how far you would have to roll back in your processing.  The recently announced transactions in Spark Streaming can be thought of as the same thing, but with an interesting twist: because the input and the output are both going to the same system, they can atomically commit the output and update the pointer for the input at the same time.

The way Trident does it is by dividing the input into micro-batches.  The guarantee is that the batches will be committed to some external system in order.  They may be processed out of order, but they will be committed in order.  If one batch fails, that batch will be replayed, and all other outstanding batches will not be committed until the previous batch has completed.
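Very roughly, the contract is the following (just an illustration of the ordering rule, not Trident's actual coordinator code; emitBatch and tryCommit are made-up stand-ins):

import java.util.Arrays;
import java.util.List;

// Illustration only: a failed batch is replayed until its commit succeeds,
// and batch txid+1 is never committed before batch txid has committed.
public class InOrderCommitSketch {
    public static void main(String[] args) {
        for (long txid = 1; txid <= 3; txid++) {
            boolean committed = false;
            while (!committed) {
                List<String> batch = emitBatch(txid);  // replayed from the spout on failure
                committed = tryCommit(batch, txid);    // attempted only after txid - 1 committed
            }
        }
    }

    // Made-up stand-ins for the spout and the state commit.
    static List<String> emitBatch(long txid) { return Arrays.asList("tuple-" + txid); }
    static boolean tryCommit(List<String> batch, long txid) { return true; }
}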
This is why writing a result to an external system in Storm requires you to write to a State:
https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/trident/state/State.java
All State defines is how to commit data to it.  There is nothing about how that data is stored or what that data is.  Typically the data will be stored in a key-value store, but it could be anything.
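For example, a bare-bones State that only marks the commit boundaries could look roughly like this (a minimal sketch; the actual writes to your store would go in between):

import org.apache.storm.trident.state.State;

// Minimal sketch of a custom State: the interface only defines the commit
// boundaries for a batch (txid); how and where the data is written is
// entirely up to the implementation (a database transaction, buffered
// writes that are flushed in commit(), etc.).
public class LoggingState implements State {
    @Override
    public void beginCommit(Long txid) {
        // Trident calls this before applying this batch's state updates.
        System.out.println("begin commit, txid=" + txid);
    }

    @Override
    public void commit(Long txid) {
        // Trident calls this after all updates for this batch have been applied.
        System.out.println("commit, txid=" + txid);
    }
}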
https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/trident/state/map/MapState.java
How you implement those guarantees in the state is up to you.  It could be through transactions to a database, or it could be through adding the transaction id to the key.

https://github.com/apache/storm/blob/master/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
HBase ignores the commits and assumes all of the operations are already idempotent.

https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
Hdfs does use the commits to rotate files and can then recover from replayed data if need be.

You might also want to look at OpaqueMap
https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/trident/state/map/OpaqueMap.java
to see how it adds the transaction id to different backing maps so that it can make the operations idempotent.
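Roughly, the opaque trick per key works like this (a simplified sketch of the idea, not the actual OpaqueMap code):

// Simplified sketch of the opaque-value idea: each key stores
// (lastTxid, prev, curr). A replay of the same txid recomputes curr from
// prev, so a partially applied or replayed batch never double-counts.
public class OpaqueCounterSketch {
    static class OpaqueEntry {
        long lastTxid;
        long prev;   // value as of the end of the batch before lastTxid
        long curr;   // value as of the end of batch lastTxid
    }

    static void apply(OpaqueEntry e, long delta, long txid) {
        if (txid == e.lastTxid) {
            // Same batch replayed: roll back to prev and re-apply.
            e.curr = e.prev + delta;
        } else {
            // New batch: current value becomes the rollback point.
            e.prev = e.curr;
            e.curr += delta;
            e.lastTxid = txid;
        }
    }
}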


- Bobby
