Posted to dev@spark.apache.org by Yogesh Mahajan <ym...@snappydata.io> on 2018/02/13 21:10:30 UTC

Inefficient state management in stream to stream join in 2.3

In 2.3, stream-to-stream joins (both inner and outer) are implemented using
the symmetric hash join (SHJ) algorithm. That is a good choice, and I am sure
you compared it with other families of algorithms such as XJoin and
non-blocking sort-based algorithms such as progressive merge join (PMJ
<http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.440.506&rep=rep1&type=pdf>
).
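
For readers less familiar with SHJ, here is a minimal in-memory sketch of the
idea: each side keeps its own hash table, and every arriving row is inserted
into its own table and then probed against the other. The class and method
names are made up for illustration; this is not Spark's implementation and it
ignores watermarks, outer joins and state eviction.

```python
from collections import defaultdict


class SymmetricHashJoin:
    """Toy in-memory symmetric hash join over two keyed inputs (hypothetical)."""

    def __init__(self):
        # One hash table per input side: join key -> rows seen so far.
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}

    def process(self, side, key, row):
        """Store `row` on its own side, probe the other side, return new matches."""
        other = "right" if side == "left" else "left"
        self.state[side][key].append(row)
        # .get() avoids creating empty entries on the probed side.
        matches = self.state[other].get(key, [])
        if side == "left":
            return [(row, m) for m in matches]
        return [(m, row) for m in matches]
```

Because both tables are updated incrementally, results are emitted as soon as
the second half of a match arrives, regardless of which side it comes from.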

*From a functional point of view - *
1. It covers most stream-to-stream join use cases, and the considerations
around event time and watermarks as join keys are well thought through.
2. It also adopts an effective approach to join state management: it
exploits 'hard' constraints in the input streams to reduce state, rather
than exploiting statistical properties as 'soft' constraints.

*From a performance point of view - *
SHJ assumes that the entire join state can be kept in main memory, but the
StateStore in Spark is backed by an HDFS-compatible file system.
Also, looking at the code behind StreamingSymmetricHashJoinExec here
<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala>,
two StateStores (KeyToNumValuesStore, KeyWithIndexToValueStore) are used, and
the multiple lookups to them in each
StreamExecution (MicroBatch/ContinuousExecution),
per partition and per operator, will carry a huge performance penalty even
for moderately sized state in queries such as a groupBy on "SYMBOL".
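
To make the lookup cost concrete, here is a rough sketch of a two-store
layout in the spirit of KeyToNumValuesStore and KeyWithIndexToValueStore: one
store maps a key to its number of values, the other maps (key, index) to a
value. CountingStore and TwoStoreJoinState are hypothetical stand-ins backed by
plain dicts, not the Spark classes, and the read counter exists only to show
how many store reads a single probe takes.

```python
class CountingStore:
    """Dict-backed stand-in for a key-value state store that counts reads."""

    def __init__(self):
        self.data = {}
        self.gets = 0

    def get(self, key):
        self.gets += 1
        return self.data.get(key)

    def put(self, key, value):
        self.data[key] = value


class TwoStoreJoinState:
    """One side of the join state, split across two stores (hypothetical)."""

    def __init__(self):
        self.key_to_num_values = CountingStore()        # key -> count
        self.key_with_index_to_value = CountingStore()  # (key, index) -> row

    def append(self, key, value):
        # One read to find the next index, then one write to each store.
        n = self.key_to_num_values.get(key) or 0
        self.key_with_index_to_value.put((key, n), value)
        self.key_to_num_values.put(key, n + 1)

    def get_all(self, key):
        # One read for the count, then one read per stored value.
        n = self.key_to_num_values.get(key) or 0
        return [self.key_with_index_to_value.get((key, i)) for i in range(n)]

    def total_gets(self):
        return self.key_to_num_values.gets + self.key_with_index_to_value.gets
```

In this sketch, probing a key that holds N values costs 1 + N reads, so the
cost of each read in the underlying store is multiplied by the number of rows
buffered per key.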

Even if we implement our own efficient in-memory StateStore to overcome this
perf hit, there is no way to avoid these multiple lookups without also
writing our own StreamingSymmetricHashJoinExec implementation.

We should consider using efficient main-memory data structures described in
this paper
<https://pdfs.semanticscholar.org/2ecc/c55c2076f8feb586d92f01b08094e15b0b4b.pdf>
which are suited for storing sliding windows, with efficient support for
removing tuples that have fallen out of the state.
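
As a rough illustration of the kind of structure meant here (not the one from
the paper), the sketch below keeps a hash index for probing plus an
arrival-ordered queue for expiry, so removing rows that have fallen out of
the window costs time proportional to the number of expired rows only. It
assumes timestamps arrive in non-decreasing order; SlidingWindowState and its
methods are hypothetical names.

```python
from collections import deque


class SlidingWindowState:
    """Window state with O(1) insert and expiry per row (hypothetical sketch)."""

    def __init__(self, window):
        self.window = window
        self.by_time = deque()  # (timestamp, key) in arrival order
        self.by_key = {}        # key -> deque of (timestamp, row)

    def insert(self, ts, key, row):
        # Assumption: ts is non-decreasing across calls.
        self.by_time.append((ts, key))
        self.by_key.setdefault(key, deque()).append((ts, row))

    def expire(self, now):
        """Drop rows with timestamp <= now - window; return how many were dropped."""
        removed = 0
        while self.by_time and self.by_time[0][0] <= now - self.window:
            _, key = self.by_time.popleft()
            rows = self.by_key[key]
            # The globally oldest row is also the oldest row for its key.
            rows.popleft()
            if not rows:
                del self.by_key[key]
            removed += 1
        return removed

    def probe(self, key):
        return [row for _, row in self.by_key.get(key, ())]
```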

Another way to reduce unnecessary state is to use punctuations
<http://www.whitworth.edu/academic/department/mathcomputerscience/faculty/tuckerpeter/pdf/117896_final.pdf>
(in contrast to the existing approach, where constraints have to be known a
priori). A punctuation is a tuple of patterns specifying a predicate that
must evaluate to false for all future data tuples in the stream, and
punctuations can be inserted dynamically.

For example, consider a join of two streams, auctionStream and bidStream.
When a particular auction closes, the system inserts a punctuation into the
bidStream to signal that there will be no more bids for that particular
auction, and purges those tuples that cannot possibly join with future
arrivals. PJoin <http://davis.wpi.edu/dsrg/Old/pdf/pjoin.pdf> is one example
of a stream join algorithm that exploits punctuations.
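
The auction/bid example could be sketched roughly as follows. This is only an
illustration of punctuation-driven purging, not the PJoin algorithm itself;
the class and method names are hypothetical, and it assumes each auction id
appears at most once in auctionStream, so buffered bids for a closed auction
can be dropped as well.

```python
from collections import defaultdict


class PunctuatedJoinState:
    """Join state for auctions and bids, purged by punctuations (hypothetical)."""

    def __init__(self):
        self.auctions = {}              # auction_id -> auction row
        self.bids = defaultdict(list)   # auction_id -> buffered bid rows
        # Closed ids are remembered so late bids are not buffered forever;
        # a real system would have to expire this set too.
        self.closed = set()

    def on_auction(self, auction_id, auction):
        self.auctions[auction_id] = auction
        return [(auction, bid) for bid in self.bids.get(auction_id, [])]

    def on_bid(self, auction_id, bid):
        if auction_id in self.closed:
            raise ValueError("bid violates an earlier punctuation")
        self.bids[auction_id].append(bid)
        auction = self.auctions.get(auction_id)
        return [(auction, bid)] if auction is not None else []

    def on_bid_punctuation(self, auction_id):
        """No more bids for auction_id: purge state that can never join again."""
        self.closed.add(auction_id)
        self.auctions.pop(auction_id, None)
        self.bids.pop(auction_id, None)

    def state_size(self):
        return len(self.auctions) + sum(len(b) for b in self.bids.values())
```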

Thanks,
http://www.snappydata.io/blog <http://snappydata.io>