You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by oh...@apache.org on 2018/12/20 11:52:31 UTC

phoenix git commit: [PHOENIX-3623] Integrate Omid with Phoenix.

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.3 4975f0b12 -> 850114334


[PHOENIX-3623] Integrate Omid with Phoenix.

This commit completes the integration of Omid as a transaction processing engine for Phoenix.
More information about the integration is available at [PHOENIX-3623] and at [OMID-82], the corresponding JIRA issue in Omid.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/85011433
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/85011433
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/85011433

Branch: refs/heads/4.x-HBase-1.3
Commit: 8501143345b291765fcd489fbfba7d4b6be24e01
Parents: 4975f0b
Author: Ohad Shacham <oh...@yahoo-inc.com>
Authored: Thu Dec 20 13:47:38 2018 +0200
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Thu Dec 20 13:47:38 2018 +0200

----------------------------------------------------------------------
 bin/omid-env.sh                                 |  43 ++++
 bin/omid-server-configuration.yml               |  25 +++
 bin/omid.sh                                     |  93 ++++++++
 phoenix-assembly/pom.xml                        |   5 +
 .../components/all-common-dependencies.xml      |  28 +++
 phoenix-core/pom.xml                            |  46 ++++
 .../phoenix/coprocessor/OmidGCProcessor.java    |   6 +-
 .../coprocessor/OmidTransactionalProcessor.java |   7 +-
 .../transaction/OmidTransactionContext.java     | 217 ++++++++++++++++++-
 .../transaction/OmidTransactionProvider.java    | 106 ++++++++-
 .../transaction/OmidTransactionTable.java       |  64 +++++-
 .../phoenix/transaction/TransactionFactory.java |   4 +-
 .../phoenix/query/QueryServicesTestImpl.java    |   1 -
 .../java/org/apache/phoenix/util/TestUtil.java  |   2 +-
 phoenix-server/pom.xml                          |   1 +
 pom.xml                                         |  47 ++++
 16 files changed, 665 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/bin/omid-env.sh
