You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/03 18:50:01 UTC
[tinkerpop] branch tp4 updated: first stub at RxJavaProcessor.
everything works except for repeat(). I'm all confused and lost. Need to
think.
This is an automated email from the ASF dual-hosted git repository.
okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
new 414211a first stub at RxJavaProcessor. everything works except for repeat(). I'm all confused and lost. Need to think.
414211a is described below
commit 414211abde8d2f5f257b67f50ff4b192655db104
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Wed Apr 3 12:49:50 2019 -0600
first stub at RxJavaProcessor. everything works except for repeat(). I'm all confused and lost. Need to think.
---
.../machine/function/branch/BranchBranch.java | 3 +-
java/machine/processor/pom.xml | 1 +
java/machine/processor/rxjava/pom.xml | 71 ++++++++++++
.../machine/processor/rxjava/BranchFlow.java | 51 +++++++++
.../machine/processor/rxjava/FilterFlow.java | 40 +++++++
.../machine/processor/rxjava/FlatMapFlow.java | 40 +++++++
.../machine/processor/rxjava/MapFlow.java | 40 +++++++
.../machine/processor/rxjava/ReduceFlow.java | 41 +++++++
.../tinkerpop/machine/processor/rxjava/RxJava.java | 84 +++++++++++++++
.../machine/processor/rxjava/RxJavaProcessor.java | 42 ++++++++
.../processor/rxjava/strategy/RxJavaStrategy.java | 38 +++++++
.../processor/rxjava/util/TopologyUtil.java | 120 +++++++++++++++++++++
.../machine/processor/rxjava/RxJavaTest.java | 68 ++++++++++++
13 files changed, 638 insertions(+), 1 deletion(-)
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java
index 13f182c..165a87d 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.machine.function.BranchFunction;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -68,7 +69,7 @@ public final class BranchBranch<C, S, E> extends AbstractFunction<C> implements
public static <C, S, E> BranchBranch<C, S, E> compile(final Instruction<C> instruction) {
final Object[] args = instruction.args();
- final Map<Compilation<C, S, ?>, List<Compilation<C, S, E>>> branches = new HashMap<>();
+ final Map<Compilation<C, S, ?>, List<Compilation<C, S, E>>> branches = new LinkedHashMap<>();
for (int i = 0; i < args.length; i = i + 2) {
final Compilation<C, S, ?> predicate = Symbols.DEFAULT.equals(args[i]) ? null : Compilation.compile(args[i]);
final Compilation<C, S, E> branch = Compilation.compile(args[i + 1]);
diff --git a/java/machine/processor/pom.xml b/java/machine/processor/pom.xml
index f23513c..6a34820 100644
--- a/java/machine/processor/pom.xml
+++ b/java/machine/processor/pom.xml
@@ -27,5 +27,6 @@ limitations under the License.
<modules>
<module>pipes</module>
<module>beam</module>
+ <module>rxjava</module>
</modules>
</project>
\ No newline at end of file
diff --git a/java/machine/processor/rxjava/pom.xml b/java/machine/processor/rxjava/pom.xml
new file mode 100644
index 0000000..d41d44f
--- /dev/null
+++ b/java/machine/processor/rxjava/pom.xml
@@ -0,0 +1,71 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>processor</artifactId>
+ <groupId>org.apache.tinkerpop</groupId>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+ <name>Apache TinkerPop :: Machine :: RxJava</name>
+ <artifactId>rxjava</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>machine-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.reactivex.rxjava2</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>2.2.8</version>
+ </dependency>
+ <!-- TEST -->
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>gremlin</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>blueprints</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <directory>${basedir}/target</directory>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <testResources>
+ <testResource>
+ <directory>${basedir}/src/test/resources
+ </directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/BranchFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/BranchFlow.java
new file mode 100644
index 0000000..5b0d7b7
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/BranchFlow.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.functions.Function;
+import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
+import org.apache.tinkerpop.machine.function.BranchFunction;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BranchFlow<C, S> implements Function<Traverser<C, S>, List> {
+
+ private final List<Compilation<C, S, ?>> selectors;
+
+ public BranchFlow(final BranchFunction<C, S, ?> function) {
+ this.selectors = new ArrayList<>(function.getBranches().keySet());
+ }
+
+ @Override
+ public List apply(final Traverser<C, S> traverser) {
+ for (int i = 0; i < this.selectors.size(); i++) {
+ final Compilation<C, S, ?> selector = this.selectors.get(i);
+ if (null != selector) {
+ if (selector.filterTraverser(traverser))
+ return List.of(i, traverser);
+ }
+ }
+ return List.of(-1, traverser);
+ }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
new file mode 100644
index 0000000..84eaf68
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.functions.Predicate;
+import org.apache.tinkerpop.machine.function.FilterFunction;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class FilterFlow<C, S> implements Predicate<Traverser<C, S>> {
+
+ private final FilterFunction<C, S> function;
+
+ public FilterFlow(final FilterFunction<C, S> function) {
+ this.function = function;
+ }
+
+ @Override
+ public boolean test(final Traverser<C, S> traverser) {
+ return this.function.test(traverser); // todo: make this 0/1-flatmap so traverser splitting is correct
+ }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
new file mode 100644
index 0000000..9000a84
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.functions.Function;
+import org.apache.tinkerpop.machine.function.FlatMapFunction;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class FlatMapFlow<C, S, E> implements Function<Traverser<C, S>, Iterable<Traverser<C, E>>> {
+
+ private FlatMapFunction<C, S, E> function;
+
+ public FlatMapFlow(final FlatMapFunction<C, S, E> function) {
+ this.function = function;
+ }
+
+ @Override
+ public Iterable<Traverser<C, E>> apply(final Traverser<C, S> traverser) {
+ return () -> traverser.flatMap(this.function);
+ }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
new file mode 100644
index 0000000..e09eeae
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.functions.Function;
+import org.apache.tinkerpop.machine.function.MapFunction;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MapFlow<C, S, E> implements Function<Traverser<C, S>, Traverser<C, E>> {
+
+ private final MapFunction<C, S, E> function;
+
+ public MapFlow(final MapFunction<C, S, E> function) {
+ this.function = function;
+ }
+
+ @Override
+ public Traverser<C, E> apply(final Traverser<C, S> traverser) {
+ return traverser.map(this.function);
+ }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ReduceFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ReduceFlow.java
new file mode 100644
index 0000000..d26c3d5
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ReduceFlow.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.functions.BiFunction;
+import org.apache.tinkerpop.machine.function.ReduceFunction;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ReduceFlow<C, S, E> implements BiFunction<Traverser<C, E>, Traverser<C, S>, Traverser<C, E>> {
+
+ private final ReduceFunction<C, S, E> function;
+
+ public ReduceFlow(final ReduceFunction<C, S, E> function) {
+ this.function = function;
+ }
+
+
+ @Override
+ public Traverser<C, E> apply(final Traverser<C, E> seed, final Traverser<C, S> traverser) {
+ return seed.split(this.function.apply(traverser, seed.object())); // todo: need to think about this re-wrap of the seed
+ }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java
new file mode 100644
index 0000000..cd81390
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.Flowable;
+import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
+import org.apache.tinkerpop.machine.processor.Processor;
+import org.apache.tinkerpop.machine.processor.rxjava.util.TopologyUtil;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+import org.apache.tinkerpop.machine.traverser.TraverserSet;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class RxJava<C, S, E> implements Processor<C, S, E> {
+
+ private final AtomicBoolean alive = new AtomicBoolean(Boolean.TRUE);
+ private boolean executed = false;
+ private final TraverserSet<C, S> starts = new TraverserSet<>();
+ private final TraverserSet<C, E> ends = new TraverserSet<>();
+ private final Compilation<C, S, E> compilation;
+
+ public RxJava(final Compilation<C, S, E> compilation) {
+ this.compilation = compilation;
+ }
+
+ @Override
+ public void addStart(final Traverser<C, S> traverser) {
+ this.starts.add(traverser);
+ }
+
+ @Override
+ public Traverser<C, E> next() {
+ this.prepareFlow();
+ return this.ends.remove();
+ }
+
+ @Override
+ public boolean hasNext() {
+ this.prepareFlow();
+ return !this.ends.isEmpty();
+ }
+
+ @Override
+ public void reset() {
+ this.starts.clear();
+ this.ends.clear();
+ this.executed = false;
+ }
+
+ private void prepareFlow() {
+ if (!this.executed) {
+ this.executed = true;
+ TopologyUtil.compile(Flowable.fromIterable(this.starts), compilation).
+ doOnNext(this.ends::add).
+ doOnComplete(() -> this.alive.set(Boolean.FALSE)).
+ subscribe();
+ }
+ if (!this.ends.isEmpty())
+ return;
+ while (this.alive.get()) {
+ if (!this.ends.isEmpty())
+ return;
+ }
+ }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
new file mode 100644
index 0000000..a57954e
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
+import org.apache.tinkerpop.machine.processor.Processor;
+import org.apache.tinkerpop.machine.processor.ProcessorFactory;
+import org.apache.tinkerpop.machine.processor.rxjava.strategy.RxJavaStrategy;
+import org.apache.tinkerpop.machine.strategy.Strategy;
+
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class RxJavaProcessor implements ProcessorFactory {
+ @Override
+ public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E> compilation) {
+ return new RxJava<>(compilation);
+ }
+
+ @Override
+ public Set<Strategy<?>> getStrategies() {
+ return Set.of(new RxJavaStrategy());
+ }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
new file mode 100644
index 0000000..85d3290
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava.strategy;
+
+import org.apache.tinkerpop.machine.bytecode.Bytecode;
+import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
+import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
+import org.apache.tinkerpop.machine.processor.rxjava.RxJavaProcessor;
+import org.apache.tinkerpop.machine.strategy.AbstractStrategy;
+import org.apache.tinkerpop.machine.strategy.Strategy;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class RxJavaStrategy extends AbstractStrategy<Strategy.ProviderStrategy> implements Strategy.ProviderStrategy {
+ @Override
+ public <C> void apply(final Bytecode<C> bytecode) {
+ if (!BytecodeUtil.hasSourceInstruction(bytecode, CoreCompiler.Symbols.WITH_PROCESSOR)) {
+ bytecode.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class);
+ }
+ }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
new file mode 100644
index 0000000..b6b0e4a
--- /dev/null
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava.util;
+
+import io.reactivex.Flowable;
+import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
+import org.apache.tinkerpop.machine.function.BranchFunction;
+import org.apache.tinkerpop.machine.function.CFunction;
+import org.apache.tinkerpop.machine.function.FilterFunction;
+import org.apache.tinkerpop.machine.function.FlatMapFunction;
+import org.apache.tinkerpop.machine.function.InitialFunction;
+import org.apache.tinkerpop.machine.function.MapFunction;
+import org.apache.tinkerpop.machine.function.ReduceFunction;
+import org.apache.tinkerpop.machine.processor.rxjava.BranchFlow;
+import org.apache.tinkerpop.machine.processor.rxjava.FilterFlow;
+import org.apache.tinkerpop.machine.processor.rxjava.FlatMapFlow;
+import org.apache.tinkerpop.machine.processor.rxjava.MapFlow;
+import org.apache.tinkerpop.machine.processor.rxjava.ReduceFlow;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+import org.apache.tinkerpop.machine.traverser.TraverserFactory;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
+import org.reactivestreams.Publisher;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TopologyUtil {
+
+ public static <C, S, E> Flowable<Traverser<C, E>> compile(final Flowable<Traverser<C, S>> source, final Compilation<C, S, E> compilation) {
+ final TraverserFactory<C> traverserFactory = compilation.getTraverserFactory();
+ Flowable<Traverser<C, E>> sink = (Flowable) source;
+ for (final CFunction<C> function : compilation.getFunctions()) {
+ sink = TopologyUtil.extend(sink, function, traverserFactory);
+ }
+ return sink;
+ }
+
+ /*
+ private final void stageInput() {
+ if (this.hasStartPredicates) {
+ final Traverser<C, S> traverser = this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove();
+ if (1 == this.untilLocation) {
+ if (this.untilCompilation.filterTraverser(traverser)) {
+ this.outputTraversers.add(traverser);
+ } else if (2 == this.emitLocation && this.emitCompilation.filterTraverser(traverser)) {
+ this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
+ this.repeat.addTraverser(traverser);
+ } else
+ this.repeat.addTraverser(traverser);
+ } else if (1 == this.emitLocation) {
+ if (this.emitCompilation.filterTraverser(traverser))
+ this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
+ if (2 == this.untilLocation && this.untilCompilation.filterTraverser(traverser))
+ this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
+ else
+ this.repeat.addTraverser(traverser);
+ }
+ } else {
+ this.repeat.addTraverser(this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove());
+ }
+ }
+
+
+ */
+
+ private static <C, S, E, B> Flowable<Traverser<C, E>> extend(Flowable<Traverser<C, S>> flow, final CFunction<C> function, final TraverserFactory<C> traverserFactory) {
+ if (function instanceof MapFunction)
+ return flow.map(new MapFlow<>((MapFunction<C, S, E>) function));
+ else if (function instanceof FilterFunction) {
+ return (Flowable) flow.filter(new FilterFlow<>((FilterFunction<C, S>) function));
+ } else if (function instanceof FlatMapFunction) {
+ return flow.flatMapIterable(new FlatMapFlow<>((FlatMapFunction<C, S, E>) function));
+ } else if (function instanceof InitialFunction) {
+ return Flowable.fromIterable(() -> IteratorUtils.map(((InitialFunction<C, E>) function).get(), s -> traverserFactory.create(function, s)));
+ } else if (function instanceof ReduceFunction) {
+ final ReduceFunction<C, S, E> reduceFunction = (ReduceFunction<C, S, E>) function;
+ return flow.reduce(traverserFactory.create(reduceFunction, reduceFunction.getInitialValue()), new ReduceFlow<>(reduceFunction)).toFlowable();
+ } else if (function instanceof BranchFunction) {
+ final Flowable<List> selectorFlow = flow.map(new BranchFlow<>((BranchFunction<C, S, B>) function));
+ final List<Publisher<Traverser<C, E>>> branchFlows = new ArrayList<>();
+ for (final Map.Entry<Compilation<C, S, ?>, List<Compilation<C, S, E>>> branches : ((BranchFunction<C, S, E>) function).getBranches().entrySet()) {
+ for (int i = 0; i < branches.getValue().size(); i++) {
+ final Compilation<C, S, E> branch = branches.getValue().get(i);
+ final int id = i;
+ branchFlows.add(
+ selectorFlow.
+ filter(list -> list.get(0).equals(null == branches.getKey() ? -1 : id)).
+ map(list -> (Traverser<C, S>) list.get(1)).
+ publish(f -> compile(f, branch)));
+ }
+ }
+ Flowable<Traverser<C, E>> sink = (Flowable) flow.filter(t -> false); // branches are the only outputs
+ for (final Publisher<Traverser<C, E>> branchFlow : branchFlows) {
+ sink = sink.mergeWith(branchFlow);
+ }
+ return sink;
+ }
+ throw new RuntimeException("Need a new execution plan step: " + function);
+ }
+}
\ No newline at end of file
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java
new file mode 100644
index 0000000..8085bbc
--- /dev/null
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import org.apache.tinkerpop.language.gremlin.Gremlin;
+import org.apache.tinkerpop.language.gremlin.P;
+import org.apache.tinkerpop.language.gremlin.Traversal;
+import org.apache.tinkerpop.language.gremlin.TraversalSource;
+import org.apache.tinkerpop.language.gremlin.TraversalUtil;
+import org.apache.tinkerpop.language.gremlin.common.__;
+import org.apache.tinkerpop.machine.Machine;
+import org.apache.tinkerpop.machine.coefficient.LongCoefficient;
+import org.apache.tinkerpop.machine.species.LocalMachine;
+import org.apache.tinkerpop.machine.strategy.optimization.IdentityStrategy;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.tinkerpop.language.gremlin.common.__.incr;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class RxJavaTest {
+
+ @Test
+ public void doStuff() {
+ final Machine machine = LocalMachine.open();
+ final TraversalSource<Long> g = Gremlin.<Long>traversal(machine)
+ .withCoefficient(LongCoefficient.class)
+ .withProcessor(RxJavaProcessor.class)
+ .withStrategy(IdentityStrategy.class);
+
+ Traversal<Long, ?, ?> traversal = g.inject(2L).is(P.gt(1)).union(incr(),__.<Long>incr().incr());
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal.toList());
+ System.out.println("\n----------\n");
+ traversal = g.inject(1L).choose(__.is(1L), incr(),__.<Long>incr().incr());
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal.toList());
+ System.out.println("\n----------\n");
+
+ /*traversal = g.inject(1L).until(__.is(P.lt(3L))).emit().repeat(incr());
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal.toList());
+ System.out.println("\n----------\n");*/
+ }
+}