You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2021/03/19 00:14:31 UTC

[tinkerpop] branch TINKERPOP-2537 created (now 10bbe8d)

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a change to branch TINKERPOP-2537
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git.


      at 10bbe8d  TINKERPOP-2537 Enable g.tx() for remotes and enable bytecode based sessions

This branch includes the following new commits:

     new 10bbe8d  TINKERPOP-2537 Enable g.tx() for remotes and enable bytecode based sessions

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[tinkerpop] 01/01: TINKERPOP-2537 Enable g.tx() for remotes and enable bytecode based sessions

Posted by sp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2537
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 10bbe8d06acadac5af4ab1b3dd1cf8bc305e2fa8
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Thu Mar 18 20:13:15 2021 -0400

    TINKERPOP-2537 Enable g.tx() for remotes and enable bytecode based sessions
---
 .../tinkerpop/gremlin/jsr223/JavaTranslator.java   |   6 +-
 .../gremlin/process/remote/RemoteConnection.java   |   8 +
 .../gremlin/process/traversal/Bytecode.java        |  12 +
 .../traversal/dsl/graph/GraphTraversalSource.java  |  15 +-
 .../process/traversal/util/BytecodeHelper.java     |   7 +
 .../apache/tinkerpop/gremlin/structure/Graph.java  |  12 +-
 .../tinkerpop/gremlin/structure/Transaction.java   |  98 ++++-
 .../structure/util/AbstractTransaction.java        |  13 +-
 .../process/traversal/util/BytecodeHelperTest.java |  10 +-
 .../apache/tinkerpop/gremlin/driver/Client.java    |   5 +-
 .../apache/tinkerpop/gremlin/driver/Tokens.java    |   6 +-
 .../driver/remote/DriverRemoteConnection.java      |  33 +-
 .../driver/remote/DriverRemoteTransaction.java     | 170 +++++++++
 .../gremlin/server/op/AbstractEvalOpProcessor.java |   4 +-
 .../tinkerpop/gremlin/server/op/OpLoader.java      |   2 +-
 .../server/op/session/SessionOpProcessor.java      | 417 ++++++++++++++++++++-
 .../server/op/standard/StandardOpProcessor.java    |   2 +-
 .../server/GremlinSessionTxIntegrateTest.java      | 241 ++++++++++++
 18 files changed, 1022 insertions(+), 39 deletions(-)

diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/jsr223/JavaTranslator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/jsr223/JavaTranslator.java
index bf90367..7adf87f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/jsr223/JavaTranslator.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/jsr223/JavaTranslator.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.Tree;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.TraversalStrategyProxy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.BytecodeHelper;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.lang.reflect.Array;
@@ -35,7 +36,6 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Parameter;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -43,7 +43,6 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.StringJoiner;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -76,6 +75,9 @@ public final class JavaTranslator<S extends TraversalSource, T extends Traversal
 
     @Override
     public T translate(final Bytecode bytecode) {
+        if (BytecodeHelper.isGraphOperation(bytecode))
+            throw new IllegalArgumentException("JavaTranslator cannot translate traversal operations");
+
         TraversalSource dynamicSource = this.traversalSource;
         Traversal.Admin<?, ?> traversal = null;
         for (final Bytecode.Instruction instruction : bytecode.getSourceInstructions()) {
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
index c0daaa7..639e286 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java
@@ -22,6 +22,7 @@ import org.apache.commons.configuration2.Configuration;
 import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
 
 import java.lang.reflect.Constructor;
 import java.util.Iterator;
@@ -41,6 +42,13 @@ public interface RemoteConnection extends AutoCloseable {
     public static final String GREMLIN_REMOTE_CONNECTION_CLASS = GREMLIN_REMOTE + "remoteConnectionClass";
 
     /**
+     * Creates a {@link Transaction} object designed to work with remote semantics.
+     */
+    public default Transaction tx() {
+        throw new UnsupportedOperationException("This implementation does not support remote transactions");
+    }
+
+    /**
      * Submits {@link Traversal} {@link Bytecode} to a server and returns a promise of a {@link RemoteTraversal}.
      * The {@link RemoteTraversal} is an abstraction over two types of results that can be returned as part of the
      * response from the server: the results of the {@link Traversal} itself and the side-effects that it produced.
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Bytecode.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Bytecode.java
index 186a87c..810b2af 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Bytecode.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Bytecode.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -54,6 +55,17 @@ public final class Bytecode implements Cloneable, Serializable {
     private List<Instruction> sourceInstructions = new ArrayList<>();
     private List<Instruction> stepInstructions = new ArrayList<>();
 
+    public static final Bytecode TX_COMMIT = new Bytecode("tx", "commit");
+    public static final Bytecode TX_ROLLBACK = new Bytecode("tx", "rollback");
+
+    public static final Set<Bytecode> GRAPH_OPERATIONS = new HashSet<>(Arrays.asList(TX_COMMIT, TX_ROLLBACK));
+
+    public Bytecode() {}
+
+    private Bytecode(final String sourceName, final Object... arguments) {
+        this.sourceInstructions.add(new Instruction(sourceName, flattenArguments(arguments)));
+    }
+
     /**
      * Add a {@link TraversalSource} instruction to the bytecode.
      *
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
index b08ec4f..64fff5d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
@@ -402,10 +402,21 @@ public class GraphTraversalSource implements TraversalSource {
     }
 
     /**
-     * Proxies calls through to the underlying {@link Graph#tx()}.
+     * Proxies calls through to the underlying {@link Graph#tx()} or to the {@link RemoteConnection#tx()}.
      */
     public Transaction tx() {
-        return this.graph.tx();
+        if (null == this.connection)
+            return this.graph.tx();
+        else {
+            // prevent child transactions and let the current Transaction object be bound to the
+            // TraversalSource that spawned it
+            final Transaction tx = this.connection.tx();
+            if (tx == Transaction.NO_OP && this.connection instanceof Transaction)
+                return (Transaction) this.connection;
+            else
+                return tx;
+        }
+
     }
 
     /**
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/BytecodeHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/BytecodeHelper.java
index 77df45b..4ffaee5 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/BytecodeHelper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/BytecodeHelper.java
@@ -67,6 +67,13 @@ public final class BytecodeHelper {
         return clone;
     }
 
+    /**
+     * Checks if the bytecode is one of the standard {@link Bytecode#GRAPH_OPERATIONS}.
+     */
+    public static boolean isGraphOperation(final Bytecode bytecode) {
+        return Bytecode.GRAPH_OPERATIONS.contains(bytecode);
+    }
+
     public static Optional<String> getLambdaLanguage(final Bytecode bytecode) {
         for (final Bytecode.Instruction instruction : bytecode.getInstructions()) {
             for (Object object : instruction.getArguments()) {
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java
index b165cbd..da1c998 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Graph.java
@@ -271,13 +271,19 @@ public interface Graph extends AutoCloseable, Host {
     public Iterator<Edge> edges(final Object... edgeIds);
 
     /**
-     * Configure and control the transactions for those graphs that support this feature.  Note that this method does
-     * not indicate the creation of a "transaction" object.  A {@link Transaction} in the TinkerPop context is a
-     * transaction "factory" or "controller" that helps manage transactions owned by the underlying graph database.
+     * Configure and control the transactions for those graphs that support this feature.
      */
     public Transaction tx();
 
     /**
+     * Configure and control the transactions for those graphs that support this feature. Graphs that support multiple
+     * transaction models can use this method to expose different sorts of {@link Transaction} implementations.
+     */
+    public default <Tx extends Transaction> Tx tx(final Class<Tx> txClass) {
+        throw new UnsupportedOperationException("This Graph does not support multiple transaction types - use tx() instead");
+    }
+
+    /**
      * Closing a {@code Graph} is equivalent to "shutdown" and implies that no further operations can be executed on
      * the instance.  Users should consult the documentation of the underlying graph database implementation for what
      * this "shutdown" will mean in general and, if supported, how open transactions are handled.  It will typically
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Transaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Transaction.java
index 64a58c8..a81ed15 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Transaction.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Transaction.java
@@ -18,27 +18,17 @@
  */
 package org.apache.tinkerpop.gremlin.structure;
 
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.structure.util.AbstractTransaction;
 import org.apache.tinkerpop.gremlin.structure.util.TransactionException;
 
-import java.util.Collections;
-import java.util.Set;
-import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Function;
 
 /**
  * A set of methods that allow for control of transactional behavior of a {@link Graph} instance. Providers may
  * consider using {@link AbstractTransaction} as a base implementation that provides default features for most of
  * these methods.
- * <p/>
- * It is expected that this interface be implemented by providers in a {@link ThreadLocal} fashion. In other words
- * transactions are bound to the current thread, which means that any graph operation executed by the thread occurs
- * in the context of that transaction and that there may only be one thread executing in a single transaction.
- * <p/>
- * It is important to realize that this class is not a "transaction object".  It is a class that holds transaction
- * related methods thus hiding them from the {@link Graph} interface.  This object is not meant to be passed around
- * as a transactional context.
  *
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  * @author Stephen Mallette (http://stephen.genoprime.com)
@@ -71,8 +61,28 @@ public interface Transaction extends AutoCloseable {
      * A threaded transaction is a {@link Graph} instance that has a transaction context that enables multiple
      * threads to collaborate on the same transaction.  A standard transactional context tied to a {@link Graph}
      * that supports transactions will typically bind a transaction to a single thread via {@link ThreadLocal}.
+     *
+     * @deprecated As of release 3.5.0, replaced by {@link Graph#tx(Class)} in which an implementation of
+     * {@code Transaction} should provide its own methods for exposing a "threaded transaction".
      */
-    public <G extends Graph> G createThreadedTx();
+    @Deprecated
+    public default <G extends Graph> G createThreadedTx() {
+        throw Transaction.Exceptions.threadedTransactionsNotSupported();
+    }
+
+    /**
+     * Starts a transaction in the context of a {@link GraphTraversalSource} instance. It is up to the
+     * {@link Transaction} implementation to decide what this means and up to users to be aware of that meaning.
+     */
+    public default <T extends TraversalSource> T begin() {
+        return (T) begin(GraphTraversalSource.class);
+    }
+
+    /**
+     * Starts a transaction in the context of a particular {@link TraversalSource} instance. It is up to the
+     * {@link Transaction} implementation to decide what this means and up to users to be aware of that meaning.
+     */
+    public <T extends TraversalSource> T begin(final Class<T> traversalSourceClass);
 
     /**
      * Determines if a transaction is currently open.
@@ -218,4 +228,66 @@ public interface Transaction extends AutoCloseable {
             }
         }
     }
+
+    public static final Transaction NO_OP = new Transaction() {
+        @Override
+        public void open() {
+            throw new UnsupportedOperationException("This Transaction implementation is a no-op for all methods");
+        }
+
+        @Override
+        public void commit() {
+            throw new UnsupportedOperationException("This Transaction implementation is a no-op for all methods");
+        }
+
+        @Override
+        public void rollback() {
+            throw new UnsupportedOperationException("This Transaction implementation is a no-op for all methods");
+        }
+
+        @Override
+        public <C extends TraversalSource> C begin(final Class<C> traversalSourceClass) {
+            throw new UnsupportedOperationException("This Transaction implementation is a no-op for all methods");
+        }
+
+        @Override
+        public boolean isOpen() {
+            throw new UnsupportedOperationException("This Transaction implementation is a no-op for all methods");
+        }
+
+        @Override
+        public void readWrite() {
+            throw new UnsupportedOperationException("This Transaction implementation is a no-op for all methods");
+        }
+
+        @Override
+        public void close() {
+            throw new UnsupportedOperationException("This Transaction implementation is a no-op for all methods");
+        }
+
+        @Override
+        public Transaction onReadWrite(final Consumer<Transaction> consumer) {
+            throw new UnsupportedOperationException("This Transaction implementation is a no-op for all methods");
+        }
+
+        @Override
+        public Transaction onClose(final Consumer<Transaction> consumer) {
+            throw new UnsupportedOperationException("This Transaction implementation is a no-op for all methods");
+        }
+
+        @Override
+        public void addTransactionListener(final Consumer<Status> listener) {
+            throw new UnsupportedOperationException("This Transaction implementation is a no-op for all methods");
+        }
+
+        @Override
+        public void removeTransactionListener(final Consumer<Status> listener) {
+            throw new UnsupportedOperationException("This Transaction implementation is a no-op for all methods");
+        }
+
+        @Override
+        public void clearTransactionListeners() {
+            throw new UnsupportedOperationException("This Transaction implementation is a no-op for all methods");
+        }
+    };
 }
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
index bbc2bc6..ece32dc 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
@@ -18,11 +18,11 @@
  */
 package org.apache.tinkerpop.gremlin.structure.util;
 
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Transaction;
 
 import java.util.function.Consumer;
-import java.util.function.Function;
 
 /**
  * A simple base class for {@link Transaction} that provides some common functionality and default behavior.
@@ -34,10 +34,10 @@ import java.util.function.Function;
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 public abstract class AbstractTransaction implements Transaction {
-    private Graph g;
+    private Graph graph;
 
-    public AbstractTransaction(final Graph g) {
-        this.g = g;
+    public AbstractTransaction(final Graph graph) {
+        this.graph = graph;
     }
 
     /**
@@ -123,6 +123,11 @@ public abstract class AbstractTransaction implements Transaction {
         throw Transaction.Exceptions.threadedTransactionsNotSupported();
     }
 
+    @Override
+    public <T extends TraversalSource> T begin(final Class<T> traversalSourceClass) {
+        return graph.traversal(traversalSourceClass);
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/BytecodeHelperTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/BytecodeHelperTest.java
index 24a94aa..510b353 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/BytecodeHelperTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/BytecodeHelperTest.java
@@ -18,15 +18,12 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.util;
 
-import org.apache.tinkerpop.gremlin.jsr223.JavaTranslator;
 import org.apache.tinkerpop.gremlin.process.traversal.Bindings;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.OptionsStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
 import org.apache.tinkerpop.gremlin.util.function.Lambda;
 import org.junit.Test;
@@ -94,4 +91,11 @@ public class BytecodeHelperTest {
                 bc, i -> Stream.of(i.getArguments()).anyMatch(o -> o instanceof Bytecode.Binding));
         assertEquals(0, filteredAfterRemoved.getStepInstructions().size());
     }
+
+    @Test
+    public void shouldDetermineOperation() {
+        assertThat(BytecodeHelper.isGraphOperation(Bytecode.TX_COMMIT), is(true));
+        assertThat(BytecodeHelper.isGraphOperation(Bytecode.TX_ROLLBACK), is(true));
+        assertThat(BytecodeHelper.isGraphOperation(g.V().out("knows").asAdmin().getBytecode()), is(false));
+    }
 }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 9d22f13..ac14c85 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -699,7 +699,10 @@ public abstract class Client {
             this.manageTransactions = settings.getSession().get().manageTransactions;
         }
 
-        String getSessionId() {
+        /**
+         * Returns the session identifier bound to this {@code Client}.
+         */
+        public String getSessionId() {
             return sessionId;
         }
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
index 68b5ebd..5617127 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Tokens.java
@@ -97,11 +97,11 @@ public final class Tokens {
      * Implementations that set this key should consider using one of
      * these two recommended value types:
      * <ul>
-     *     <li>A {@link java.util.List} implementation containing
-     *     references for which {@link String#valueOf(Object)} produces
+     *     <li>A {@code List} implementation containing
+     *     references for which {@code String#valueOf(Object)} produces
      *     a meaningful return value.  For example, a list of strings.</li>
      *     <li>Otherwise, any single non-list object for which
-     *     {@link String#valueOf(Object)} produces a meaningful return value.
+     *     {@code String#valueOf(Object)} produces a meaningful return value.
      *     For example, a string.</li>
      * </ul>
      */
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
index b542858..ac4eed6 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
@@ -29,6 +29,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.OptionsStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.util.BytecodeHelper;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.Iterator;
@@ -55,7 +56,7 @@ public class DriverRemoteConnection implements RemoteConnection {
 
     private static final String DEFAULT_TRAVERSAL_SOURCE = "g";
 
-    private final Client client;
+    final Client client;
     private final boolean tryCloseCluster;
     private final boolean tryCloseClient;
     private final String remoteTraversalSourceName;
@@ -113,11 +114,15 @@ public class DriverRemoteConnection implements RemoteConnection {
     }
 
     private DriverRemoteConnection(final Client client, final String remoteTraversalSourceName) {
+        this(client, remoteTraversalSourceName, false);
+    }
+
+    private DriverRemoteConnection(final Client client, final String remoteTraversalSourceName, final boolean tryCloseClient) {
         this.client = client.alias(remoteTraversalSourceName);
         this.remoteTraversalSourceName = remoteTraversalSourceName;
         this.tryCloseCluster = false;
-        attachElements = false;
-        tryCloseClient = false;
+        this.attachElements = false;
+        this.tryCloseClient = tryCloseClient;
     }
 
     /**
@@ -227,6 +232,18 @@ public class DriverRemoteConnection implements RemoteConnection {
         }
     }
 
+    /**
+     * If the connection is bound to a session, then get the session identifier from it.
+     */
+    Optional<String> getSessionId() {
+        if (client instanceof Client.SessionedClient) {
+            Client.SessionedClient c = (Client.SessionedClient) client;
+            return Optional.of(c.getSessionId());
+        }
+
+        return Optional.empty();
+    }
+
     protected static RequestOptions getRequestOptions(final Bytecode bytecode) {
         final Iterator<OptionsStrategy> itty = BytecodeHelper.findStrategies(bytecode, OptionsStrategy.class);
         final RequestOptions.Builder builder = RequestOptions.build();
@@ -258,6 +275,16 @@ public class DriverRemoteConnection implements RemoteConnection {
         }
     }
 
+    /**
+     * Constructs a new {@link DriverRemoteTransaction}.
+     */
+    @Override
+    public Transaction tx() {
+        final DriverRemoteConnection session = new DriverRemoteConnection(
+                client.getCluster().connect(UUID.randomUUID().toString()), remoteTraversalSourceName, true);
+        return new DriverRemoteTransaction(session);
+    }
+
     @Override
     public String toString() {
         return "DriverServerConnection-" + client.getCluster() + " [graph=" + remoteTraversalSourceName + "]";
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTransaction.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTransaction.java
new file mode 100644
index 0000000..e76cff0
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTransaction.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.remote;
+
+import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
+import org.apache.tinkerpop.gremlin.process.remote.RemoteConnectionException;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * A remote {@link Transaction} implementation that is implemented with the Java driver. It is also a proxy for a
+ * {@link RemoteConnection} that is bound to a session.
+ * <p/>
+ * For users, starting a transaction with {@link #begin()} will produce a {@link TraversalSource} that can be used
+ * across multiple threads sending the bytecode based requests to a remote session. It is worth noting that the session
+ * will process these requests in a serial fashion and not in parallel. Calling {@link #commit()} or
+ * {@link #rollback()} will also close the session and no additional traversal can be executed on the
+ * {@link TraversalSource}. A fresh call to {@link #begin()} will be required to open a fresh session to work with.
+ * The default behavior of {@link #close()} is to commit the transaction.
+ */
+public class DriverRemoteTransaction implements Transaction, RemoteConnection {
+
+    private final DriverRemoteConnection sessionBasedConnection;
+
+    protected Consumer<Transaction> closeConsumer = CLOSE_BEHAVIOR.COMMIT;
+
+    public DriverRemoteTransaction(final DriverRemoteConnection sessionBasedConnection) {
+        this.sessionBasedConnection = sessionBasedConnection;
+    }
+
+    @Override
+    public <T extends TraversalSource> T begin(final Class<T> traversalSourceClass) {
+        if (!isOpen())
+            throw new IllegalStateException("Transaction cannot begin as the session is already closed - create a new Transaction");
+
+        try {
+            return traversalSourceClass.getConstructor(RemoteConnection.class).newInstance(this);
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * By virtue of creating a {@code DriverRemoteTransaction}, the transaction is considered open. There is no need
+     * to call this method. Calling it when the transaction is closed will result in an exception.
+     */
+    @Override
+    public void open() {
+        // no need to issue a command to open the transaction, the server is already in such a state if the session is open
+        if (!isOpen())
+            throw new IllegalStateException("Transaction cannot be opened as the session is already closed - create a new Transaction");
+    }
+
+    @Override
+    public void commit() {
+        closeRemoteTransaction(Bytecode.TX_COMMIT, "Transaction commit for %s failed");
+    }
+
+    @Override
+    public void rollback() {
+        closeRemoteTransaction(Bytecode.TX_ROLLBACK, "Transaction rollback for %s failed");
+    }
+
+    private void closeRemoteTransaction(final Bytecode closeTxWith, final String failureMsg) {
+        try {
+            // kinda weird but we hasNext() the graph command here to ensure that it runs to completion or
+            // else you don't guarantee that we have the returned NO_CONTENT message in hand before proceeding
+            // which could mean the transaction is still in the process of committing. not sure why iterate()
+            // doesn't quite work in this context.
+            this.sessionBasedConnection.submitAsync(closeTxWith).join().hasNext();
+            this.sessionBasedConnection.close();
+        } catch (Exception ex) {
+            throw new RuntimeException(String.format(failureMsg, sessionBasedConnection.getSessionId()), ex);
+        }
+    }
+
+    @Override
+    public boolean isOpen() {
+        // for tx purposes closing is a good enough check
+        return !sessionBasedConnection.client.isClosing();
+    }
+
+    /**
+     * The default close behavior for this {@link Transaction} implementation is to {@link #commit()}.
+     */
+    @Override
+    public void close() {
+        this.closeConsumer.accept(this);
+    }
+
+    /**
+     * This {@link Transaction} implementation is not auto-managed and therefore this method is not supported.
+     */
+    @Override
+    public void readWrite() {
+        throw new UnsupportedOperationException("Remote transaction behaviors are not auto-managed - they are always manually controlled");
+    }
+
+    /**
+     * This {@link Transaction} implementation is not auto-managed and therefore this method is not supported.
+     */
+    @Override
+    public Transaction onReadWrite(final Consumer<Transaction> consumer) {
+        throw new UnsupportedOperationException("Remote transaction behaviors are not configurable - they are always manually controlled");
+    }
+
+    @Override
+    public Transaction onClose(final Consumer<Transaction> consumer) {
+        this.closeConsumer = consumer;
+        return this;
+    }
+
+    /**
+     * There is no support for remote transaction listeners.
+     */
+    @Override
+    public void addTransactionListener(final Consumer<Status> listener) {
+        throw new UnsupportedOperationException("Remote transactions cannot have listeners attached");
+    }
+
+    /**
+     * There is no support for remote transaction listeners.
+     */
+    @Override
+    public void removeTransactionListener(final Consumer<Status> listener) {
+        throw new UnsupportedOperationException("Remote transactions cannot have listeners attached");
+    }
+
+    /**
+     * There is no support for remote transaction listeners.
+     */
+    @Override
+    public void clearTransactionListeners() {
+        throw new UnsupportedOperationException("Remote transactions cannot have listeners attached");
+    }
+
+    /**
+     * It is not possible to have child transactions, therefore this method always returns {@link Transaction#NO_OP}.
+     */
+    @Override
+    public Transaction tx() {
+        return Transaction.NO_OP;
+    }
+
+    @Override
+    public <E> CompletableFuture<RemoteTraversal<?, E>> submitAsync(final Bytecode bytecode) throws RemoteConnectionException {
+        return sessionBasedConnection.submitAsync(bytecode);
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
index 49cc947..609d1f2 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
@@ -135,7 +135,7 @@ public abstract class AbstractEvalOpProcessor extends AbstractOpProcessor {
      * handled will be passed to this method to see if the sub-class can service the requested op code.
      * @return
      */
-    public abstract Optional<ThrowingConsumer<Context>> selectOther(final RequestMessage requestMessage) throws OpProcessorException;
+    public abstract Optional<ThrowingConsumer<Context>> selectOther(final Context ctx) throws OpProcessorException;
 
     @Override
     public ThrowingConsumer<Context> select(final Context ctx) throws OpProcessorException {
@@ -151,7 +151,7 @@ public abstract class AbstractEvalOpProcessor extends AbstractOpProcessor {
                 final String msgInvalid = String.format("Message could not be parsed.  Check the format of the request. [%s]", message);
                 throw new OpProcessorException(msgInvalid, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).statusMessage(msgInvalid).create());
             default:
-                op = selectOther(message).orElseThrow(() -> {
+                op = selectOther(ctx).orElseThrow(() -> {
                     final String msgDefault = String.format("Message with op code [%s] is not recognized.", message.getOp());
                     return new OpProcessorException(msgDefault, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).statusMessage(msgDefault).create());
                 });
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/OpLoader.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/OpLoader.java
index 5b69a64..b165436 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/OpLoader.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/OpLoader.java
@@ -65,7 +65,7 @@ public final class OpLoader {
     }
 
     /**
-     * Gets an {@link OpProcessor} by its name. If it cannot be found an {@link Optional#EMPTY} is returned.
+     * Gets an {@link OpProcessor} by its name. If it cannot be found an {@link Optional#empty()} is returned.
      */
     public static Optional<OpProcessor> getProcessor(final String name) {
         return Optional.ofNullable(processors.get(name));
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
index b4d6d4e..19c373f 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
@@ -18,30 +18,54 @@
  */
 package org.apache.tinkerpop.gremlin.server.op.session;
 
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin;
+import org.apache.tinkerpop.gremlin.jsr223.JavaTranslator;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.util.BytecodeHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
 import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.GraphManager;
 import org.apache.tinkerpop.gremlin.server.GremlinServer;
 import org.apache.tinkerpop.gremlin.server.OpProcessor;
 import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
+import org.apache.tinkerpop.gremlin.server.handler.Frame;
 import org.apache.tinkerpop.gremlin.server.handler.StateKey;
 import org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor;
 import org.apache.tinkerpop.gremlin.server.op.OpProcessorException;
 import org.apache.tinkerpop.gremlin.server.util.MetricManager;
+import org.apache.tinkerpop.gremlin.server.util.TraverserIterator;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
+import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONVersion;
 import org.apache.tinkerpop.gremlin.util.function.ThrowingConsumer;
+import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.script.Bindings;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -54,6 +78,10 @@ import static com.codahale.metrics.MetricRegistry.name;
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 public class SessionOpProcessor extends AbstractEvalOpProcessor {
+    private static final ObjectMapper mapper = GraphSONMapper.build().version(GraphSONVersion.V2_0).create().createMapper(); // GraphSON 2.0 mapper used to read Bytecode
+    private static final Logger auditLogger = LoggerFactory.getLogger(GremlinServer.AUDIT_LOGGER_NAME); // receives the audit entries written for bytecode requests
+    private static final Bindings EMPTY_BINDINGS = new SimpleBindings(); // NOTE(review): shared and mutable - confirm lambda evaluation never writes into it
+
     private static final Logger logger = LoggerFactory.getLogger(SessionOpProcessor.class);
     public static final String OP_PROCESSOR_NAME = "session";
 
@@ -127,7 +155,9 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
      * this may be removed completely. Note that closing the channel kills the session now.
      */
     @Override
-    public Optional<ThrowingConsumer<Context>> selectOther(final RequestMessage requestMessage) throws OpProcessorException {
+    public Optional<ThrowingConsumer<Context>> selectOther(final Context ctx) throws OpProcessorException {
+        final RequestMessage requestMessage = ctx.getRequestMessage();
+
         // deprecated the "close" message at 3.3.11 - left this check for the "close" token so that if older versions
         // of the driver connect they won't get an error. basically just writes back a NO_CONTENT
         // for the immediate term in 3.5.0 and then for some future version remove support for the message completely
@@ -145,11 +175,47 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
                         .code(ResponseStatusCode.NO_CONTENT)
                         .create());
             });
+        } else if (requestMessage.getOp().equals(Tokens.OPS_BYTECODE)) {
+            validateTraversalSourceAlias(ctx, requestMessage, validateTraversalRequest(requestMessage));
+            return Optional.of(this::iterateBytecodeTraversal);
         } else {
             return Optional.empty();
         }
     }
 
+    private static void validateTraversalSourceAlias(final Context ctx, final RequestMessage message, final Map<String, String> aliases) throws OpProcessorException {
+        // the value of the first alias entry names the traversal source that the server must have configured
+        final String boundSource = aliases.values().iterator().next();
+        if (ctx.getGraphManager().getTraversalSourceNames().contains(boundSource)) return;
+        final String msg = String.format("The traversal source [%s] for alias [%s] is not configured on the server.", boundSource, Tokens.VAL_TRAVERSAL_SOURCE_ALIAS);
+        throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+    }
+
+    private static Map<String, String> validateTraversalRequest(final RequestMessage message) throws OpProcessorException {
+        // with the Tokens.ARGS_GREMLIN argument present the outcome rests on alias validation alone, which either
+        // yields the alias map or raises its own OpProcessorException
+        if (message.optionalArgs(Tokens.ARGS_GREMLIN).isPresent()) return validatedAliases(message).get();
+
+        final String msg = String.format("A message with [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_GREMLIN);
+        throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+    }
+
+    private static Optional<Map<String, String>> validatedAliases(final RequestMessage message) throws OpProcessorException {
+        final Optional<Map<String, String>> aliases = message.optionalArgs(Tokens.ARGS_ALIASES);
+        if (!aliases.isPresent()) {
+            final String msg = String.format("A message with [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
+            throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+        }
+
+        if (aliases.get().size() != 1 || !aliases.get().containsKey(Tokens.VAL_TRAVERSAL_SOURCE_ALIAS)) {
+            final String msg = String.format("A message with [%s] op code requires the [%s] argument to be a Map containing one alias assignment named '%s'.",
+                    Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES, Tokens.VAL_TRAVERSAL_SOURCE_ALIAS);
+            throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+        }
+
+        return aliases;
+    }
+
     @Override
     public ThrowingConsumer<Context> getEvalOp() {
         return this::evalOp;
@@ -268,4 +334,353 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
             return bindings;
         };
     }
+
+    private void iterateBytecodeTraversal(final Context context) throws Exception {
+        final RequestMessage msg = context.getRequestMessage();
+        final Settings settings = context.getSettings();
+        logger.debug("Traversal request {} for in thread {}", msg.getRequestId(), Thread.currentThread().getName());
+
+        // the gremlin argument is either an already deserialized Bytecode object or some other object whose
+        // toString() form is read as GraphSON 2.0 Bytecode by the static mapper.
+        final Object bytecodeObj = msg.getArgs().get(Tokens.ARGS_GREMLIN);
+        final Bytecode bytecode = bytecodeObj instanceof Bytecode ? (Bytecode) bytecodeObj :
+                mapper.readValue(bytecodeObj.toString(), Bytecode.class);
+
+        // earlier validation in selection of this op method should free us to cast this without worry
+        final Map<String, String> aliases = (Map<String, String>) msg.optionalArgs(Tokens.ARGS_ALIASES).get();
+
+        // timeout override - a request level evaluation timeout argument, when present, takes precedence over
+        // the evaluation timeout configured in the server settings
+        final Map<String, Object> args = msg.getArgs();
+        final long seto = args.containsKey(Tokens.ARGS_EVAL_TIMEOUT) ?
+                ((Number) args.get(Tokens.ARGS_EVAL_TIMEOUT)).longValue() : context.getSettings().getEvaluationTimeout();
+
+        final GraphManager graphManager = context.getGraphManager();
+        final String traversalSourceName = aliases.entrySet().iterator().next().getValue();
+        final TraversalSource g = graphManager.getTraversalSource(traversalSourceName);
+
+        // todo: should session be grabbed here???
+        final Session session = getSession(context, msg);
+
+        // handle bytecode based graph operations like commit/rollback commands
+        if (BytecodeHelper.isGraphOperation(bytecode)) {
+            handleGraphOperation(bytecode, g.getGraph(), context);
+            return;
+        }
+
+        final Traversal.Admin<?, ?> traversal;
+        try {
+            final Optional<String> lambdaLanguage = BytecodeHelper.getLambdaLanguage(bytecode);
+            if (!lambdaLanguage.isPresent())
+                traversal = JavaTranslator.of(g).translate(bytecode);
+            else
+                traversal = session.getGremlinExecutor().eval(bytecode, EMPTY_BINDINGS, lambdaLanguage.get(), traversalSourceName);
+        } catch (ScriptException ex) {
+            logger.error("Traversal contains a lambda that cannot be compiled", ex);
+            throw new OpProcessorException("Traversal contains a lambda that cannot be compiled",
+                    ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
+                            .statusMessage(ex.getMessage())
+                            .statusAttributeException(ex).create());
+        } catch (Exception ex) {
+            logger.error("Could not deserialize the Traversal instance", ex);
+            throw new OpProcessorException("Could not deserialize the Traversal instance",
+                    ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION)
+                            .statusMessage(ex.getMessage())
+                            .statusAttributeException(ex).create());
+        }
+
+        if (settings.enableAuditLog) {
+            AuthenticatedUser user = context.getChannelHandlerContext().channel().attr(StateKey.AUTHENTICATED_USER).get();
+            if (null == user) {    // This is expected when using the AllowAllAuthenticator
+                user = AuthenticatedUser.ANONYMOUS_USER;
+            }
+            String address = context.getChannelHandlerContext().channel().remoteAddress().toString();
+            if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
+            auditLogger.info("User {} with address {} requested: {}", user.getName(), address, bytecode);
+        }
+        if (settings.authentication.enableAuditLog) {
+            String address = context.getChannelHandlerContext().channel().remoteAddress().toString();
+            if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
+            auditLogger.info("User with address {} requested: {}", address, bytecode);
+        }
+
+        // todo: timer matter???
+        // final Timer.Context timerContext = traversalOpTimer.time();
+
+        final FutureTask<Void> evalFuture = new FutureTask<>(() -> {
+            final Graph graph = g.getGraph();
+
+            try {
+                beforeProcessing(graph, context);
+
+                try {
+                    // compile the traversal - without it getEndStep() has nothing in it
+                    traversal.applyStrategies();
+                    handleIterator(context, new TraverserIterator(traversal), graph);
+                } catch (Exception ex) {
+                    Throwable t = ex;
+                    if (ex instanceof UndeclaredThrowableException)
+                        t = t.getCause();
+
+                    // if any exception in the chain is TemporaryException then we should respond with the right error
+                    // code so that the client knows to retry
+                    final Optional<Throwable> possibleTemporaryException = determineIfTemporaryException(ex);
+                    if (possibleTemporaryException.isPresent()) {
+                        context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TEMPORARY)
+                                .statusMessage(possibleTemporaryException.get().getMessage())
+                                .statusAttributeException(possibleTemporaryException.get()).create());
+                    } else if (t instanceof InterruptedException || t instanceof TraversalInterruptedException) {
+                        final String errorMessage = String.format("A timeout occurred during traversal evaluation of [%s] - consider increasing the limit given to evaluationTimeout", msg);
+                        logger.warn(errorMessage);
+                        context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+                                .statusMessage(errorMessage)
+                                .statusAttributeException(ex).create());
+                    } else {
+                        logger.warn(String.format("Exception processing a Traversal on iteration for request [%s].", msg.getRequestId()), ex);
+                        context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                                .statusMessage(ex.getMessage())
+                                .statusAttributeException(ex).create());
+                    }
+                    onError(graph, context);
+                }
+            } catch (Exception ex) {
+                final Optional<Throwable> possibleTemporaryException = determineIfTemporaryException(ex);
+                if (possibleTemporaryException.isPresent()) {
+                    context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TEMPORARY)
+                            .statusMessage(possibleTemporaryException.get().getMessage())
+                            .statusAttributeException(possibleTemporaryException.get()).create());
+                } else {
+                    logger.warn(String.format("Exception processing a Traversal on request [%s].", msg.getRequestId()), ex);
+                    context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                            .statusMessage(ex.getMessage())
+                            .statusAttributeException(ex).create());
+                }
+                onError(graph, context);
+            } finally {
+                // todo: timer matter???
+                //timerContext.stop();
+            }
+
+            return null;
+        });
+
+        submitToGremlinExecutor(context, seto, session, evalFuture);
+    }
+
+    private static void submitToGremlinExecutor(final Context context, final long seto, final Session session,
+                                                final FutureTask<Void> evalFuture) {
+        // run the task on the session's executor; only a positive timeout schedules an interrupting cancel of that run
+        final Future<?> running = session.getGremlinExecutor().getExecutorService().submit(evalFuture);
+        if (seto <= 0) return;
+
+        context.getScheduledExecutorService().schedule(() -> running.cancel(true), seto, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Handles {@link Bytecode} detected to contain a "graph operation", meaning a transaction commit or rollback, by
+     * running it in the session's executor and writing back {@code NO_CONTENT} on success or an error status on failure.
+     * A {@link Graph} without transaction support is answered with an error status rather than being left unanswered.
+     * @throws IllegalStateException if the bytecode is neither {@link Bytecode#TX_COMMIT} nor {@link Bytecode#TX_ROLLBACK}
+     */
+    protected void handleGraphOperation(final Bytecode bytecode, final Graph graph, final Context context) {
+        final RequestMessage msg = context.getRequestMessage();
+        final Session session = getSession(context, msg);
+        // without a response the client would be left waiting on this request, so reject it explicitly
+        if (!graph.features().graph().supportsTransactions()) {
+            final String err = "Graph does not support transactions";
+            context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(err).create());
+            return;
+        }
+
+        if (!bytecode.equals(Bytecode.TX_COMMIT) && !bytecode.equals(Bytecode.TX_ROLLBACK))
+            throw new IllegalStateException(String.format("Bytecode in request is not a recognized graph operation: %s", bytecode.toString()));
+
+        final boolean commit = bytecode.equals(Bytecode.TX_COMMIT);
+        submitToGremlinExecutor(context, 0, session, new FutureTask<>(() -> {
+            try {
+                if (graph.tx().isOpen()) {
+                    if (commit)
+                        graph.tx().commit();
+                    else
+                        graph.tx().rollback();
+                }
+
+                // write back a no-op for success
+                final Map<String, Object> attributes = generateStatusAttributes(context.getChannelHandlerContext(), msg,
+                        ResponseStatusCode.NO_CONTENT, Collections.emptyIterator(), context.getSettings());
+                context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.NO_CONTENT).statusAttributes(attributes).create());
+            } catch (Exception ex) {
+                final Optional<Throwable> possibleTemporaryException = determineIfTemporaryException(ex);
+                if (possibleTemporaryException.isPresent()) {
+                    context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TEMPORARY)
+                            .statusMessage(possibleTemporaryException.get().getMessage())
+                            .statusAttributeException(possibleTemporaryException.get()).create());
+                } else {
+                    logger.warn(String.format("Exception processing a Traversal on request [%s].", msg.getRequestId()), ex);
+                    context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                            .statusMessage(ex.getMessage()).statusAttributeException(ex).create());
+                }
+                onError(graph, context);
+            }
+
+            return null;
+        }));
+    }
+
+    protected void beforeProcessing(final Graph graph, final Context ctx) {
+        // transactions are managed when the processor is configured that way or when the request asks for it
+        final boolean txManaged = manageTransactions || (Boolean) ctx.getRequestMessage().getArgs().getOrDefault(Tokens.ARGS_MANAGE_TRANSACTION, false);
+        if (txManaged && graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
+    }
+
+    protected void onError(final Graph graph, final Context ctx) {
+        // a failed request rolls back its open transaction, but only when transactions are managed for it
+        final boolean txManaged = manageTransactions || (Boolean) ctx.getRequestMessage().getArgs().getOrDefault(Tokens.ARGS_MANAGE_TRANSACTION, false);
+        if (txManaged && graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
+    }
+
+    protected void onTraversalSuccess(final Graph graph, final Context ctx) {
+        // a successful request commits its open transaction, but only when transactions are managed for it
+        final boolean txManaged = manageTransactions || (Boolean) ctx.getRequestMessage().getArgs().getOrDefault(Tokens.ARGS_MANAGE_TRANSACTION, false);
+        if (txManaged && graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().commit();
+    }
+
+    protected void handleIterator(final Context context, final Iterator itty, final Graph graph) throws InterruptedException {
+        final ChannelHandlerContext nettyContext = context.getChannelHandlerContext();
+        final RequestMessage msg = context.getRequestMessage();
+        final Settings settings = context.getSettings();
+        final MessageSerializer serializer = nettyContext.channel().attr(StateKey.SERIALIZER).get();
+        final boolean useBinary = nettyContext.channel().attr(StateKey.USE_BINARY).get();
+        boolean warnOnce = false;
+
+        // we have an empty iterator - happens on stuff like: g.V().iterate()
+        if (!itty.hasNext()) {
+            final Map<String, Object> attributes = generateStatusAttributes(nettyContext, msg, ResponseStatusCode.NO_CONTENT, itty, settings);
+
+            // as there is nothing left to iterate if we are transaction managed then we should execute a
+            // commit here before we send back a NO_CONTENT which implies success
+            onTraversalSuccess(graph, context);
+            context.writeAndFlush(ResponseMessage.build(msg)
+                    .code(ResponseStatusCode.NO_CONTENT)
+                    .statusAttributes(attributes)
+                    .create());
+            return;
+        }
+
+        // the batch size can be overridden by the request
+        final int resultIterationBatchSize = (Integer) msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
+                .orElse(settings.resultIterationBatchSize);
+        List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
+
+        // use an external control to manage the loop as opposed to just checking hasNext() in the while.  this
+        // prevents situations where auto transactions create a new transaction after calls to commit() within
+        // the loop on calls to hasNext().
+        boolean hasMore = itty.hasNext();
+
+        while (hasMore) {
+            if (Thread.interrupted()) throw new InterruptedException();
+
+            // check if an implementation needs to force flush the aggregated results before the iteration batch
+            // size is reached.
+            final boolean forceFlush = isForceFlushed(nettyContext, msg, itty);
+
+            // have to check the aggregate size because it is possible that the channel is not writeable (below)
+            // so iterating next() if the message is not written and flushed would bump the aggregate size beyond
+            // the expected resultIterationBatchSize.  Total serialization time for the response remains in
+            // effect so if the client is "slow" it may simply timeout.
+            //
+            // there is a need to check hasNext() on the iterator because if the channel is not writeable the
+            // previous pass through the while loop will have next()'d the iterator and if it is "done" then a
+            // NoSuchElementException will raise its head. also need a check to ensure that this iteration doesn't
+            // require a forced flush which can be forced by sub-classes.
+            //
+            // this could be placed inside the isWriteable() portion of the if-then below but it seems better to
+            // allow iteration to continue into a batch if that is possible rather than just doing nothing at all
+            // while waiting for the client to catch up
+            if (aggregate.size() < resultIterationBatchSize && itty.hasNext() && !forceFlush) aggregate.add(itty.next());
+
+            // Don't keep executor busy if client has already given up; there is no way to catch up if the channel is
+            // not active, and hence we should break the loop.
+            if (!nettyContext.channel().isActive()) {
+                onError(graph, context);
+                break;
+            }
+
+            // send back a page of results if batch size is met or if it's the end of the results being iterated.
+            // also check writeability of the channel to prevent OOME for slow clients.
+            //
+            // clients might decide to close the Netty channel to the server with a CloseWebsocketFrame after errors
+            // like CorruptedFrameException. On the server, although the channel gets closed, there might be some
+            // executor threads waiting for watermark to clear which will not clear in these cases since client has
+            // already given up on these requests. This leads to these executors waiting for the client to consume
+            // results till the timeout. checking for isActive() should help prevent that.
+            if (nettyContext.channel().isActive() && nettyContext.channel().isWritable()) {
+                if (forceFlush || aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
+                    final ResponseStatusCode code = itty.hasNext() ? ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS;
+
+                    // serialize here because in sessionless requests the serialization must occur in the same
+                    // thread as the eval.  as eval occurs in the GremlinExecutor there's no way to get back to the
+                    // thread that processed the eval of the script so, we have to push serialization down into that
+                    final Map<String, Object> metadata = generateResultMetaData(nettyContext, msg, code, itty, settings);
+                    final Map<String, Object> statusAttrb = generateStatusAttributes(nettyContext, msg, code, itty, settings);
+                    Frame frame = null;
+                    try {
+                        frame = makeFrame(context, msg, serializer, useBinary, aggregate, code,
+                                metadata, statusAttrb);
+                    } catch (Exception ex) {
+                        // a frame may use a Bytebuf which is a countable release - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+
+                        // exception is handled in makeFrame() - serialization error gets written back to driver
+                        // at that point
+                        onError(graph, context);
+                        break;
+                    }
+
+                    try {
+                        // only need to reset the aggregation list if there's more stuff to write
+                        if (itty.hasNext())
+                            aggregate = new ArrayList<>(resultIterationBatchSize);
+                        else {
+                            // iteration and serialization are both complete which means this finished successfully. note that
+                            // errors internal to script eval or timeout will rollback given GremlinServer's global configurations.
+                            // local errors will get rolled back below because the exceptions aren't thrown in those cases to be
+                            // caught by the GremlinExecutor for global rollback logic. this only needs to be committed if
+                            // there are no more items to iterate and serialization is complete
+                            onTraversalSuccess(graph, context);
+
+                            // exit the result iteration loop as there are no more results left.  using this external control
+                            // because of the above commit.  some graphs may open a new transaction on the call to
+                            // hasNext()
+                            hasMore = false;
+                        }
+                    } catch (Exception ex) {
+                        // a frame may use a Bytebuf which is a countable release - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+                        throw ex;
+                    }
+
+                    if (!itty.hasNext()) iterateComplete(nettyContext, msg, itty);
+
+                    // the flush is called after the commit has potentially occurred.  in this way, if a commit was
+                    // required then it will be 100% complete before the client receives it. the "frame" at this point
+                    // should have completely detached objects from the transaction (i.e. serialization has occurred)
+                    // so a new one should not be opened on the flush down the netty pipeline
+                    context.writeAndFlush(code, frame);
+                }
+            } else {
+                // don't keep triggering this warning over and over again for the same request
+                if (!warnOnce) {
+                    logger.warn("Pausing response writing as writeBufferHighWaterMark exceeded on {} - writing will continue once client has caught up", msg);
+                    warnOnce = true;
+                }
+
+                // since the client is lagging we can hold here for a period of time for the client to catch up.
+                // this isn't blocking the IO thread - just a worker.
+                TimeUnit.MILLISECONDS.sleep(10);
+            }
+        }
+    }
 }
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
index 746397a..81df30c 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
@@ -84,7 +84,7 @@ public class StandardOpProcessor extends AbstractEvalOpProcessor {
     }
 
     @Override
-    public Optional<ThrowingConsumer<Context>> selectOther(final RequestMessage requestMessage)  throws OpProcessorException {
+    public Optional<ThrowingConsumer<Context>> selectOther(final Context ctx)  throws OpProcessorException {
         return Optional.empty();
     }
 
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java
new file mode 100644
index 0000000..47b501c
--- /dev/null
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionTxIntegrateTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for gremlin-driver and bytecode sessions.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public class GremlinSessionTxIntegrateTest extends AbstractGremlinServerIntegrationTest {
+
+    /**
+     * Deletes {@code /tmp/neo4j} and maps the graph name "graph" to {@code conf/neo4j-empty.properties}.
+     */
+    @Override
+    public Settings overrideSettings(final Settings settings) {
+        final String nameOfTest = name.getMethodName();
+
+        deleteDirectory(new File("/tmp/neo4j"));
+        settings.graphs.put("graph", "conf/neo4j-empty.properties");
+
+        switch (nameOfTest) { // hook for per-test settings
+            case "shouldExecuteBytecodeInSession": // NOTE(review): no test in this class has this name
+                break; // nothing is overridden, so the switch currently has no effect
+        }
+
+        return settings;
+    }
+
+    @Test
+    public void shouldCommitTxBytecodeInSession() throws Exception {
+        assumeNeo4jIsPresent();
+
+        final Cluster cluster = TestClientFactory.build().create();
+        final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+        final GraphTraversalSource gtx = g.tx().begin();
+        assertThat(gtx.tx().isOpen(), is(true));
+
+        gtx.addV("person").iterate();
+        assertEquals(1, (long) gtx.V().count().next());
+
+        // the uncommitted vertex must not be visible through the sessionless g
+        assertEquals(0, (long) g.V().count().next());
+
+        gtx.tx().commit();
+        assertThat(gtx.tx().isOpen(), is(false));
+
+        // the sessionless g should still work after the commit and now see the committed vertex
+        assertEquals(1, (long) g.V().count().next());
+
+        // but the spawned gtx should be dead - further traversals are expected to fail with "Client is closed"
+        try {
+            gtx.addV("software").iterate();
+            fail("Should have failed since we committed the transaction");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertEquals("Client is closed", root.getMessage());
+        }
+
+        cluster.close();
+    }
+
+    @Test
+    public void shouldCommitTxBytecodeInSessionWithExplicitTransactionObject() throws Exception {
+        assumeNeo4jIsPresent();
+
+        final Cluster cluster = TestClientFactory.build().create();
+        final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
+        final Transaction tx = g.tx(); // keep the Transaction itself rather than going through gtx.tx()
+        assertThat(tx.isOpen(), is(true)); // note: asserted before begin() has been called
+
+        final GraphTraversalSource gtx = tx.begin();
+        gtx.addV("person").iterate();
+        assertEquals(1, (long) gtx.V().count().next());
+        tx.commit();
+        assertThat(tx.isOpen(), is(false));
+
+        // the sessionless g should still work after the commit and now see the committed vertex
+        assertEquals(1, (long) g.V().count().next());
+
+        cluster.close();
+    }
+
+    @Test
+    public void shouldRollbackTxBytecodeInSession() throws Exception {
+        assumeNeo4jIsPresent();
+
+        final Cluster cluster = TestClientFactory.build().create();
+        final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+        final GraphTraversalSource gtx = g.tx().begin();
+        assertThat(gtx.tx().isOpen(), is(true));
+
+        gtx.addV("person").iterate();
+        assertEquals(1, (long) gtx.V().count().next()); // visible inside the transaction before the rollback
+        gtx.tx().rollback();
+        assertThat(gtx.tx().isOpen(), is(false));
+
+        // the sessionless g should still work after the rollback and must not see the discarded vertex
+        assertEquals(0, (long) g.V().count().next());
+
+        // but the spawned gtx should be dead - further traversals are expected to fail with "Client is closed"
+        try {
+            gtx.addV("software").iterate();
+            fail("Should have failed since we rolled back the transaction");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertEquals("Client is closed", root.getMessage());
+        }
+
+        cluster.close();
+    }
+
+    @Test
+    public void shouldCommitTxBytecodeInSessionOnCloseOfGtx() throws Exception {
+        assumeNeo4jIsPresent();
+
+        final Cluster cluster = TestClientFactory.build().create();
+        final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+        final GraphTraversalSource gtx = g.tx().begin();
+        assertThat(gtx.tx().isOpen(), is(true));
+
+        gtx.addV("person").iterate();
+        assertEquals(1, (long) gtx.V().count().next());
+        gtx.close(); // closing the traversal source itself, with no explicit commit() call
+        assertThat(gtx.tx().isOpen(), is(false));
+
+        // the sessionless g should still work - the vertex is expected to have been committed by gtx.close()
+        assertEquals(1, (long) g.V().count().next());
+
+        // but the spawned gtx should be dead - further traversals are expected to fail with "Client is closed"
+        try {
+            gtx.addV("software").iterate();
+            fail("Should have failed since we committed the transaction");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertEquals("Client is closed", root.getMessage());
+        }
+
+        cluster.close();
+    }
+
+    @Test
+    public void shouldCommitTxBytecodeInSessionOnCloseTx() throws Exception {
+        assumeNeo4jIsPresent();
+
+        final Cluster cluster = TestClientFactory.build().create();
+        final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+        final GraphTraversalSource gtx = g.tx().begin();
+        assertThat(gtx.tx().isOpen(), is(true));
+
+        gtx.addV("person").iterate();
+        assertEquals(1, (long) gtx.V().count().next());
+        gtx.tx().close(); // closing the transaction, with no explicit commit() call
+        assertThat(gtx.tx().isOpen(), is(false));
+
+        // the sessionless g should still work - the vertex is expected to have been committed by tx().close()
+        assertEquals(1, (long) g.V().count().next());
+
+        // but the spawned gtx should be dead - further traversals are expected to fail with "Client is closed"
+        try {
+            gtx.addV("software").iterate();
+            fail("Should have failed since we committed the transaction");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertEquals("Client is closed", root.getMessage());
+        }
+
+        cluster.close();
+    }
+
+    @Test
+    public void shouldCommitTxBytecodeInSessionReusingGtxAcrossThreads() throws Exception {
+        assumeNeo4jIsPresent();
+
+        final ExecutorService service = Executors.newFixedThreadPool(2);
+
+        final Cluster cluster = TestClientFactory.build().create();
+        final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+        final GraphTraversalSource gtx = g.tx().begin();
+        assertThat(gtx.tx().isOpen(), is(true));
+
+        final int verticesToAdd = 64;
+        for (int ix = 0; ix < verticesToAdd; ix++) { // every task reuses the one gtx on the 2-thread pool
+            service.submit(() -> gtx.addV("person").iterate());
+        }
+
+        service.shutdown();
+        assertThat(service.awaitTermination(90000, TimeUnit.MILLISECONDS), is(true));
+
+        // the uncommitted vertices must not be visible through the sessionless g
+        assertEquals(0, (long) g.V().count().next());
+
+        assertEquals(verticesToAdd, (long) gtx.V().count().next());
+        gtx.tx().commit();
+        assertThat(gtx.tx().isOpen(), is(false));
+
+        // the sessionless g should still work after the commit and now see every committed vertex
+        assertEquals(verticesToAdd, (long) g.V().count().next());
+
+        cluster.close();
+    }
+}