----------------------------------------------------------------------
diff --git a/bin/omid-env.sh b/bin/omid-env.sh
new file mode 100644
index 0000000..820cdaa
--- /dev/null
+++ b/bin/omid-env.sh
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set the flags to pass to the jvm when running omid
+# (example of a larger heap: export JVM_FLAGS=-Xmx8096m)
+export JVM_FLAGS=-Xmx4096m
+# ---------------------------------------------------------------------------------------------------------------------
+# Check if HADOOP_CONF_DIR and HBASE_CONF_DIR are set; derive each from the matching *_HOME when it is not
+# ---------------------------------------------------------------------------------------------------------------------
+if [ -z "${HADOOP_CONF_DIR+x}" ]; then
+    if [ -z "${HADOOP_HOME+x}" ]; then
+        echo "WARNING: HADOOP_HOME or HADOOP_CONF_DIR are unset"
+    else
+        export HADOOP_CONF_DIR="${HADOOP_HOME}/conf"
+    fi
+else
+    echo "HADOOP_CONF_DIR is set to '$HADOOP_CONF_DIR'"
+fi
+
+if [ -z "${HBASE_CONF_DIR+x}" ]; then
+    if [ -z "${HBASE_HOME+x}" ]; then
+        echo "WARNING: HBASE_HOME or HBASE_CONF_DIR are unset"
+    else
+        export HBASE_CONF_DIR="${HBASE_HOME}/conf"
+    fi
+else
+    echo "HBASE_CONF_DIR is set to '$HBASE_CONF_DIR'"
+fi

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/bin/omid-server-configuration.yml
----------------------------------------------------------------------
diff --git a/bin/omid-server-configuration.yml b/bin/omid-server-configuration.yml
new file mode 100644
index 0000000..8d1616e
--- /dev/null
+++ b/bin/omid-server-configuration.yml
@@ -0,0 +1,25 @@
+# =====================================================================================================================
+#
+# Omid TSO Server Configuration
+# ---------------------------------------------------------------------------------------------------------------------
+#
+# Override here the default values of the TSO server config parameters found in the 'default-omid-server-configuration.yml' file
+#
+# =====================================================================================================================
+
+
+timestampStoreModule: !!org.apache.omid.timestamp.storage.HBaseTimestampStorageModule [ ]
+commitTableStoreModule: !!org.apache.omid.committable.hbase.HBaseCommitTableStorageModule [ ]
+
+metrics: !!org.apache.omid.metrics.CodahaleMetricsProvider [
+!!org.apache.omid.metrics.CodahaleMetricsConfig {
+  outputFreqInSecs: 10,
+  reporters: !!set {
+    !!org.apache.omid.metrics.CodahaleMetricsConfig$Reporter CSV
+  },
+  csvDir: "csvMetrics",
+}
+]
+
+timestampType: WORLD_TIME
+lowLatency: false

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/bin/omid.sh
----------------------------------------------------------------------
diff --git a/bin/omid.sh b/bin/omid.sh
new file mode 100755
index 0000000..5b33ed5
--- /dev/null
+++ b/bin/omid.sh
@@ -0,0 +1,93 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+SCRIPTDIR=$(dirname "$0")
+cd "$SCRIPTDIR" || exit 1  # all relative paths below assume the script directory
+
+# Load Omid environment variables (explicit ./ so PATH is never searched)
+source ./omid-env.sh
+
+# Configure classpath...
+CLASSPATH=./:../conf:${HBASE_CONF_DIR}:${HADOOP_CONF_DIR}
+
+# ...for source release and...
+for j in ../target/omid-tso*.jar; do
+    CLASSPATH=$CLASSPATH:$j
+done
+
+# and for binary release
+for j in ../omid-tso*.jar; do
+    CLASSPATH=$CLASSPATH:$j
+done
+for j in ../lib/*.jar; do
+    CLASSPATH=$CLASSPATH:$j
+done
+
+tso() {  # replaces this shell with the TSO server JVM
+    exec java $JVM_FLAGS -cp "$CLASSPATH" org.apache.omid.tso.TSOServer "$@"  # JVM_FLAGS unquoted on purpose: may hold several flags
+}
+
+tsoRelauncher() {  # re-runs "omid.sh tso" until it exits with status 0
+    until ./omid.sh tso "$@"; do
+        echo "TSO Server crashed with exit code $?.  Re-launching..." >&2
+        sleep 1
+    done
+}
+
+createHBaseCommitTable() {  # runs OmidTableManager with the commit-table sub-command
+    java -cp "$CLASSPATH" org.apache.omid.tools.hbase.OmidTableManager commit-table "$@"
+}
+
+createHBaseTimestampTable() {  # runs OmidTableManager with the timestamp-table sub-command
+    java -cp "$CLASSPATH" org.apache.omid.tools.hbase.OmidTableManager timestamp-table "$@"
+}
+
+usage() {
+    echo "Usage: omid.sh <command> <options>"
+    echo "where <command> is one of:"
+    echo "  tso                           Starts The Status Oracle server (TSO)"
+    echo "  tso-relauncher                Starts The Status Oracle server (TSO) re-launching it if the process exits"
+    echo "  create-hbase-commit-table     Creates the hbase commit table."
+    echo "  create-hbase-timestamp-table  Creates the hbase timestamp table."
+}
+
+# if no args specified, show usage
+if [ $# -eq 0 ]; then
+    usage
+    exit 1
+fi
+
+COMMAND=$1
+shift
+
+if [ "$COMMAND" = "tso" ]; then
+    createHBaseTimestampTable "$@"
+    createHBaseCommitTable "$@"
+    tso "$@"
+elif [ "$COMMAND" = "tso-relauncher" ]; then
+    tsoRelauncher "$@"
+elif [ "$COMMAND" = "create-hbase-commit-table" ]; then
+    createHBaseCommitTable "$@"
+elif [ "$COMMAND" = "create-hbase-timestamp-table" ]; then
+    createHBaseTimestampTable "$@"
+else
+    exec java -cp "$CLASSPATH" "$COMMAND" "$@"  # any other command is treated as a main class name
+fi
+
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/pom.xml b/phoenix-assembly/pom.xml
index 9fbd5c7..8079f33 100644
--- a/phoenix-assembly/pom.xml
+++ b/phoenix-assembly/pom.xml
@@ -130,5 +130,10 @@
       <groupId>org.apache.phoenix</groupId>
       <artifactId>phoenix-load-balancer</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.omid</groupId>
+      <artifactId>omid-hbase-tools-hbase1.x</artifactId>
+      <version>${omid.version}</version>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-assembly/src/build/components/all-common-dependencies.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/components/all-common-dependencies.xml b/phoenix-assembly/src/build/components/all-common-dependencies.xml
index 336bc4d..84388cd 100644
--- a/phoenix-assembly/src/build/components/all-common-dependencies.xml
+++ b/phoenix-assembly/src/build/components/all-common-dependencies.xml
@@ -45,6 +45,34 @@
         <include>io.netty:netty</include>
         <include>commons-codec:commons-codec</include>
         <include>org.apache.calcite:calcite-avatica*</include>
+
+        <!-- For omid TSO -->
+        <include>org.apache.omid:omid-tso-server-hbase1.x</include>
+        <include>org.apache.omid:omid-hbase-common-hbase1.x</include>
+        <include>org.apache.omid:omid-hbase-tools-hbase1.x</include>
+        <include>org.apache.omid:omid-common</include>
+        <include>org.apache.omid:omid-metrics</include>
+        <include>org.apache.omid:omid-timestamp-storage-hbase1.x</include>
+        <include>org.apache.omid:omid-hbase-shims-hbase1.x</include>
+        <include>org.apache.omid:omid-commit-table</include>
+        <include>org.apache.omid:omid-codahale-metrics</include>
+        <include>org.apache.omid:omid-hbase-commit-table-hbase1.x</include>
+        <include>org.yaml:snakeyaml</include>
+        <include>com.google.inject:guice</include>
+        <include>commons-beanutils:commons-beanutils</include>
+        <include>javax.inject:javax.inject</include>
+        <include>aopalliance:aopalliance</include>
+        <include>org.apache.commons:commons-pool2</include>
+        <include>com.lmax:disruptor</include>
+        <include>com.codahale.metrics:metrics-core</include>
+        <include>com.beust:jcommander</include>
+        <include>commons-collections:commons-collections</include>
+        <include>io.netty:netty-all</include>
+        <include>org.apache.htrace:htrace-core</include>
+        <include>javax.servlet:javax.servlet-api</include>
+        <include>commons-cli:commons-cli</include>
+        <include>com.yammer.metrics:metrics-core</include>
+        <include>com.codahale.metrics:metrics-graphite</include>
       </includes>
     </dependencySet>
     <!-- Separate dependency set to just pull in the jackson stuff since its test 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 196dadd..e54c98b 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -195,6 +195,52 @@
   </build>
 
   <dependencies>
+  <dependency>
+    <groupId>org.apache.omid</groupId>
+    <artifactId>omid-hbase-client-hbase1.x</artifactId>
+    <version>${omid.version}</version>
+    <exclusions>
+      <exclusion>
+        <groupId>org.testng</groupId>
+        <artifactId>testng</artifactId>
+      </exclusion>
+    </exclusions>
+  </dependency>
+  <dependency>
+    <groupId>org.apache.omid</groupId>
+    <artifactId>omid-hbase-coprocessor-hbase1.x</artifactId>
+    <version>${omid.version}</version>
+    <exclusions>
+      <exclusion>
+        <groupId>org.testng</groupId>
+        <artifactId>testng</artifactId>
+      </exclusion>
+    </exclusions>
+  </dependency>
+  <dependency>
+    <groupId>org.apache.omid</groupId>
+    <artifactId>omid-tso-server-hbase1.x</artifactId>
+    <version>${omid.version}</version>
+    <exclusions>
+      <exclusion>
+        <groupId>org.testng</groupId>
+        <artifactId>testng</artifactId>
+      </exclusion>
+    </exclusions>
+  </dependency>
+  <dependency>
+    <groupId>org.apache.omid</groupId>
+    <artifactId>omid-tso-server-hbase1.x</artifactId>
+    <version>${omid.version}</version>
+    <type>test-jar</type>
+    <exclusions>
+      <exclusion>
+        <groupId>org.testng</groupId>
+        <artifactId>testng</artifactId>
+      </exclusion>
+    </exclusions>
+  </dependency>
+
     <!-- Transaction dependencies -->
     <dependency>
       <groupId>org.apache.tephra</groupId>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
index 70658fb..b4a1a0f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
@@ -17,13 +17,15 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.transaction.OmidCompactor;
+import org.apache.omid.transaction.OmidSnapshotFilter;
 
 
 public class OmidGCProcessor extends DelegateRegionObserver {
 
     public OmidGCProcessor() {
-        super(new BaseRegionObserver());
+        super(new OmidCompactor(true));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
index fc246d4..b84b5ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
@@ -17,13 +17,16 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.omid.transaction.OmidSnapshotFilter;
+import org.apache.phoenix.transaction.OmidTransactionProvider;
 
 
 public class OmidTransactionalProcessor extends DelegateRegionObserver {
 
     public OmidTransactionalProcessor() {
-        super(new BaseRegionObserver());
+        // Hack for testing - retrieves the commit table client from the singleton OmidTransactionProvider
+        // TODO: use real commit table and get port from config
+        super(new OmidSnapshotFilter(OmidTransactionProvider.getInstance().getCommitTableClient()));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index 9edc58b..392de78 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -18,89 +18,276 @@
 package org.apache.phoenix.transaction;
 
 import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.omid.proto.TSOProto;
+import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
+import org.apache.omid.transaction.HBaseCellId;
+import org.apache.omid.transaction.HBaseTransaction;
+import org.apache.omid.transaction.HBaseTransactionManager;
+import org.apache.omid.transaction.RollbackException;
+import org.apache.omid.transaction.Transaction;
+import org.apache.omid.transaction.Transaction.Status;
+import org.apache.omid.transaction.TransactionException;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.InvalidProtocolBufferException;
 
 import com.google.protobuf.InvalidProtocolBufferException;
-//import org.apache.omid.tso.TSOMockModule;
 
 public class OmidTransactionContext implements PhoenixTransactionContext {
 
+    private static final Logger logger = LoggerFactory.getLogger(OmidTransactionContext.class);
+
+    private HBaseTransactionManager tm;
+    private HBaseTransaction tx;
+
     public OmidTransactionContext() {
+        this.tx = null;
+        this.tm = null;
     }
 
     public OmidTransactionContext(PhoenixConnection connection) throws SQLException {
+        PhoenixTransactionClient client = connection.getQueryServices().initTransactionClient(getProvider());
+        assert (client instanceof OmidTransactionProvider.OmidTransactionClient);
+        this.tm = ((OmidTransactionProvider.OmidTransactionClient)client).getTransactionClient();
+        this.tx = null;
     }
 
     public OmidTransactionContext(byte[] txnBytes) throws InvalidProtocolBufferException {
+        this(); // leaves both tx and tm null
+        if (txnBytes != null && txnBytes.length > 0) {
+            TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(txnBytes);
+            // tm is always null here, so the unguarded tm.isLowLatency() threw NullPointerException.
+            // NOTE(review): falls back to lowLatency=false -- confirm that is right for decoded transactions.
+            tx = new HBaseTransaction(transaction.getTimestamp(), transaction.getEpoch(), new HashSet<HBaseCellId>(),
+                    new HashSet<HBaseCellId>(), null, tm != null && tm.isLowLatency());
+        }
     }
 
     public OmidTransactionContext(PhoenixTransactionContext ctx, boolean subTask) {
+        assert (ctx instanceof OmidTransactionContext);
+        OmidTransactionContext omidTransactionContext = (OmidTransactionContext) ctx;
+
+        this.tm = omidTransactionContext.tm;
+        
+        if (subTask) {
+            if (omidTransactionContext.isTransactionRunning()) {
+                Transaction transaction = omidTransactionContext.getTransaction();
+                this.tx = new HBaseTransaction(transaction.getTransactionId(), transaction.getEpoch(),
+                        new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), this.tm,
+                        transaction.getReadTimestamp(), transaction.getWriteTimestamp(), tm.isLowLatency());
+            } else {
+                this.tx = null;
+            }
+
+            this.tm = null;
+        } else {
+            this.tx = omidTransactionContext.getTransaction();
+        }
     }
 
     @Override
     public void begin() throws SQLException {
+        if (tm == null) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build()
+                    .buildException();
+        }
+
+
+        try {
+            tx = (HBaseTransaction) tm.begin();
+        } catch (TransactionException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_FAILED)
+                    .setMessage(e.getMessage()).setRootCause(e).build()
+                    .buildException();
+        }
     }
 
     @Override
     public void commit() throws SQLException {
+        if (tx == null || tm == null)
+            return;
+
+        try {
+            tm.commit(tx);
+        } catch (TransactionException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_FAILED)
+                    .setMessage(e.getMessage()).setRootCause(e).build()
+                    .buildException();
+        } catch (RollbackException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION)
+                    .setMessage(e.getMessage()).setRootCause(e).build()
+                    .buildException();
+        }
     }
 
     @Override
     public void abort() throws SQLException {
+        if (tx == null || tm == null || tx.getStatus() != Status.RUNNING) {
+            return;
+        }
+
+        try {
+            tm.rollback(tx);
+        } catch (TransactionException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_FAILED)
+                    .setMessage(e.getMessage()).setRootCause(e).build()
+                    .buildException();
+        }
     }
 
     @Override
     public void checkpoint(boolean hasUncommittedData) throws SQLException {
+        if (hasUncommittedData) {
+            try {
+                tx.checkpoint();
+            } catch (TransactionException e) {
+                throw new SQLException(e);
+            }
+        }
+        tx.setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
     }
 
     @Override
     public void commitDDLFence(PTable dataTable) throws SQLException {
+        // Obtains a fence transaction for the table from the transaction manager and keeps it in tx.
+        try {
+            tx = (HBaseTransaction) tm.fence(dataTable.getName().getBytes());
+            if (logger.isInfoEnabled()) {
+                logger.info("Added write fence at ~" + tx.getReadTimestamp());
+            }
+        } catch (TransactionException e) {
+            // Keep the Omid failure as the root cause; it was previously discarded.
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
+            .setSchemaName(dataTable.getSchemaName().getString())
+            .setTableName(dataTable.getTableName().getString())
+            .setRootCause(e).build().buildException();
+        }
     }
 
     @Override
     public void join(PhoenixTransactionContext ctx) {
+
+        if (ctx == PhoenixTransactionContext.NULL_CONTEXT) {
+            return;
+        }
+
+        assert (ctx instanceof OmidTransactionContext);
+        OmidTransactionContext omidContext = (OmidTransactionContext) ctx;
+        
+        HBaseTransaction transaction = omidContext.getTransaction();
+        if (transaction == null || tx == null) return;
+
+        Set<HBaseCellId> writeSet = transaction.getWriteSet();
+
+        for (HBaseCellId cell : writeSet) {
+            tx.addWriteSetElement(cell);
+        }
     }
 
     @Override
     public boolean isTransactionRunning() {
-        return false;
+        return (tx != null);
     }
 
     @Override
     public void reset() {
+        tx = null;
     }
 
     @Override
     public long getTransactionId() {
-        return 0;
+        return tx.getTransactionId();
     }
 
     @Override
     public long getReadPointer() {
-        return 0;
+        return tx.getReadTimestamp();
     }
 
     @Override
     public long getWritePointer() {
-        return 0;
+        return tx.getWriteTimestamp();
     }
 
     @Override
     public PhoenixVisibilityLevel getVisibilityLevel() {
-        return null;
+        assert(tx != null);
+        // Translate Omid's visibility level into the equivalent Phoenix enum value.
+        VisibilityLevel visibilityLevel = tx.getVisibilityLevel();
+
+        PhoenixVisibilityLevel phoenixVisibilityLevel;
+        switch (visibilityLevel) {
+        case SNAPSHOT:
+            phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT;
+            break;
+        case SNAPSHOT_EXCLUDE_CURRENT:
+            phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+            break;
+        case SNAPSHOT_ALL:
+            phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_ALL;
+            break; // was missing: SNAPSHOT_ALL fell through to default and returned null
+        default:
+            phoenixVisibilityLevel = null;
+        }
+
+        return phoenixVisibilityLevel;
     }
 
     @Override
     public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) {
+
+        VisibilityLevel omidVisibilityLevel = null;
+
+        switch (visibilityLevel) {
+        case SNAPSHOT:
+            omidVisibilityLevel = VisibilityLevel.SNAPSHOT;
+            break;
+        case SNAPSHOT_EXCLUDE_CURRENT:
+            omidVisibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+            break;
+        case SNAPSHOT_ALL:
+            omidVisibilityLevel = VisibilityLevel.SNAPSHOT_ALL;
+            break;
+        default:
+            assert (false);
+        }
+
+        assert(tx != null);
+        tx.setVisibilityLevel(omidVisibilityLevel);
+
     }
 
     @Override
     public byte[] encodeTransaction() throws SQLException {
-        return null;
+        assert(tx != null);
+
+        TSOProto.Transaction.Builder transactionBuilder = TSOProto.Transaction.newBuilder();
+
+        transactionBuilder.setTimestamp(tx.getTransactionId());
+        transactionBuilder.setEpoch(tx.getEpoch());
+
+        byte[] encodedTxBytes = transactionBuilder.build().toByteArray();
+        // Add code of TransactionProvider at end of byte array
+        encodedTxBytes = Arrays.copyOf(encodedTxBytes, encodedTxBytes.length + 1);
+        encodedTxBytes[encodedTxBytes.length - 1] = getProvider().getCode();
+        return encodedTxBytes;
     }
 
     @Override
@@ -110,13 +297,22 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
 
     @Override
     public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext context, boolean subTask) {
-        return null;
+        return new OmidTransactionContext(context, subTask);
     }
 
     @Override
     public void markDMLFence(PTable dataTable) {
     }
 
+    /**
+    *  OmidTransactionContext specific functions
+    */
+
+    public HBaseTransaction getTransaction() {
+        return tx;
+    }
+
+
     @Override
     public Table getTransactionalTable(Table htable, boolean isConflictFree) throws SQLException {
         return new OmidTransactionTable(this, htable, isConflictFree);
@@ -124,6 +320,9 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
 
     @Override
     public Table getTransactionalTableWriter(PhoenixConnection connection, PTable table, Table htable, boolean isIndex) throws SQLException {
-        return new OmidTransactionTable(this, htable, table.isImmutableRows() || isIndex);
+        // When we're getting a table for writing, if the table being written to is an index,
+        // write the shadow cells immediately since the only time we write to an index is
+        // when we initially populate it synchronously.
+        return new OmidTransactionTable(this, htable, table.isImmutableRows() || isIndex, isIndex);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
index bace2bc..87d7225 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
@@ -24,11 +24,27 @@ import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.InMemoryCommitTable;
+import org.apache.omid.transaction.HBaseOmidClientConfiguration;
+import org.apache.omid.transaction.HBaseTransactionManager;
+import org.apache.omid.transaction.TTable;
+import org.apache.omid.tso.TSOMockModule;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+import org.apache.omid.tso.client.OmidClientConfiguration;
+import org.apache.omid.tso.client.TSOClient;
 import org.apache.phoenix.coprocessor.OmidGCProcessor;
 import org.apache.phoenix.coprocessor.OmidTransactionalProcessor;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.apache.phoenix.util.TransactionUtil;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
 
 public class OmidTransactionProvider implements PhoenixTransactionProvider {
     private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider();
@@ -38,6 +54,10 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider {
     public static final int DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE = 1000;
     public static final String DEFAULT_OMID_TSO_TIMESTAMP_TYPE = "WORLD_TIME";
 
+    private HBaseTransactionManager transactionManager = null;
+    private volatile CommitTable.Client commitTableClient = null;
+    private CommitTable.Writer commitTableWriter = null;
+
     public static final OmidTransactionProvider getInstance() {
         return INSTANCE;
     }
@@ -63,26 +83,106 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider {
 
     @Override
     public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) throws SQLException{
-        return new OmidTransactionClient();
+        if (transactionManager == null) {
+            try {
+                HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
+                clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
+                transactionManager = (HBaseTransactionManager) HBaseTransactionManager.newInstance(clientConf);
+            } catch (IOException | InterruptedException e) {
+                throw new SQLExceptionInfo.Builder(
+                        SQLExceptionCode.TRANSACTION_FAILED)
+                        .setMessage(e.getMessage()).setRootCause(e).build()
+                        .buildException();
+            }
+        }
+
+        return new OmidTransactionClient(transactionManager);
     }
 
     static class OmidTransactionClient implements PhoenixTransactionClient {
+        private final HBaseTransactionManager transactionManager;
+
+        public OmidTransactionClient(HBaseTransactionManager transactionManager) {
+            this.transactionManager = transactionManager;
+        }
+
+        public HBaseTransactionManager getTransactionClient() {
+            return transactionManager;
+        }
+
         @Override
         public void close() throws IOException {}
     }
 
+    // For testing only; returns null until getTransactionService() has been called
+    public CommitTable.Client getCommitTableClient() {
+        return commitTableClient;
+    }
+    
     @Override
     public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo, int port) throws  SQLException{
-        return new OmidTransactionService();
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        TSOServer tso;
+
+        tsoConfig.setPort(port);
+        tsoConfig.setConflictMapSize(config.getInt(OMID_TSO_CONFLICT_MAP_SIZE, DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE));
+        tsoConfig.setTimestampType(config.get(OMID_TSO_TIMESTAMP_TYPE, DEFAULT_OMID_TSO_TIMESTAMP_TYPE));
+
+        Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
+        tso = injector.getInstance(TSOServer.class);
+        tso.startAndWait();
+
+        OmidClientConfiguration clientConfig = new OmidClientConfiguration();
+        clientConfig.setConnectionString("localhost:" + port);
+        clientConfig.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
+
+        InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class);
+
+        try {
+            // Create the TSO client that connects to the TSO server started above
+            TSOClient client = TSOClient.newInstance(clientConfig);
+
+            HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
+            clientConf.setConnectionString("localhost:" + port);
+            clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
+            clientConf.setHBaseConfiguration(config);
+            commitTableClient = commitTable.getClient();
+            commitTableWriter = commitTable.getWriter();
+            transactionManager = HBaseTransactionManager.builder(clientConf)
+                    .commitTableClient(commitTableClient)
+                    .commitTableWriter(commitTableWriter)
+                    .tsoClient(client).build();
+        } catch (IOException | InterruptedException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_FAILED)
+                    .setMessage(e.getMessage()).setRootCause(e).build()
+                    .buildException();
+        }
+
+        return new OmidTransactionService(tso, transactionManager);
     }
 
     static class OmidTransactionService implements PhoenixTransactionService {
+        private final HBaseTransactionManager transactionManager;
+        private TSOServer tso;
+
+        public OmidTransactionService(TSOServer tso, HBaseTransactionManager transactionManager) {
+            this.tso = tso;
+            this.transactionManager = transactionManager;
+        }
 
         public void start() {
+
         }
 
         @Override
         public void close() throws IOException {
+            if (transactionManager != null) {
+                transactionManager.close();
+            }
+            if (tso != null) {
+                tso.stopAndWait();
+            }
         }
     }
 
@@ -108,6 +208,6 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider {
 
     @Override
     public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) {
-        return put;
+        return TTable.markPutAsCommitted(put, timestamp, timestamp);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
index a2afcc9..cec7f59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.transaction;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -41,6 +42,10 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.omid.transaction.TTable;
+import org.apache.omid.transaction.Transaction;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
@@ -48,80 +53,115 @@ import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
 
 public class OmidTransactionTable implements Table {
+    // Copied from HBase ProtobufUtil since it's not accessible
+    final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null, true);
+
+    private TTable tTable;
+    private Transaction tx;
+    private final boolean addShadowCells;
 
     public OmidTransactionTable() throws SQLException {
+        this.tTable = null;
+        this.tx = null;
+        this.addShadowCells = false;
     }
 
     public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable) throws SQLException {
+        this(ctx, hTable, false);
     }
 
     public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable, boolean isConflictFree) throws SQLException  {
+        this(ctx, hTable, isConflictFree, false);
+    }
+    
+    public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable, boolean isConflictFree, boolean addShadowCells) throws SQLException  {
+        assert(ctx instanceof OmidTransactionContext);
+
+        OmidTransactionContext omidTransactionContext = (OmidTransactionContext) ctx;
+        this.addShadowCells = addShadowCells;
+        try {
+            tTable = new TTable(hTable, true, isConflictFree);
+        } catch (IOException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_FAILED)
+            .setMessage(e.getMessage()).setRootCause(e).build()
+            .buildException();
+        }
+
+        this.tx = omidTransactionContext.getTransaction();
     }
 
     @Override
     public Result get(Get get) throws IOException {
-        return null;
+        return tTable.get(tx, get);
     }
 
     @Override
     public void put(Put put) throws IOException {
+        tTable.put(tx, put, addShadowCells);
     }
 
     @Override
     public void delete(Delete delete) throws IOException {
+        tTable.delete(tx, delete);
     }
 
     @Override
     public ResultScanner getScanner(Scan scan) throws IOException {
-        return null;
+        scan.setTimeRange(0, Long.MAX_VALUE);
+        return tTable.getScanner(tx, scan);
     }
 
     @Override
     public Configuration getConfiguration() {
-        return null;
+        return tTable.getConfiguration();
     }
 
     @Override
     public HTableDescriptor getTableDescriptor() throws IOException {
-        return null;
+        return tTable.getTableDescriptor();
     }
 
     @Override
     public boolean exists(Get get) throws IOException {
-        return false;
+       return tTable.exists(tx, get);
     }
 
     @Override
     public Result[] get(List<Get> gets) throws IOException {
-        return null;
+        return tTable.get(tx, gets);
     }
 
     @Override
     public ResultScanner getScanner(byte[] family) throws IOException {
-        return null;
+        return tTable.getScanner(tx, family);
     }
 
     @Override
     public ResultScanner getScanner(byte[] family, byte[] qualifier)
             throws IOException {
-        return null;
+        return tTable.getScanner(tx, family, qualifier);
     }
 
     @Override
     public void put(List<Put> puts) throws IOException {
+        tTable.put(tx, puts, addShadowCells);
     }
 
     @Override
     public void delete(List<Delete> deletes) throws IOException {
+        tTable.delete(tx, deletes);
     }
 
     @Override
     public void close() throws IOException {
+        tTable.close();
     }
 
     @Override
     public TableName getName() {
-        return null;
+        byte[] name = tTable.getTableName();
+        return TableName.valueOf(name);
     }
 
     @Override
