Posted to dev@tinkerpop.apache.org by Marko Rodriguez <ok...@gmail.com> on 2019/03/26 18:15:31 UTC

[DISCUSS] TP4 TraverserServer, RemoteMachine, and MachineServer.

Hello,

Today I’m working on the TP4 BeamProcessor package. When you create an Apache Beam pipeline, the final Fn (stream operation) is used to write the output “somewhere.” Beam provides writers for common databases like Cassandra, HBase, and RDBMSs and for common file formats/systems like HDFS and S3. For testing purposes (and to move forward quickly), I just created a "public static TraverserSet" (which is thread safe) and had the final Fn reference the TraverserSet and write traversers to it. Of course, this doesn’t work outside unit tests… So I needed to up my game. I needed a way for all the final Fns to send their traversers to a server.

I decided to build my own multi-threaded TraverserServer (ignore the rough edges for now — error handling, ObjectInputStream, etc.).

	https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java

I originally had this class in Beam only.

	1. Start the server when the Beam pipeline is started.
		https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java#L92-L94
	2. Have the OutputFn connect to the server and send resultant traversers to it.
		https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java#L46
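
For readers who don't want to click through, the two steps above can be sketched roughly as follows. This is only a minimal illustration, assuming plain java.net sockets and Java serialization; ResultServerSketch and its methods are hypothetical names, not the linked TraverserServer code, and plain objects stand in for traversers.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a TraverserServer: one thread per producer
// connection, all of them adding to a single thread-safe result set.
class ResultServerSketch implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final Set<Object> results = ConcurrentHashMap.newKeySet();

    ResultServerSketch(int port) throws IOException {
        this.serverSocket = new ServerSocket(port); // port 0 asks the OS for a free port
        Thread acceptor = new Thread(this::acceptLoop);
        acceptor.setDaemon(true);
        acceptor.start();
    }

    int port() { return serverSocket.getLocalPort(); }

    Set<Object> results() { return results; }

    // Step 1: the server waits for producers and hands each one its own thread.
    private void acceptLoop() {
        while (!serverSocket.isClosed()) {
            try {
                Socket socket = serverSocket.accept();
                Thread worker = new Thread(() -> drain(socket));
                worker.setDaemon(true);
                worker.start();
            } catch (IOException e) {
                return; // the server socket was closed
            }
        }
    }

    private void drain(Socket socket) {
        try (Socket s = socket;
             ObjectInputStream in = new ObjectInputStream(s.getInputStream())) {
            while (true) {
                results.add(in.readObject());
            }
        } catch (EOFException e) {
            // the producer closed its stream; nothing more to read
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace(); // rough edge: real error handling omitted
        }
    }

    // Step 2: what an output function would do with its results.
    static void send(String host, int port, Object... items) throws IOException {
        try (Socket socket = new Socket(host, port);
             ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
            for (Object item : items) {
                out.writeObject(item);
            }
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
    }
}
```

Results arrive asynchronously, so a caller has to decide when the set is complete; the sketch leaves that question open.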

Makes sense. Works. Great. Done. … or was I?!?!

Think about this in practice. If TP4 had a “GremlinServer”, we would have to stream the resultant traversers in TraverserServer over the wire to the client. This means an extra serialization/deserialization step. Why do that? Why not move the TraverserServer to the client? That is, the OutputFn steps send their traversers to a TraverserSet regardless of whether Beam maintains it.

If you read my post yesterday about the Machine interface, you will know that we now have a Machine interface. You do this:

Machine machine = LocalMachine.open();
TraversalSource g = Gremlin.traversal(machine).withProcessor(BeamProcessor.class);

This morning, I thought to myself — “maybe I can just rip out a RemoteMachine??” 4 hours later, done. Here is how you use it:

// assume this code is on 111.111.111.111
MachineServer machineServer = new MachineServer(7777); 

// assume this code is on 222.222.222.222
Machine machine = RemoteMachine.open(6666, "111.111.111.111", 7777);
TraversalSource g = Gremlin.traversal(machine).withProcessor(
	BeamProcessor.class, 
	Map.of("beam.traverserServer.location", "222.222.222.222", "beam.traverserServer.port", 6666));
      
MachineServer is like GremlinServer. It just sits (at the cluster) and waits for connections. It registers bytecode source, gets bytecode submissions, and returns traversers. Note how it is backed by a LocalMachine! Genius.
	https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
		- it's basic right now and Stephen will know how to make stuff better.
	
RemoteMachine is like RemoteConnection. It communicates with a MachineServer. However, guess what else it can do! It can spawn a TraverserServer to aggregate results in a threaded manner (from multiple traverser producers)!
	https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/RemoteMachine.java

Now here is the kicker: if you have a processor that processes in parallel (Akka, Spark, Flink, etc.), it can be given the coordinates of a TraverserServer (see the withProcessor() Map in the above code fragment). This way, it will send its resultant traversers directly to the RemoteMachine (i.e. the client), bypassing double serialization at MachineServer. However, if you don’t want that to happen -- that is, if you want the results to go through the MachineServer -- just don’t specify the location of a TraverserServer and bam, these processors will send their results to the MachineServer, as it has its own TraverserServer. Then the MachineServer will send traversers to the RemoteMachine in classical serial Iterator<Traverser> style. This means that Pipes just works! Pipes has no notion of networks/machines/serialization, so MachineServer just iterates out the traversers and sends them to RemoteMachine.
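
The choice of destination described above might look something like this. It is a sketch only: the two configuration keys come from the code fragment earlier in this message, while TraverserTargetSketch, resolve(), and the fallback parameters are hypothetical and not part of the TP4 code.

```java
import java.util.Map;

// Hypothetical helper: decides where a distributed processor's output
// functions should send their traversers.
class TraverserTargetSketch {
    final String host;
    final int port;

    TraverserTargetSketch(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Uses the client-supplied TraverserServer coordinates when both are
    // present; otherwise falls back to the server's own TraverserServer.
    static TraverserTargetSketch resolve(Map<String, Object> config,
                                         String fallbackHost, int fallbackPort) {
        Object location = config.get("beam.traverserServer.location");
        Object port = config.get("beam.traverserServer.port");
        if (location != null && port != null) {
            return new TraverserTargetSketch(location.toString(), ((Number) port).intValue());
        }
        return new TraverserTargetSketch(fallbackHost, fallbackPort);
    }
}
```

With the Map from the fragment above, resolve() would point at 222.222.222.222:6666 (the client); with an empty Map it would point at whatever fallback the server supplies.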

Look: MachineServer always tries to iterate out processor results. When the processor is parallelized and its outputs do their own network I/O, the iterator is empty. Makes sense, logical and consistent. With Pipes, the iterator won’t be empty. Thus, there is no “special logic” to handle distributed vs. local processors!
	https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java#L95-L105
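
A toy sketch of why one loop is enough. All names here are hypothetical (this is not the linked MachineServer code), strings stand in for traversers, and a Consumer stands in for the network write to a TraverserServer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical processor abstraction: every processor returns an iterator.
interface ProcessorSketch {
    Iterator<String> run(List<String> input);
}

// Pipes-like local processor: results come back through the iterator.
class LocalProcessorSketch implements ProcessorSketch {
    public Iterator<String> run(List<String> input) {
        return input.iterator();
    }
}

// Distributed-style processor: outputs are pushed to a sink as a side
// effect, so the returned iterator is empty.
class PushingProcessorSketch implements ProcessorSketch {
    private final Consumer<String> sink;

    PushingProcessorSketch(Consumer<String> sink) {
        this.sink = sink;
    }

    public Iterator<String> run(List<String> input) {
        input.forEach(sink);
        return Collections.emptyIterator();
    }
}

class ServerLoopSketch {
    // The same loop serves both kinds of processor; it returns whatever
    // the server itself would forward to the client.
    static List<String> forward(ProcessorSketch processor, List<String> input) {
        List<String> sentToClient = new ArrayList<>();
        Iterator<String> results = processor.run(input);
        while (results.hasNext()) {
            sentToClient.add(results.next());
        }
        return sentToClient;
    }
}
```

For the local processor, forward() returns every result; for the pushing processor it returns an empty list because the results have already gone to the sink.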

Thus, this just works out of the box!

Machine machine = RemoteMachine.open(6666, "111.111.111.111", 7777);
TraversalSource g = Gremlin.traversal(machine).withProcessor(PipesProcessor.class);

And finally coming full circle — this just works (no special logic required):

Machine machine = LocalMachine.open();
TraversalSource g = Gremlin.traversal(machine).withProcessor(BeamProcessor.class);

Boo yea!

Thoughts?,
Marko.

http://rredux.com





Re: [DISCUSS] TP4 TraverserServer, RemoteMachine, and MachineServer.

Posted by Marko Rodriguez <ok...@gmail.com>.
Hi,

I drew a graphic so you can understand better what I’m talking about.

The first picture is for non-distributed processors or when you want the distributed processor to send its results to MachineServer. [TP3 GremlinServer-style]

The second picture is for distributed processors where you bypass MachineServer and have the client directly handle distributed ingestion of traverser results.

Enjoy!,
Marko.

http://rredux.com




