Posted to dev@s2graph.apache.org by "DOYUNG YOON (JIRA)" <ji...@apache.org> on 2017/12/08 09:55:00 UTC

[jira] [Created] (S2GRAPH-173) Unify multiple concurrent library to reactive stream.

DOYUNG YOON created S2GRAPH-173:
-----------------------------------

             Summary: Unify multiple concurrent library to reactive stream.
                 Key: S2GRAPH-173
                 URL: https://issues.apache.org/jira/browse/S2GRAPH-173
             Project: S2Graph
          Issue Type: New Feature
    Affects Versions: 0.3.0
            Reporter: DOYUNG YOON
             Fix For: 0.3.0


The S2Graph codebase currently uses multiple asynchronous libraries.


* AsynchbaseStorage uses the com.stumbleupon.async library to work with asynchbase.
* S2Graph and all other classes use scala.concurrent.Future.
* Some methods in S2Graph return java.util.concurrent.CompletableFuture.


Using multiple concurrency libraries is not a problem in itself, but it would be nice to unify them, at least on the client interface.
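
To illustrate what unifying at the client boundary could look like, a java.util.concurrent.CompletableFuture can be adapted to a scala.concurrent.Future through a Promise. This is only a minimal sketch using the standard library; `FutureAdapters` and `toScala` are hypothetical names and not part of S2Graph.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Future, Promise}

// Hypothetical helper, not part of S2Graph.
object FutureAdapters {
  // Completes a Scala Promise when the Java future finishes, so callers
  // only ever see scala.concurrent.Future.
  def toScala[T](cf: CompletableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    cf.whenComplete(new BiConsumer[T, Throwable] {
      override def accept(value: T, error: Throwable): Unit =
        if (error != null) promise.failure(error) else promise.success(value)
    })
    promise.future
  }
}
```

A similar adapter would be needed for com.stumbleupon.async, which is omitted here because it requires the asynchbase dependency.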

Also, RocksStorage, which does not provide asynchronous operations itself, currently uses scala.concurrent.Future by simply wrapping results in Future.successful, only because the storage interface is fixed to scala.concurrent.Future.

It would be better if the storage interface could express both blocking and non-blocking operations, and I think reactive streams can help here.
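
As a rough sketch of that idea, the interface below returns a push-based stream, and each storage decides whether items are emitted synchronously or once a future completes. `Source` is a hand-rolled stand-in for an Observable from a real reactive stream library, and every name here (`StorageSketch`, `Source`, `Storage`, `BlockingStorage`, `AsyncStorage`) is hypothetical rather than S2Graph's actual API.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

// All names below are hypothetical; this is not S2Graph's Storage API.
object StorageSketch {
  // Minimal push-based stream: the producer decides when items arrive.
  trait Source[T] {
    def subscribe(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Unit
  }

  object Source {
    // Reads eagerly and emits on the subscriber's own thread.
    def fromBlocking[T](read: => Seq[T]): Source[T] = new Source[T] {
      def subscribe(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Unit =
        Try(read) match {
          case Success(items) => items.foreach(onNext); onComplete()
          case Failure(e)     => onError(e)
        }
    }

    // Emits once the future completes, on the given execution context.
    def fromFuture[T](read: => Future[Seq[T]])(implicit ec: ExecutionContext): Source[T] = new Source[T] {
      def subscribe(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Unit =
        read.onComplete {
          case Success(items) => items.foreach(onNext); onComplete()
          case Failure(e)     => onError(e)
        }
    }
  }

  // One interface for both kinds of storage.
  trait Storage {
    def fetch(key: String): Source[String]
  }

  // Stand-in for a store without native asynchronous support.
  class BlockingStorage(data: Map[String, Seq[String]]) extends Storage {
    def fetch(key: String): Source[String] =
      Source.fromBlocking(data.getOrElse(key, Nil))
  }

  // Stand-in for a store backed by an asynchronous client.
  class AsyncStorage(data: Map[String, Seq[String]])(implicit ec: ExecutionContext) extends Storage {
    def fetch(key: String): Source[String] =
      Source.fromFuture(Future(data.getOrElse(key, Nil)))
  }
}
```

With this shape, a blocking storage no longer has to pretend to be asynchronous by wrapping values in Future.successful; callers subscribe the same way in both cases.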

We could also easily build a traversal DSL, similar to TinkerPop's traversals and steps, on top of reactive streams.

Reference: 

* http://tinkerpop.apache.org/docs/current/reference/#graph-traversal-steps
* http://reactivex.io/documentation/observable.html 

Note that the draft below is a very naive idea and is highly likely to change while experimenting with a reactive stream library.

{noformat}
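// Draft only. Observable is assumed to come from a reactive stream library
// such as RxScala (rx.lang.scala.Observable); graph.fetchEdges returning an
// Observable is also an assumption.

// Each step receives its input through apply, so one step instance can be
// reused for every element flowing through the traversal.
trait Step[I, O]

trait MapStep[I, O] extends Step[I, O] {
	def apply(input: I): O
}

trait FlatMapStep[I, O] extends Step[I, O] {
	def apply(input: I): Observable[O]
}

trait FilterStep[I] extends Step[I, I] {
	def apply(input: I): Boolean
}

// Vertex -> its edges.
class VertexStep(graph: Graph) extends FlatMapStep[Vertex, Edge] {
	def apply(srcVertex: Vertex): Observable[Edge] =
		graph.fetchEdges(srcVertex)
}

// Edge -> the vertex (or vertices) reached when moving in the given direction.
class EdgeStep(dir: Direction) extends FlatMapStep[Edge, Vertex] {
	def apply(edge: Edge): Observable[Vertex] =
		dir match {
			case Direction.OUT => Observable.just(edge.inVertex())
			case Direction.IN => Observable.just(edge.outVertex())
			case Direction.BOTH => Observable.just(edge.inVertex(), edge.outVertex())
		}
}

// Chains the query's steps onto a source Observable. The source is passed in
// because folding from Observable.empty would never emit anything. The caller
// subscribes an Observer to the returned Observable.
def toTraversal(query: Query, source: Observable[Any]): Observable[Any] =
	query.steps.foldLeft(source) { (prev, step) =>
		step match {
			case s: MapStep[Any, Any] @unchecked => prev.map(s.apply)
			case s: FlatMapStep[Any, Any] @unchecked => prev.flatMap(s.apply)
			case s: FilterStep[Any] @unchecked => prev.filter(s.apply)
		}
	}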
{noformat}