@@ -132,12 +172,16 @@ public class OmidTransactionTable implements Table {
     @Override
     public void batch(List<? extends Row> actions, Object[] results)
             throws IOException, InterruptedException {
+        tTable.batch(tx, actions, addShadowCells);
+        Arrays.fill(results, EMPTY_RESULT_EXISTS_TRUE);
     }
 
     @Override
     public Object[] batch(List<? extends Row> actions) throws IOException,
             InterruptedException {
-        return null;
+        Object[] results;
+        batch(actions, results = new Object[actions.size()]);
+        return results;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index d1d531a..0f10b37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -26,7 +26,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol;
 public class TransactionFactory {
     public enum Provider {
         TEPHRA((byte)1, TephraTransactionProvider.getInstance(), true),
-        OMID((byte)2, OmidTransactionProvider.getInstance(), false);
+        OMID((byte)2, OmidTransactionProvider.getInstance(), true);
         
         private final byte code;
         private final PhoenixTransactionProvider provider;
@@ -50,7 +50,7 @@ public class TransactionFactory {
         }
         
         public static Provider getDefault() {
-            return TEPHRA;
+            return OMID;
         }
 
         public PhoenixTransactionProvider getTransactionProvider()  {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index 2049390..59e7fd3 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -22,7 +22,6 @@ import static org.apache.phoenix.query.QueryServicesOptions.withDefaults;
 
 import org.apache.curator.shaded.com.google.common.io.Files;
 import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec;
-import org.apache.phoenix.transaction.OmidTransactionProvider;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.TestUtil;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index f3faa0c..d2649e1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1106,7 +1106,7 @@ public class TestUtil {
         }
         return filteredData;
     }
-    
+
     /**
      * Find a random free port in localhost for binding.
      * @return A port number or -1 for failure.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-server/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-server/pom.xml b/phoenix-server/pom.xml
index 186d7a0..b4a90e9 100644
--- a/phoenix-server/pom.xml
+++ b/phoenix-server/pom.xml
@@ -122,6 +122,7 @@
                   <include>org.iq80.snappy:snappy</include>
                   <include>org.antlr:antlr*</include>
                   <include>org.apache.tephra:tephra*</include>
+                  <include>org.apache.omid:omid*</include>
                   <include>com.google.code.gson:gson</include>
                   <include>org.jruby.joni:joni</include>
                   <include>org.jruby.jcodings:jcodings</include>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fc64bd2..e817200 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,7 @@
     <avatica.version>1.12.0</avatica.version>
     <jettyVersion>8.1.7.v20120910</jettyVersion>
     <tephra.version>0.15.0-incubating</tephra.version>
+    <omid.version>1.0.0</omid.version>
     <spark.version>2.3.2</spark.version>
     <scala.version>2.11.8</scala.version>
     <scala.binary.version>2.11</scala.binary.version>
@@ -804,6 +805,52 @@
         <artifactId>tephra-hbase-compat-1.3</artifactId>
         <version>${tephra.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.omid</groupId>
+        <artifactId>omid-hbase-client-hbase1.x</artifactId>
+        <version>${omid.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.omid</groupId>
+        <artifactId>omid-hbase-coprocessor-hbase1.x</artifactId>
+        <version>${omid.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.omid</groupId>
+        <artifactId>omid-tso-server-hbase1.x</artifactId>
+        <version>${omid.version}</version>
+        <exclusions>
+          <exclusion>
+          <groupId>org.testng</groupId>
+          <artifactId>testng</artifactId>
+        </exclusion>
+      </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.omid</groupId>
+        <artifactId>omid-tso-server-hbase1.x</artifactId>
+        <version>${omid.version}</version>
+        <type>test-jar</type>
+        <exclusions>
+          <exclusion>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
 
       <!-- Make sure we have all the antlr dependencies -->
       <dependency>