You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/03 18:50:01 UTC

[tinkerpop] branch tp4 updated: first stub at RxJavaProcessor. everything works except for repeat(). I'm all confused and lost. Need to think.

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 414211a  first stub at RxJavaProcessor. everything works except for repeat(). I'm all confused and lost. Need to think.
414211a is described below

commit 414211abde8d2f5f257b67f50ff4b192655db104
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Wed Apr 3 12:49:50 2019 -0600

    first stub at RxJavaProcessor. everything works except for repeat(). I'm all confused and lost. Need to think.
---
 .../machine/function/branch/BranchBranch.java      |   3 +-
 java/machine/processor/pom.xml                     |   1 +
 java/machine/processor/rxjava/pom.xml              |  71 ++++++++++++
 .../machine/processor/rxjava/BranchFlow.java       |  51 +++++++++
 .../machine/processor/rxjava/FilterFlow.java       |  40 +++++++
 .../machine/processor/rxjava/FlatMapFlow.java      |  40 +++++++
 .../machine/processor/rxjava/MapFlow.java          |  40 +++++++
 .../machine/processor/rxjava/ReduceFlow.java       |  41 +++++++
 .../tinkerpop/machine/processor/rxjava/RxJava.java |  84 +++++++++++++++
 .../machine/processor/rxjava/RxJavaProcessor.java  |  42 ++++++++
 .../processor/rxjava/strategy/RxJavaStrategy.java  |  38 +++++++
 .../processor/rxjava/util/TopologyUtil.java        | 120 +++++++++++++++++++++
 .../machine/processor/rxjava/RxJavaTest.java       |  68 ++++++++++++
 13 files changed, 638 insertions(+), 1 deletion(-)

diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java
index 13f182c..165a87d 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.machine.function.BranchFunction;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -68,7 +69,7 @@ public final class BranchBranch<C, S, E> extends AbstractFunction<C> implements
 
     public static <C, S, E> BranchBranch<C, S, E> compile(final Instruction<C> instruction) {
         final Object[] args = instruction.args();
-        final Map<Compilation<C, S, ?>, List<Compilation<C, S, E>>> branches = new HashMap<>();
+        final Map<Compilation<C, S, ?>, List<Compilation<C, S, E>>> branches = new LinkedHashMap<>();
         for (int i = 0; i < args.length; i = i + 2) {
             final Compilation<C, S, ?> predicate = Symbols.DEFAULT.equals(args[i]) ? null : Compilation.compile(args[i]);
             final Compilation<C, S, E> branch = Compilation.compile(args[i + 1]);
diff --git a/java/machine/processor/pom.xml b/java/machine/processor/pom.xml
index f23513c..6a34820 100644
--- a/java/machine/processor/pom.xml
+++ b/java/machine/processor/pom.xml
@@ -27,5 +27,6 @@ limitations under the License.
     <modules>
         <module>pipes</module>
         <module>beam</module>
+        <module>rxjava</module>
     </modules>
 </project>
\ No newline at end of file
diff --git a/java/machine/processor/rxjava/pom.xml b/java/machine/processor/rxjava/pom.xml
new file mode 100644
index 0000000..d41d44f
--- /dev/null
+++ b/java/machine/processor/rxjava/pom.xml
@@ -0,0 +1,71 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>processor</artifactId>
+        <groupId>org.apache.tinkerpop</groupId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+    <name>Apache TinkerPop :: Machine :: RxJava</name>
+    <artifactId>rxjava</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>machine-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.reactivex.rxjava2</groupId>
+            <artifactId>rxjava</artifactId>
+            <version>2.2.8</version>
+        </dependency>
+        <!-- TEST -->
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>blueprints</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <directory>${basedir}/target</directory>
+        <finalName>${project.artifactId}-${project.version}</finalName>
+        <testResources>
+            <testResource>
+                <directory>${basedir}/src/test/resources
+                </directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/BranchFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/BranchFlow.java
new file mode 100644
index 0000000..5b0d7b7
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/BranchFlow.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.functions.Function;
+import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
+import org.apache.tinkerpop.machine.function.BranchFunction;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BranchFlow<C, S> implements Function<Traverser<C, S>, List> {
+
+    private final List<Compilation<C, S, ?>> selectors;
+
+    public BranchFlow(final BranchFunction<C, S, ?> function) {
+        this.selectors = new ArrayList<>(function.getBranches().keySet());
+    }
+
+    @Override
+    public List apply(final Traverser<C, S> traverser) {
+        for (int i = 0; i < this.selectors.size(); i++) {
+            final Compilation<C, S, ?> selector = this.selectors.get(i);
+            if (null != selector) {
+                if (selector.filterTraverser(traverser))
+                    return List.of(i, traverser);
+            }
+        }
+        return List.of(-1, traverser);
+    }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
new file mode 100644
index 0000000..84eaf68
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.functions.Predicate;
+import org.apache.tinkerpop.machine.function.FilterFunction;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+/**
+ * Wraps a {@link FilterFunction} as an RxJava {@link Predicate} that keeps or drops a traverser.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class FilterFlow<C, S> implements Predicate<Traverser<C, S>> {
+    private final FilterFunction<C, S> filter;
+
+    public FilterFlow(final FilterFunction<C, S> function) {
+        this.filter = function;
+    }
+
+    @Override
+    public boolean test(final Traverser<C, S> traverser) {
+        return this.filter.test(traverser); // todo: make this 0/1-flatmap so traverser splitting is correct
+    }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
new file mode 100644
index 0000000..9000a84
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.functions.Function;
+import org.apache.tinkerpop.machine.function.FlatMapFunction;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+/**
+ * Wraps a {@link FlatMapFunction} as an RxJava {@link Function} that expands one traverser into many.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class FlatMapFlow<C, S, E> implements Function<Traverser<C, S>, Iterable<Traverser<C, E>>> {
+    private final FlatMapFunction<C, S, E> function; // assigned once in the constructor, so it is final
+
+    public FlatMapFlow(final FlatMapFunction<C, S, E> function) {
+        this.function = function;
+    }
+
+    @Override
+    public Iterable<Traverser<C, E>> apply(final Traverser<C, S> traverser) {
+        return () -> traverser.flatMap(this.function); // flatMap runs when iterator() is requested, not here
+    }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
new file mode 100644
index 0000000..e09eeae
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.functions.Function;
+import org.apache.tinkerpop.machine.function.MapFunction;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+/**
+ * Wraps a {@link MapFunction} as an RxJava {@link Function} that turns one traverser into one traverser.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MapFlow<C, S, E> implements Function<Traverser<C, S>, Traverser<C, E>> {
+    private final MapFunction<C, S, E> mapper;
+
+    public MapFlow(final MapFunction<C, S, E> function) {
+        this.mapper = function;
+    }
+
+    @Override
+    public Traverser<C, E> apply(final Traverser<C, S> traverser) {
+        return traverser.map(this.mapper);
+    }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ReduceFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ReduceFlow.java
new file mode 100644
index 0000000..d26c3d5
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ReduceFlow.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.functions.BiFunction;
+import org.apache.tinkerpop.machine.function.ReduceFunction;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+/**
+ * Accumulator step of a reduction: applies the {@link ReduceFunction} to the incoming traverser and
+ * the seed's current object, then hands the outcome to {@code seed.split}.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ReduceFlow<C, S, E> implements BiFunction<Traverser<C, E>, Traverser<C, S>, Traverser<C, E>> {
+    private final ReduceFunction<C, S, E> reducer;
+
+    public ReduceFlow(final ReduceFunction<C, S, E> function) {
+        this.reducer = function;
+    }
+
+    @Override
+    public Traverser<C, E> apply(final Traverser<C, E> seed, final Traverser<C, S> traverser) {
+        return seed.split(this.reducer.apply(traverser, seed.object())); // todo: need to think about this re-wrap of the seed
+    }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java
new file mode 100644
index 0000000..cd81390
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.Flowable;
+import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
+import org.apache.tinkerpop.machine.processor.Processor;
+import org.apache.tinkerpop.machine.processor.rxjava.util.TopologyUtil;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+import org.apache.tinkerpop.machine.traverser.TraverserSet;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class RxJava<C, S, E> implements Processor<C, S, E> {
+
+    private final AtomicBoolean alive = new AtomicBoolean(Boolean.TRUE);
+    private boolean executed = false;
+    private final TraverserSet<C, S> starts = new TraverserSet<>();
+    private final TraverserSet<C, E> ends = new TraverserSet<>();
+    private final Compilation<C, S, E> compilation;
+
+    public RxJava(final Compilation<C, S, E> compilation) {
+        this.compilation = compilation;
+    }
+
+    @Override
+    public void addStart(final Traverser<C, S> traverser) {
+        this.starts.add(traverser);
+    }
+
+    @Override
+    public Traverser<C, E> next() {
+        this.prepareFlow();
+        return this.ends.remove();
+    }
+
+    @Override
+    public boolean hasNext() {
+        this.prepareFlow();
+        return !this.ends.isEmpty();
+    }
+
+    @Override
+    public void reset() {
+        this.starts.clear();
+        this.ends.clear();
+        this.executed = false;
+    }
+
+    private void prepareFlow() {
+        if (!this.executed) {
+            this.executed = true;
+            TopologyUtil.compile(Flowable.fromIterable(this.starts), compilation).
+                    doOnNext(this.ends::add).
+                    doOnComplete(() -> this.alive.set(Boolean.FALSE)).
+                    subscribe();
+        }
+        if (!this.ends.isEmpty())
+            return;
+        while (this.alive.get()) {
+            if (!this.ends.isEmpty())
+                return;
+        }
+    }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
new file mode 100644
index 0000000..a57954e
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
+import org.apache.tinkerpop.machine.processor.Processor;
+import org.apache.tinkerpop.machine.processor.ProcessorFactory;
+import org.apache.tinkerpop.machine.processor.rxjava.strategy.RxJavaStrategy;
+import org.apache.tinkerpop.machine.strategy.Strategy;
+
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class RxJavaProcessor implements ProcessorFactory {
+    @Override
+    public Set<Strategy<?>> getStrategies() {
+        return Set.of(new RxJavaStrategy()); // the only strategy this factory contributes
+    }
+
+    @Override
+    public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E> compilation) {
+        return new RxJava<C, S, E>(compilation); // every call builds a separate RxJava instance
+    }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
new file mode 100644
index 0000000..85d3290
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava.strategy;
+
+import org.apache.tinkerpop.machine.bytecode.Bytecode;
+import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
+import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
+import org.apache.tinkerpop.machine.processor.rxjava.RxJavaProcessor;
+import org.apache.tinkerpop.machine.strategy.AbstractStrategy;
+import org.apache.tinkerpop.machine.strategy.Strategy;
+
+/**
+ * Provider strategy that names {@link RxJavaProcessor} as the processor when the bytecode names none.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class RxJavaStrategy extends AbstractStrategy<Strategy.ProviderStrategy> implements Strategy.ProviderStrategy {
+    @Override
+    public <C> void apply(final Bytecode<C> bytecode) {
+        if (!BytecodeUtil.hasSourceInstruction(bytecode, CoreCompiler.Symbols.WITH_PROCESSOR))
+            bytecode.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class);
+    }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
new file mode 100644
index 0000000..b6b0e4a
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava.util;
+
+import io.reactivex.Flowable;
+import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
+import org.apache.tinkerpop.machine.function.BranchFunction;
+import org.apache.tinkerpop.machine.function.CFunction;
+import org.apache.tinkerpop.machine.function.FilterFunction;
+import org.apache.tinkerpop.machine.function.FlatMapFunction;
+import org.apache.tinkerpop.machine.function.InitialFunction;
+import org.apache.tinkerpop.machine.function.MapFunction;
+import org.apache.tinkerpop.machine.function.ReduceFunction;
+import org.apache.tinkerpop.machine.processor.rxjava.BranchFlow;
+import org.apache.tinkerpop.machine.processor.rxjava.FilterFlow;
+import org.apache.tinkerpop.machine.processor.rxjava.FlatMapFlow;
+import org.apache.tinkerpop.machine.processor.rxjava.MapFlow;
+import org.apache.tinkerpop.machine.processor.rxjava.ReduceFlow;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+import org.apache.tinkerpop.machine.traverser.TraverserFactory;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
+import org.reactivestreams.Publisher;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TopologyUtil {
+
+    public static <C, S, E> Flowable<Traverser<C, E>> compile(final Flowable<Traverser<C, S>> source, final Compilation<C, S, E> compilation) {
+        final TraverserFactory<C> traverserFactory = compilation.getTraverserFactory();
+        Flowable<Traverser<C, E>> sink = (Flowable) source;
+        for (final CFunction<C> function : compilation.getFunctions()) {
+            sink = TopologyUtil.extend(sink, function, traverserFactory);
+        }
+        return sink;
+    }
+
+    /*
+     private final void stageInput() {
+        if (this.hasStartPredicates) {
+            final Traverser<C, S> traverser = this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove();
+            if (1 == this.untilLocation) {
+                if (this.untilCompilation.filterTraverser(traverser)) {
+                    this.outputTraversers.add(traverser);
+                } else if (2 == this.emitLocation && this.emitCompilation.filterTraverser(traverser)) {
+                    this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
+                    this.repeat.addTraverser(traverser);
+                } else
+                    this.repeat.addTraverser(traverser);
+            } else if (1 == this.emitLocation) {
+                if (this.emitCompilation.filterTraverser(traverser))
+                    this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
+                if (2 == this.untilLocation && this.untilCompilation.filterTraverser(traverser))
+                    this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
+                else
+                    this.repeat.addTraverser(traverser);
+            }
+        } else {
+            this.repeat.addTraverser(this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove());
+        }
+    }
+
+
+     */
+
+    private static <C, S, E, B> Flowable<Traverser<C, E>> extend(Flowable<Traverser<C, S>> flow, final CFunction<C> function, final TraverserFactory<C> traverserFactory) {
+        if (function instanceof MapFunction)
+            return flow.map(new MapFlow<>((MapFunction<C, S, E>) function));
+        else if (function instanceof FilterFunction) {
+            return (Flowable) flow.filter(new FilterFlow<>((FilterFunction<C, S>) function));
+        } else if (function instanceof FlatMapFunction) {
+            return flow.flatMapIterable(new FlatMapFlow<>((FlatMapFunction<C, S, E>) function));
+        } else if (function instanceof InitialFunction) {
+            return Flowable.fromIterable(() -> IteratorUtils.map(((InitialFunction<C, E>) function).get(), s -> traverserFactory.create(function, s)));
+        } else if (function instanceof ReduceFunction) {
+            final ReduceFunction<C, S, E> reduceFunction = (ReduceFunction<C, S, E>) function;
+            return flow.reduce(traverserFactory.create(reduceFunction, reduceFunction.getInitialValue()), new ReduceFlow<>(reduceFunction)).toFlowable();
+        } else if (function instanceof BranchFunction) {
+            final Flowable<List> selectorFlow = flow.map(new BranchFlow<>((BranchFunction<C, S, B>) function));
+            final List<Publisher<Traverser<C, E>>> branchFlows = new ArrayList<>();
+            for (final Map.Entry<Compilation<C, S, ?>, List<Compilation<C, S, E>>> branches : ((BranchFunction<C, S, E>) function).getBranches().entrySet()) {
+                for (int i = 0; i < branches.getValue().size(); i++) {
+                    final Compilation<C, S, E> branch = branches.getValue().get(i);
+                    final int id = i;
+                    branchFlows.add(
+                            selectorFlow.
+                                    filter(list -> list.get(0).equals(null == branches.getKey() ? -1 : id)).
+                                    map(list -> (Traverser<C, S>) list.get(1)).
+                                    publish(f -> compile(f, branch)));
+                }
+            }
+            Flowable<Traverser<C, E>> sink = (Flowable) flow.filter(t -> false); // branches are the only outputs
+            for (final Publisher<Traverser<C, E>> branchFlow : branchFlows) {
+                sink = sink.mergeWith(branchFlow);
+            }
+            return sink;
+        }
+        throw new RuntimeException("Need a new execution plan step: " + function);
+    }
+}
\ No newline at end of file
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java
new file mode 100644
index 0000000..8085bbc
--- /dev/null
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import org.apache.tinkerpop.language.gremlin.Gremlin;
+import org.apache.tinkerpop.language.gremlin.P;
+import org.apache.tinkerpop.language.gremlin.Traversal;
+import org.apache.tinkerpop.language.gremlin.TraversalSource;
+import org.apache.tinkerpop.language.gremlin.TraversalUtil;
+import org.apache.tinkerpop.language.gremlin.common.__;
+import org.apache.tinkerpop.machine.Machine;
+import org.apache.tinkerpop.machine.coefficient.LongCoefficient;
+import org.apache.tinkerpop.machine.species.LocalMachine;
+import org.apache.tinkerpop.machine.strategy.optimization.IdentityStrategy;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.tinkerpop.language.gremlin.common.__.incr;
+
+/**
+ * Smoke test: prints bytecode and results for sample traversals executed by the RxJava processor.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class RxJavaTest {
+
+    @Test
+    public void doStuff() {
+        final Machine machine = LocalMachine.open();
+        final TraversalSource<Long> g = Gremlin.<Long>traversal(machine)
+                .withCoefficient(LongCoefficient.class)
+                .withProcessor(RxJavaProcessor.class)
+                .withStrategy(IdentityStrategy.class);
+
+        // filter followed by a two-way union
+        report(g.inject(2L).is(P.gt(1)).union(incr(),__.<Long>incr().incr()));
+        // predicate-driven choose with two alternatives
+        report(g.inject(1L).choose(__.is(1L), incr(),__.<Long>incr().incr()));
+
+        /* repeat() case, still disabled:
+        report(g.inject(1L).until(__.is(P.lt(3L))).emit().repeat(incr()));
+        */
+    }
+
+    // Prints the bytecode, the traversal, the bytecode again, the results and a separator.
+    private static void report(final Traversal<Long, ?, ?> traversal) {
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+    }
+}