Posted to dev@tinkerpop.apache.org by Marko Rodriguez <ok...@gmail.com> on 2019/04/11 23:13:16 UTC

TinkerPop4 Status Report #3

Hello,

I spent most of the last 1.5 weeks working on RxJavaProcessor (ReactiveX: http://reactivex.io/). Three of those days were spent in a nasty code hell trying to figure out how to do cyclic stream topologies for repeat(). I’ve never read so much of someone else’s code in my life, and I’ve come to know the inner workings of RxJava quite well.

Without further ado, here is what the tp4/ branch is looking like these days:

Machine machine = LocalMachine.open();
TraversalSource g = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class);

From g, you can spawn single-threaded Rx Flowables. Here is the SerialRxJava processor code:

	https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
		- So simple. 130 lines of code.

If you do:

TraversalSource g = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class, Map.of("rx.threadPool.size", 10));

You can spawn multi-threaded Rx ParallelFlowables. Here is the ParallelRxJava processor code:

	https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
		- So simple. 150 lines of code.

I am now completely confident that the TP4 CFunction intermediate representation (Compilation.class) is sufficient to support all types of execution engines.

	- Pipes: single-threaded, pull-based.
	- Beam: distributed, push-based.
	- RxJava: multi-threaded, push-based.

Implementing BeamProcessor was important to know that multi-machine execution would come naturally, and RxJava was important to know that multi-threading would come naturally. I know that Akka will work just fine as it is both multi-threaded and distributed. Therefore, I believe we have converged on the chain of representational mappings that we will use in the TP4 VM.

	Language ==> Bytecode ==> CFunction Intermediate Representation ==> Processor-Specific Execution Plan

In TP3, we do:

	Language ==> Bytecode ==> Pipes ==> Processor-Specific Execution Plan

…we foolishly embedded one execution engine within another and this has been a cause of various pains that are now rectified in TP4.
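To make the point about a processor-neutral intermediate representation concrete, here is a minimal sketch in plain Java. It is not the tp4 code: TinyCompilation, pull(), and push() are hypothetical names, and the "compilation" is just an ordered list of map functions. The same list is turned into a pull-based plan (in the spirit of Pipes) and a push-based plan (in the spirit of RxJava or Beam) without one engine being embedded in the other.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for a compiled traversal: an ordered list of map functions.
final class TinyCompilation {
    final List<Function<Long, Long>> functions;

    TinyCompilation(List<Function<Long, Long>> functions) {
        this.functions = functions;
    }

    // Applies every function of the compilation, in order, to one value.
    private Long applyAll(Long value) {
        Long result = value;
        for (Function<Long, Long> f : functions) {
            result = f.apply(result);
        }
        return result;
    }

    // Pull-based plan: the consumer drives execution by asking for the next result.
    Iterator<Long> pull(Iterator<Long> source) {
        return new Iterator<>() {
            @Override public boolean hasNext() { return source.hasNext(); }
            @Override public Long next() { return applyAll(source.next()); }
        };
    }

    // Push-based plan: the source drives execution by handing each result to a sink.
    void push(Iterable<Long> source, Consumer<Long> sink) {
        for (Long value : source) {
            sink.accept(applyAll(value));
        }
    }
}
```

Both plans are derived from the same function list, so a new execution style only needs a new way of walking that list; nothing in the list itself assumes pulling or pushing.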

——

Ted Wilmes did some preliminary benchmarking of Pipes vs. SerialRxJava vs. ParallelRxJava. Here are his results for two traversals:
	
Benchmark                                                         Mode     p0.50  Units
RxSerialTraversalBenchmark.g_inject_unfold_incr_incr_incr_incr    sample   6.988  ms/op
RxParallelTraversalBenchmark.g_inject_unfold_incr_incr_incr_incr  sample  11.633  ms/op
PipesTraversalBenchmark.g_inject_unfold_incr_incr_incr_incr       sample   6.627  ms/op

RxSerialTraversalBenchmark.g_inject_unfold_repeat_times           sample   3.592  ms/op
RxParallelTraversalBenchmark.g_inject_unfold_repeat_times         sample   7.897  ms/op
PipesTraversalBenchmark.g_inject_unfold_repeat_times              sample   3.887  ms/op

We should expect ParallelRxJava to shine when interacting with a data source where lots of time is wasted on I/O. I’m hoping that ParallelRxJava will be able to transform borderline real-time queries in TP3 into genuine real-time queries in TP4.
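The intuition about I/O-bound workloads can be sketched without RxJava at all. The example below is an assumption-laden illustration, not a benchmark and not the ParallelRxJava implementation: slowLookup() is a hypothetical stand-in for a data-source read that blocks for about 20 ms, and the thread pool plays the role that "rx.threadPool.size" plays above. With purely in-memory steps (as in the benchmarks above) the thread hand-off costs more than it saves; when each step blocks on I/O, the waits can overlap.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

final class IoBoundLookup {
    // Hypothetical stand-in for a slow data-source read (blocks for roughly 20 ms).
    static int slowLookup(int id) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id * 10;
    }

    // Serial execution: each lookup waits for the previous one to finish.
    static List<Integer> serial(List<Integer> ids) {
        return ids.stream().map(IoBoundLookup::slowLookup).collect(Collectors.toList());
    }

    // Parallel execution: lookups are submitted to a fixed pool so their waits overlap.
    static List<Integer> parallel(List<Integer> ids, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Integer>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> slowLookup(id), pool))
                .collect(Collectors.toList());
            // Joining in submission order keeps the results in input order.
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }
}
```

For ten ids, serial() spends its time in ten consecutive sleeps, while parallel(ids, 10) can overlap them; both return the same list.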

——

One of the outstanding problems I’m having (and have given up on for now) is that I can’t figure out how to do cyclic stream topologies in ReactiveX. Instead, I have to do the repetition-based implementation of repeat() as defined in Stream Ring Theory.

	https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L108-L127

What I’m doing is creating MAX_REPETITIONS “unrolled loops” with exit streams for emit() and until() breaks. I then merge all those exit streams into the main outgoing stream. What I would like to do is be able to send a traverser back to a previous Operator. I’ve gone through about 5 different implementations, but they each have their problems. If anyone is versed in ReactiveX and can tell me the best way to do looping, I would appreciate it. And yes, I’ve read many StackOverflow answers and docs … always close, but never exactly my problem. Also, the solution needs to work for both serial and parallel streams (I had a serial implementation that worked, but I ‘git stashed’ it because it wasn’t thread-safe).
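The unrolling idea can be sketched with plain lists instead of Rx streams. This is only an illustration under stated assumptions, not the SerialRxJava code linked above: UnrolledRepeat and repeatUntil() are hypothetical names, only until() is modeled (no emit()), and anything still looping after the last unrolled stage is simply dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

final class UnrolledRepeat {
    // Builds maxRepetitions copies of the loop body. After each copy, values that
    // satisfy until() leave through that stage's exit; the rest feed the next copy.
    static <T> List<T> repeatUntil(List<T> input,
                                   Function<T, List<T>> body,
                                   Predicate<T> until,
                                   int maxRepetitions) {
        List<T> merged = new ArrayList<>();   // all exit streams merged into one
        List<T> current = input;
        for (int i = 0; i < maxRepetitions; i++) {
            List<T> next = new ArrayList<>();
            for (T traverser : current) {
                for (T out : body.apply(traverser)) {
                    if (until.test(out)) {
                        merged.add(out);      // exit stream of stage i
                    } else {
                        next.add(out);        // input of stage i + 1
                    }
                }
            }
            current = next;
        }
        // Values still in 'current' never met until() within the bound and are dropped.
        return merged;
    }
}
```

For example, doubling 3 and 11 until the value exceeds 20 yields 22 from the first stage and 24 from the third. The cost of this design is visible in the bound: with too few unrolled stages, the 3 never reaches its exit, which is why a true cyclic topology would be preferable.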

——

Finally, I’ve started doing more work with Strategies. You can see our most complex TP4 strategy to date here:

	https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java

If a traversal is multi-threaded, it will use a single-threaded execution for nested traversals if they are “simple” (no branching, no flatmaps, and not too long). Given that nested traversals are both the simplest and most executed traversals in TP, it is important to make sure they don’t waste resources. Thus:

g.V().has('age', gt(32))
	// the gt(32) traversal is single-threaded
g.V().has('age', gt(out('knows').value('age').max()))
	// the gt(…) traversal is multi-threaded

Pretty neat, eh? My rules for determining “simple” are hardcoded as I’m primarily trying to figure out how the strategy interface will look and feel. Strategies are different in TP4 as they operate on bytecode and not on Pipe steps. I’m still not completely happy with the strategy model in TP4 … it’s a bit awkward. Still more to learn. As we develop more strategies, I’m sure a pattern will emerge.
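A hardcoded “simple” rule of the kind described (no branching, no flatmaps, not too long) might look roughly like the sketch below. Everything in it is an assumption made for illustration: the class name, the operation names in the two sets, and the length limit are placeholders rather than what RxJavaStrategy actually checks, and the nested traversal is reduced to a list of operation names instead of real bytecode.

```java
import java.util.List;
import java.util.Set;

final class SimpleNestedTraversal {
    // Hypothetical rule tables; the real strategy's lists and limit may differ.
    static final Set<String> BRANCH_OPS = Set.of("branch", "choose", "union", "repeat");
    static final Set<String> FLATMAP_OPS = Set.of("out", "in", "both", "unfold", "flatMap");
    static final int MAX_SIMPLE_LENGTH = 4;

    // A nested traversal is "simple" if it is short and has no branch or flatmap ops.
    static boolean isSimple(List<String> ops) {
        if (ops.size() > MAX_SIMPLE_LENGTH) {
            return false;
        }
        for (String op : ops) {
            if (BRANCH_OPS.contains(op) || FLATMAP_OPS.contains(op)) {
                return false;
            }
        }
        return true;
    }

    // Simple nested traversals run on one thread; others may use the whole pool.
    static int threadsFor(List<String> nestedOps, int poolSize) {
        return isSimple(nestedOps) ? 1 : poolSize;
    }
}
```

Under these placeholder rules, a nested traversal such as out('knows').value('age').max() is not simple because of the out step, so it keeps the full pool, while a single constant comparison stays single-threaded.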

——

For the next push, I plan to turn my attention to data structures. I’m happy with our processing infrastructure. I think it’s theoretically sound and easy to use. With respect to data structures, the question remains: does TP4 go “beyond graph”? Unfortunately, lately, I’ve been thinking “no” … However, I need to focus some thought on the subject so I can make a confident argument one way or the other.

Thanks for reading. Enjoy your weekend!

Marko.

http://rredux.com