You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by oh...@apache.org on 2018/12/20 11:52:31 UTC
phoenix git commit: [PHOENIX-3623] Integrate Omid with Phoenix.
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.3 4975f0b12 -> 850114334
[PHOENIX-3623] Integrate Omid with Phoenix.
This commit finishes the integration of Omid as a transaction processing engine for Phoenix.
More information about the integration is available in [PHOENIX-3623] and in [OMID-82], the corresponding JIRA issue in Omid.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/85011433
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/85011433
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/85011433
Branch: refs/heads/4.x-HBase-1.3
Commit: 8501143345b291765fcd489fbfba7d4b6be24e01
Parents: 4975f0b
Author: Ohad Shacham <oh...@yahoo-inc.com>
Authored: Thu Dec 20 13:47:38 2018 +0200
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Thu Dec 20 13:47:38 2018 +0200
----------------------------------------------------------------------
bin/omid-env.sh | 43 ++++
bin/omid-server-configuration.yml | 25 +++
bin/omid.sh | 93 ++++++++
phoenix-assembly/pom.xml | 5 +
.../components/all-common-dependencies.xml | 28 +++
phoenix-core/pom.xml | 46 ++++
.../phoenix/coprocessor/OmidGCProcessor.java | 6 +-
.../coprocessor/OmidTransactionalProcessor.java | 7 +-
.../transaction/OmidTransactionContext.java | 217 ++++++++++++++++++-
.../transaction/OmidTransactionProvider.java | 106 ++++++++-
.../transaction/OmidTransactionTable.java | 64 +++++-
.../phoenix/transaction/TransactionFactory.java | 4 +-
.../phoenix/query/QueryServicesTestImpl.java | 1 -
.../java/org/apache/phoenix/util/TestUtil.java | 2 +-
phoenix-server/pom.xml | 1 +
pom.xml | 47 ++++
16 files changed, 665 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/bin/omid-env.sh
----------------------------------------------------------------------
diff --git a/bin/omid-env.sh b/bin/omid-env.sh
new file mode 100644
index 0000000..820cdaa
--- /dev/null
+++ b/bin/omid-env.sh
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set the flags to pass to the jvm when running omid
+# export JVM_FLAGS=-Xmx8096m
+# ---------------------------------------------------------------------------------------------------------------------
+# Check if HADOOP_CONF_DIR and HBASE_CONF_DIR are set
+# ---------------------------------------------------------------------------------------------------------------------
+export JVM_FLAGS=-Xmx4096m
+if [ -z ${HADOOP_CONF_DIR+x} ]; then
+ if [ -z ${HADOOP_HOME+x} ]; then
+ echo "WARNING: HADOOP_HOME or HADOOP_CONF_DIR are unset";
+ else
+ export HADOOP_CONF_DIR=${HADOOP_HOME}/conf
+ fi
+else
+ echo "HADOOP_CONF_DIR is set to '$HADOOP_CONF_DIR'";
+fi
+
+if [ -z ${HBASE_CONF_DIR+x} ]; then
+ if [ -z ${HBASE_HOME+x} ]; then
+ echo "WARNING: HBASE_HOME or HBASE_CONF_DIR are unset";
+ else
+ export HBASE_CONF_DIR=${HBASE_HOME}/conf
+ fi
+else
+ echo "HBASE_CONF_DIR is set to '$HBASE_CONF_DIR'";
+fi
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/bin/omid-server-configuration.yml
----------------------------------------------------------------------
diff --git a/bin/omid-server-configuration.yml b/bin/omid-server-configuration.yml
new file mode 100644
index 0000000..8d1616e
--- /dev/null
+++ b/bin/omid-server-configuration.yml
@@ -0,0 +1,25 @@
+# =====================================================================================================================
+#
+# Omid TSO Server Configuration
+# ---------------------------------------------------------------------------------------------------------------------
+#
+# Tune here the default values for TSO server config parameters found in 'default-omid-server-configuration.yml' file
+#
+# =====================================================================================================================
+
+
+# Both storage modules are set to their HBase-backed implementations.
+timestampStoreModule: !!org.apache.omid.timestamp.storage.HBaseTimestampStorageModule [ ]
+commitTableStoreModule: !!org.apache.omid.committable.hbase.HBaseCommitTableStorageModule [ ]
+
+# Codahale metrics provider configured with a single CSV reporter,
+# outputFreqInSecs of 10 and csvDir "csvMetrics".
+metrics: !!org.apache.omid.metrics.CodahaleMetricsProvider [
+!!org.apache.omid.metrics.CodahaleMetricsConfig {
+ outputFreqInSecs: 10,
+ reporters: !!set {
+ !!org.apache.omid.metrics.CodahaleMetricsConfig$Reporter CSV
+ },
+ csvDir: "csvMetrics",
+}
+]
+
+# NOTE(review): presumably these two override the defaults from
+# 'default-omid-server-configuration.yml' — confirm the intended values.
+timestampType: WORLD_TIME
+lowLatency: false
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/bin/omid.sh
----------------------------------------------------------------------
diff --git a/bin/omid.sh b/bin/omid.sh
new file mode 100755
index 0000000..5b33ed5
--- /dev/null
+++ b/bin/omid.sh
@@ -0,0 +1,93 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+SCRIPTDIR=`dirname $0`
+cd $SCRIPTDIR;
+
+# Load Omid environment variables
+source omid-env.sh
+
+# Configure classpath...
+CLASSPATH=./:../conf:${HBASE_CONF_DIR}:${HADOOP_CONF_DIR}
+
+# ...for source release and...
+for j in ../target/omid-tso*.jar; do
+ CLASSPATH=$CLASSPATH:$j
+done
+
+# and for binary release
+for j in ../omid-tso*.jar; do
+ CLASSPATH=$CLASSPATH:$j
+done
+for j in ../lib/*.jar; do
+ CLASSPATH=$CLASSPATH:$j
+done
+
+# Replace the current shell with the TSO server JVM.
+# Arguments: all arguments are forwarded unchanged to TSOServer.
+# JVM_FLAGS is deliberately left unquoted so that several flags split into
+# separate words.
+tso() {
+    exec java $JVM_FLAGS -cp "$CLASSPATH" org.apache.omid.tso.TSOServer "$@"
+}
+
+# Run "./omid.sh tso" in a loop until it exits with status 0, logging each
+# non-zero exit to stderr and waiting one second before the next attempt.
+# Arguments: all arguments are forwarded unchanged to "./omid.sh tso".
+tsoRelauncher() {
+    until ./omid.sh tso "$@"; do
+        echo "TSO Server crashed with exit code $?. Re-launching..." >&2
+        sleep 1
+    done
+}
+
+# Run OmidTableManager with the "commit-table" sub-command.
+# Arguments: all arguments are forwarded unchanged to OmidTableManager.
+createHBaseCommitTable() {
+    java -cp "$CLASSPATH" org.apache.omid.tools.hbase.OmidTableManager commit-table "$@"
+}
+
+# Run OmidTableManager with the "timestamp-table" sub-command.
+# Arguments: all arguments are forwarded unchanged to OmidTableManager.
+createHBaseTimestampTable() {
+    java -cp "$CLASSPATH" org.apache.omid.tools.hbase.OmidTableManager timestamp-table "$@"
+}
+
+# Print the list of supported commands to stdout.
+usage() {
+    cat <<'EOF'
+Usage: omid.sh <command> <options>
+where <command> is one of:
+ tso Starts The Status Oracle server (TSO)
+ tso-relauncher Starts The Status Oracle server (TSO) re-launching it if the process exits
+ create-hbase-commit-table Creates the hbase commit table.
+ create-hbase-timestamp-table Creates the hbase timestamp table.
+EOF
+}
+
+# if no args specified, show usage
+if [ $# = 0 ]; then
+    usage
+    exit 1
+fi
+
+# The first argument selects the command; the rest are passed to it.
+# "$@" is quoted everywhere so arguments containing spaces stay intact.
+COMMAND=$1
+shift
+
+case "$COMMAND" in
+    tso)
+        # Run both table-creation tools first, then exec the TSO server.
+        createHBaseTimestampTable "$@"
+        createHBaseCommitTable "$@"
+        tso "$@"
+        ;;
+    tso-relauncher)
+        tsoRelauncher "$@"
+        ;;
+    create-hbase-commit-table)
+        createHBaseCommitTable "$@"
+        ;;
+    create-hbase-timestamp-table)
+        createHBaseTimestampTable "$@"
+        ;;
+    *)
+        # Anything else is handed to java as the main class to run.
+        exec java -cp "$CLASSPATH" "$COMMAND" "$@"
+        ;;
+esac
+
+
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/pom.xml b/phoenix-assembly/pom.xml
index 9fbd5c7..8079f33 100644
--- a/phoenix-assembly/pom.xml
+++ b/phoenix-assembly/pom.xml
@@ -130,5 +130,10 @@
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-load-balancer</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.omid</groupId>
+ <artifactId>omid-hbase-tools-hbase1.x</artifactId>
+ <version>${omid.version}</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-assembly/src/build/components/all-common-dependencies.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/components/all-common-dependencies.xml b/phoenix-assembly/src/build/components/all-common-dependencies.xml
index 336bc4d..84388cd 100644
--- a/phoenix-assembly/src/build/components/all-common-dependencies.xml
+++ b/phoenix-assembly/src/build/components/all-common-dependencies.xml
@@ -45,6 +45,34 @@
<include>io.netty:netty</include>
<include>commons-codec:commons-codec</include>
<include>org.apache.calcite:calcite-avatica*</include>
+
+ <!-- For omid TSO -->
+ <include>org.apache.omid:omid-tso-server-hbase1.x</include>
+ <include>org.apache.omid:omid-hbase-common-hbase1.x</include>
+ <include>org.apache.omid:omid-hbase-tools-hbase1.x</include>
+ <include>org.apache.omid:omid-common</include>
+ <include>org.apache.omid:omid-metrics</include>
+ <include>org.apache.omid:omid-timestamp-storage-hbase1.x</include>
+ <include>org.apache.omid:omid-hbase-shims-hbase1.x</include>
+ <include>org.apache.omid:omid-commit-table</include>
+ <include>org.apache.omid:omid-codahale-metrics</include>
+ <include>org.apache.omid:omid-hbase-commit-table-hbase1.x</include>
+ <include>org.yaml:snakeyaml</include>
+ <include>com.google.inject:guice</include>
+ <include>commons-beanutils:commons-beanutils</include>
+ <include>javax.inject:javax.inject</include>
+ <include>aopalliance:aopalliance</include>
+ <include>org.apache.commons:commons-pool2</include>
+ <include>com.lmax:disruptor</include>
+ <include>com.codahale.metrics:metrics-core</include>
+ <include>com.beust:jcommander</include>
+ <include>commons-collections:commons-collections</include>
+ <include>io.netty:netty-all</include>
+ <include>org.apache.htrace:htrace-core</include>
+ <include>javax.servlet:javax.servlet-api</include>
+ <include>commons-cli:commons-cli</include>
+ <include>com.yammer.metrics:metrics-core</include>
+ <include>com.codahale.metrics:metrics-graphite</include>
</includes>
</dependencySet>
<!-- Separate dependency set to just pull in the jackson stuff since its test
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 196dadd..e54c98b 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -195,6 +195,52 @@
</build>
<dependencies>
+ <dependency>
+ <groupId>org.apache.omid</groupId>
+ <artifactId>omid-hbase-client-hbase1.x</artifactId>
+ <version>${omid.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.omid</groupId>
+ <artifactId>omid-hbase-coprocessor-hbase1.x</artifactId>
+ <version>${omid.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.omid</groupId>
+ <artifactId>omid-tso-server-hbase1.x</artifactId>
+ <version>${omid.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.omid</groupId>
+ <artifactId>omid-tso-server-hbase1.x</artifactId>
+ <version>${omid.version}</version>
+ <type>test-jar</type>
+ <exclusions>
+ <exclusion>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<!-- Transaction dependencies -->
<dependency>
<groupId>org.apache.tephra</groupId>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
index 70658fb..b4a1a0f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
@@ -17,13 +17,15 @@
*/
package org.apache.phoenix.coprocessor;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.transaction.OmidCompactor;
+import org.apache.omid.transaction.OmidSnapshotFilter;
+/**
+ * Coprocessor that wraps an Omid {@link OmidCompactor} in a {@link DelegateRegionObserver}.
+ */
public class OmidGCProcessor extends DelegateRegionObserver {
public OmidGCProcessor() {
- super(new BaseRegionObserver());
+ // NOTE(review): the meaning of the boolean passed to OmidCompactor is not visible here — confirm against the Omid API.
+ super(new OmidCompactor(true));
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
index fc246d4..b84b5ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
@@ -17,13 +17,16 @@
*/
package org.apache.phoenix.coprocessor;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.omid.transaction.OmidSnapshotFilter;
+import org.apache.phoenix.transaction.OmidTransactionProvider;
+/**
+ * Coprocessor that wraps an Omid {@link OmidSnapshotFilter} in a {@link DelegateRegionObserver}.
+ * The filter is built with the commit table client held by the singleton
+ * {@link OmidTransactionProvider}.
+ */
public class OmidTransactionalProcessor extends DelegateRegionObserver {
public OmidTransactionalProcessor() {
- super(new BaseRegionObserver());
+ // Hack for testing - retrieves the commit table client from the singleton OmidTransactionProvider
+ // TODO: use real commit table and get port from config
+ super(new OmidSnapshotFilter(OmidTransactionProvider.getInstance().getCommitTableClient()));
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index 9edc58b..392de78 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -18,89 +18,276 @@
package org.apache.phoenix.transaction;
import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.omid.proto.TSOProto;
+import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
+import org.apache.omid.transaction.HBaseCellId;
+import org.apache.omid.transaction.HBaseTransaction;
+import org.apache.omid.transaction.HBaseTransactionManager;
+import org.apache.omid.transaction.RollbackException;
+import org.apache.omid.transaction.Transaction;
+import org.apache.omid.transaction.Transaction.Status;
+import org.apache.omid.transaction.TransactionException;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.InvalidProtocolBufferException;
-//import org.apache.omid.tso.TSOMockModule;
public class OmidTransactionContext implements PhoenixTransactionContext {
+ private static final Logger logger = LoggerFactory.getLogger(OmidTransactionContext.class);
+
+ private HBaseTransactionManager tm;
+ private HBaseTransaction tx;
+
+ /** Creates a context that holds neither a transaction manager nor a transaction. */
public OmidTransactionContext() {
+ this.tx = null;
+ this.tm = null;
}
+ /**
+  * Creates a context bound to the transaction manager of the transaction client that the
+  * connection's query services return for this provider. No transaction is started.
+  *
+  * @param connection connection whose query services supply the transaction client
+  * @throws SQLException declared by the signature; presumably raised by initTransactionClient — confirm
+  */
public OmidTransactionContext(PhoenixConnection connection) throws SQLException {
+ PhoenixTransactionClient client = connection.getQueryServices().initTransactionClient(getProvider());
+ // The cast below is only guarded when assertions are enabled.
+ assert (client instanceof OmidTransactionProvider.OmidTransactionClient);
+ this.tm = ((OmidTransactionProvider.OmidTransactionClient)client).getTransactionClient();
+ this.tx = null;
}
+ /**
+  * Creates a context from a serialized TSOProto.Transaction. The resulting transaction takes
+  * its timestamp and epoch from the message and starts with empty cell sets.
+  * A null or empty array yields a context without a transaction.
+  *
+  * @param txnBytes serialized transaction, may be null or empty
+  * @throws InvalidProtocolBufferException if the bytes cannot be parsed
+  */
public OmidTransactionContext(byte[] txnBytes) throws InvalidProtocolBufferException {
+ this();
+ if (txnBytes != null && txnBytes.length > 0) {
+ TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(txnBytes);
+ // NOTE(review): this() leaves tm null, so tm.isLowLatency() below throws a
+ // NullPointerException whenever txnBytes is non-empty — confirm the intended source of this flag.
+ tx = new HBaseTransaction(transaction.getTimestamp(), transaction.getEpoch(), new HashSet<HBaseCellId>(),
+ new HashSet<HBaseCellId>(), null, tm.isLowLatency());
+ } else {
+ tx = null;
+ }
}
+ /**
+  * Creates a context derived from another OmidTransactionContext.
+  * With subTask set, a running transaction is copied into a new HBaseTransaction that has the
+  * same ids and timestamps but empty cell sets, and this context keeps no transaction manager.
+  * Otherwise the transaction object and the transaction manager are shared with ctx.
+  *
+  * @param ctx     source context; the cast is only guarded when assertions are enabled
+  * @param subTask whether the new context is for a sub task
+  */
public OmidTransactionContext(PhoenixTransactionContext ctx, boolean subTask) {
+ assert (ctx instanceof OmidTransactionContext);
+ OmidTransactionContext omidTransactionContext = (OmidTransactionContext) ctx;
+
+ this.tm = omidTransactionContext.tm;
+
+ if (subTask) {
+ if (omidTransactionContext.isTransactionRunning()) {
+ Transaction transaction = omidTransactionContext.getTransaction();
+ this.tx = new HBaseTransaction(transaction.getTransactionId(), transaction.getEpoch(),
+ new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), this.tm,
+ transaction.getReadTimestamp(), transaction.getWriteTimestamp(), tm.isLowLatency());
+ } else {
+ this.tx = null;
+ }
+
+ // tm was only needed to build the copy above; begin() on this context will now throw.
+ this.tm = null;
+ } else {
+ this.tx = omidTransactionContext.getTransaction();
+ }
}
+ /**
+  * Starts a new transaction on the transaction manager and stores it in this context.
+  *
+  * @throws SQLException with NULL_TRANSACTION_CONTEXT when no transaction manager is set, or
+  *         with TRANSACTION_FAILED (root cause attached) when the manager fails to begin
+  */
@Override
public void begin() throws SQLException {
+ if (tm == null) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build()
+ .buildException();
+ }
+
+
+ try {
+ tx = (HBaseTransaction) tm.begin();
+ } catch (TransactionException e) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.TRANSACTION_FAILED)
+ .setMessage(e.getMessage()).setRootCause(e).build()
+ .buildException();
+ }
}
+ /**
+  * Commits the current transaction. Does nothing when there is no transaction or no
+  * transaction manager.
+  *
+  * @throws SQLException with TRANSACTION_FAILED for a TransactionException, or with
+  *         TRANSACTION_CONFLICT_EXCEPTION for a RollbackException; the cause is attached
+  */
@Override
public void commit() throws SQLException {
+ if (tx == null || tm == null)
+ return;
+
+ try {
+ tm.commit(tx);
+ } catch (TransactionException e) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.TRANSACTION_FAILED)
+ .setMessage(e.getMessage()).setRootCause(e).build()
+ .buildException();
+ } catch (RollbackException e) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION)
+ .setMessage(e.getMessage()).setRootCause(e).build()
+ .buildException();
+ }
}
+ /**
+  * Rolls back the current transaction. Does nothing when there is no transaction, no
+  * transaction manager, or the transaction status is not RUNNING.
+  *
+  * @throws SQLException with TRANSACTION_FAILED (root cause attached) when the rollback fails
+  */
@Override
public void abort() throws SQLException {
+ if (tx == null || tm == null || tx.getStatus() != Status.RUNNING) {
+ return;
+ }
+
+ try {
+ tm.rollback(tx);
+ } catch (TransactionException e) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.TRANSACTION_FAILED)
+ .setMessage(e.getMessage()).setRootCause(e).build()
+ .buildException();
+ }
}
+ /**
+  * Checkpoints the transaction when there is uncommitted data, then sets the visibility
+  * level to SNAPSHOT_EXCLUDE_CURRENT in either case.
+  *
+  * @param hasUncommittedData whether tx.checkpoint() must be called first
+  * @throws SQLException wrapping the TransactionException raised by the checkpoint
+  */
@Override
public void checkpoint(boolean hasUncommittedData) throws SQLException {
+ // NOTE(review): tx is dereferenced without a null check, unlike commit() and abort() — confirm
+ // callers only checkpoint a running transaction.
+ if (hasUncommittedData) {
+ try {
+ tx.checkpoint();
+ } catch (TransactionException e) {
+ throw new SQLException(e);
+ }
+ }
+ tx.setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
}
@Override
public void commitDDLFence(PTable dataTable) throws SQLException {
+
+ try {
+ tx = (HBaseTransaction) tm.fence(dataTable.getName().getBytes());
+ if (logger.isInfoEnabled()) {
+ logger.info("Added write fence at ~"
+ + tx.getReadTimestamp());
+ }
+ } catch (TransactionException e) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
+ .setSchemaName(dataTable.getSchemaName().getString())
+ .setTableName(dataTable.getTableName().getString()).build()
+ .buildException();
+ }
}
+ /**
+  * Adds every element of the other context's write set to this context's transaction.
+  * Does nothing for NULL_CONTEXT or when either side has no transaction.
+  *
+  * @param ctx context to merge from; the cast is only guarded when assertions are enabled
+  */
@Override
public void join(PhoenixTransactionContext ctx) {
+
+ if (ctx == PhoenixTransactionContext.NULL_CONTEXT) {
+ return;
+ }
+
+ assert (ctx instanceof OmidTransactionContext);
+ OmidTransactionContext omidContext = (OmidTransactionContext) ctx;
+
+ HBaseTransaction transaction = omidContext.getTransaction();
+ if (transaction == null || tx == null) return;
+
+ Set<HBaseCellId> writeSet = transaction.getWriteSet();
+
+ for (HBaseCellId cell : writeSet) {
+ tx.addWriteSetElement(cell);
+ }
}
@Override
public boolean isTransactionRunning() {
- return false;
+ // Only checks that a transaction object is held; its status is not inspected.
+ return (tx != null);
}
@Override
public void reset() {
+ // Drops the transaction reference only; the transaction manager is left untouched.
+ tx = null;
}
@Override
public long getTransactionId() {
- return 0;
+ // NOTE(review): throws NullPointerException when no transaction is held — confirm callers check first.
+ return tx.getTransactionId();
}
@Override
public long getReadPointer() {
- return 0;
+ // The read pointer is the transaction's read timestamp.
+ // NOTE(review): throws NullPointerException when no transaction is held — confirm callers check first.
+ return tx.getReadTimestamp();
}
@Override
public long getWritePointer() {
- return 0;
+ // The write pointer is the transaction's write timestamp.
+ // NOTE(review): throws NullPointerException when no transaction is held — confirm callers check first.
+ return tx.getWriteTimestamp();
}
@Override
public PhoenixVisibilityLevel getVisibilityLevel() {
- return null;
+ VisibilityLevel visibilityLevel = null;
+
+ assert(tx != null);
+ visibilityLevel = tx.getVisibilityLevel();
+
+ PhoenixVisibilityLevel phoenixVisibilityLevel;
+ switch (visibilityLevel) {
+ case SNAPSHOT:
+ phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT;
+ break;
+ case SNAPSHOT_EXCLUDE_CURRENT:
+ phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+ break;
+ case SNAPSHOT_ALL:
+ phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_ALL;
+ default:
+ phoenixVisibilityLevel = null;
+ }
+
+ return phoenixVisibilityLevel;
}
+ /**
+  * Sets the visibility level of the current transaction to the Omid level that corresponds
+  * to the given Phoenix level. A transaction must be held; this is only asserted.
+  *
+  * @param visibilityLevel Phoenix level to translate
+  */
@Override
public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) {
+
+ VisibilityLevel omidVisibilityLevel = null;
+
+ switch (visibilityLevel) {
+ case SNAPSHOT:
+ omidVisibilityLevel = VisibilityLevel.SNAPSHOT;
+ break;
+ case SNAPSHOT_EXCLUDE_CURRENT:
+ omidVisibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+ break;
+ case SNAPSHOT_ALL:
+ omidVisibilityLevel = VisibilityLevel.SNAPSHOT_ALL;
+ break;
+ default:
+ // With assertions disabled an unmapped level falls through and null is passed to the transaction.
+ assert (false);
+ }
+
+ assert(tx != null);
+ tx.setVisibilityLevel(omidVisibilityLevel);
+
}
+ /**
+  * Serializes the current transaction as a TSOProto.Transaction holding the transaction id
+  * (as timestamp) and the epoch, followed by one extra byte with the provider code.
+  * A transaction must be held; this is only asserted.
+  *
+  * @return the protobuf bytes with the provider code appended as the last byte
+  */
@Override
public byte[] encodeTransaction() throws SQLException {
- return null;
+ assert(tx != null);
+
+ TSOProto.Transaction.Builder transactionBuilder = TSOProto.Transaction.newBuilder();
+
+ transactionBuilder.setTimestamp(tx.getTransactionId());
+ transactionBuilder.setEpoch(tx.getEpoch());
+
+ byte[] encodedTxBytes = transactionBuilder.build().toByteArray();
+ // Add code of TransactionProvider at end of byte array
+ encodedTxBytes = Arrays.copyOf(encodedTxBytes, encodedTxBytes.length + 1);
+ encodedTxBytes[encodedTxBytes.length - 1] = getProvider().getCode();
+ return encodedTxBytes;
}
@Override
@@ -110,13 +297,22 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
@Override
public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext context, boolean subTask) {
- return null;
+ // Delegates to the (ctx, subTask) constructor, which expects context to be an OmidTransactionContext.
+ return new OmidTransactionContext(context, subTask);
}
@Override
public void markDMLFence(PTable dataTable) {
}
+ /**
+ * OmidTransactionContext specific functions
+ */
+
+ /** Returns the transaction held by this context, or null when none is held. */
+ public HBaseTransaction getTransaction() {
+ return tx;
+ }
+
+
@Override
public Table getTransactionalTable(Table htable, boolean isConflictFree) throws SQLException {
return new OmidTransactionTable(this, htable, isConflictFree);
@@ -124,6 +320,9 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
@Override
public Table getTransactionalTableWriter(PhoenixConnection connection, PTable table, Table htable, boolean isIndex) throws SQLException {
- return new OmidTransactionTable(this, htable, table.isImmutableRows() || isIndex);
+ // The table is treated as conflict free when it has immutable rows or is an index.
+ // When we're getting a table for writing, if the table being written to is an index,
+ // write the shadow cells immediately since the only time we write to an index is
+ // when we initially populate it synchronously.
+ return new OmidTransactionTable(this, htable, table.isImmutableRows() || isIndex, isIndex);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
index bace2bc..87d7225 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
@@ -24,11 +24,27 @@ import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.InMemoryCommitTable;
+import org.apache.omid.transaction.HBaseOmidClientConfiguration;
+import org.apache.omid.transaction.HBaseTransactionManager;
+import org.apache.omid.transaction.TTable;
+import org.apache.omid.tso.TSOMockModule;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+import org.apache.omid.tso.client.OmidClientConfiguration;
+import org.apache.omid.tso.client.TSOClient;
import org.apache.phoenix.coprocessor.OmidGCProcessor;
import org.apache.phoenix.coprocessor.OmidTransactionalProcessor;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.apache.phoenix.util.TransactionUtil;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
public class OmidTransactionProvider implements PhoenixTransactionProvider {
private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider();
@@ -38,6 +54,10 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider {
public static final int DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE = 1000;
public static final String DEFAULT_OMID_TSO_TIMESTAMP_TYPE = "WORLD_TIME";
+ private HBaseTransactionManager transactionManager = null;
+ private volatile CommitTable.Client commitTableClient = null;
+ private CommitTable.Writer commitTableWriter = null;
+
public static final OmidTransactionProvider getInstance() {
return INSTANCE;
}
@@ -63,26 +83,106 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider {
+ /**
+  * Returns a client wrapping this provider's transaction manager. The manager is created on
+  * the first call, with ROW conflict analysis level, and reused afterwards.
+  *
+  * @throws SQLException with TRANSACTION_FAILED (root cause attached) when the manager cannot be created
+  */
@Override
public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) throws SQLException{
- return new OmidTransactionClient();
+ // NOTE(review): this check-then-create is not synchronized, and neither config nor
+ // connectionInfo is used when building the client configuration — confirm both are intended.
+ if (transactionManager == null) {
+ try {
+ HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
+ clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
+ transactionManager = (HBaseTransactionManager) HBaseTransactionManager.newInstance(clientConf);
+ } catch (IOException | InterruptedException e) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.TRANSACTION_FAILED)
+ .setMessage(e.getMessage()).setRootCause(e).build()
+ .buildException();
+ }
+ }
+
+ return new OmidTransactionClient(transactionManager);
}
+ /** Transaction client that simply carries an HBaseTransactionManager. */
static class OmidTransactionClient implements PhoenixTransactionClient {
+ private final HBaseTransactionManager transactionManager;
+
+ public OmidTransactionClient(HBaseTransactionManager transactionManager) {
+ this.transactionManager = transactionManager;
+ }
+
+ /** Returns the transaction manager this client was created with. */
+ public HBaseTransactionManager getTransactionClient() {
+ return transactionManager;
+ }
+
+ // Closing the client is a no-op; the transaction manager is not closed here.
@Override
public void close() throws IOException {}
}
+ // For testing only
+ // Returns the commit table client assigned in getTransactionService(); null until that has run.
+ public CommitTable.Client getCommitTableClient() {
+ return commitTableClient;
+ }
+
+ /**
+  * Starts a TSO server obtained from a TSOMockModule injector on the given port, then builds
+  * a transaction manager that talks to it at localhost using the injector's in-memory commit
+  * table. The commit table client and writer and the transaction manager are stored on this provider.
+  *
+  * @param config HBase configuration; also read for the conflict map size and timestamp type
+  * @param port   port for the TSO server and for the client connection string
+  * @throws SQLException with TRANSACTION_FAILED (root cause attached) when the client side cannot be set up
+  */
@Override
public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo, int port) throws SQLException{
- return new OmidTransactionService();
+ TSOServerConfig tsoConfig = new TSOServerConfig();
+ TSOServer tso;
+
+ tsoConfig.setPort(port);
+ tsoConfig.setConflictMapSize(config.getInt(OMID_TSO_CONFLICT_MAP_SIZE, DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE));
+ tsoConfig.setTimestampType(config.get(OMID_TSO_TIMESTAMP_TYPE, DEFAULT_OMID_TSO_TIMESTAMP_TYPE));
+
+ Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
+ tso = injector.getInstance(TSOServer.class);
+ tso.startAndWait();
+
+ OmidClientConfiguration clientConfig = new OmidClientConfiguration();
+ clientConfig.setConnectionString("localhost:" + port);
+ clientConfig.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
+
+ InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class);
+
+ // NOTE(review): if anything in this try block fails, the TSO started above is not stopped — confirm
+ // whether it should be shut down before the exception is thrown.
+ try {
+ // Create the associated Handler
+ TSOClient client = TSOClient.newInstance(clientConfig);
+
+ HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
+ clientConf.setConnectionString("localhost:" + port);
+ clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
+ clientConf.setHBaseConfiguration(config);
+ commitTableClient = commitTable.getClient();
+ commitTableWriter = commitTable.getWriter();
+ transactionManager = HBaseTransactionManager.builder(clientConf)
+ .commitTableClient(commitTableClient)
+ .commitTableWriter(commitTableWriter)
+ .tsoClient(client).build();
+ } catch (IOException | InterruptedException e) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.TRANSACTION_FAILED)
+ .setMessage(e.getMessage()).setRootCause(e).build()
+ .buildException();
+ }
+
+ return new OmidTransactionService(tso, transactionManager);
}
static class OmidTransactionService implements PhoenixTransactionService {
+ private final HBaseTransactionManager transactionManager;
+ private TSOServer tso;
+
+ public OmidTransactionService(TSOServer tso, HBaseTransactionManager transactionManager) {
+ this.tso = tso;
+ this.transactionManager = transactionManager;
+ }
public void start() {
+
}
@Override
public void close() throws IOException {
+ if (transactionManager != null) {
+ transactionManager.close();
+ }
+ if (tso != null) {
+ tso.stopAndWait();
+ }
}
}
@@ -108,6 +208,6 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider {
@Override
public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) {
- return put;
+ // NOTE(review): commitTimestamp is never used; timestamp is passed for both trailing arguments.
+ // Confirm whether the last argument should be commitTimestamp.
+ return TTable.markPutAsCommitted(put, timestamp, timestamp);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
index a2afcc9..cec7f59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.transaction;
import java.io.IOException;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -41,6 +42,10 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.omid.transaction.TTable;
+import org.apache.omid.transaction.Transaction;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
@@ -48,80 +53,115 @@ import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
public class OmidTransactionTable implements Table {
+ // Copied from HBase ProtobufUtil since it's not accessible
+ final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null, true);
+
+ private TTable tTable;
+ private Transaction tx;
+ private final boolean addShadowCells;
public OmidTransactionTable() throws SQLException {
+ this.tTable = null;
+ this.tx = null;
+ this.addShadowCells = false;
}
public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable) throws SQLException {
+ this(ctx, hTable, false);
}
public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable, boolean isConflictFree) throws SQLException {
+ this(ctx, hTable, isConflictFree, false);
+ }
+
+ public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable, boolean isConflictFree, boolean addShadowCells) throws SQLException {
+ assert(ctx instanceof OmidTransactionContext);
+
+ OmidTransactionContext omidTransactionContext = (OmidTransactionContext) ctx;
+ this.addShadowCells = addShadowCells;
+ try {
+ tTable = new TTable(hTable, true, isConflictFree);
+ } catch (IOException e) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.TRANSACTION_FAILED)
+ .setMessage(e.getMessage()).setRootCause(e).build()
+ .buildException();
+ }
+
+ this.tx = omidTransactionContext.getTransaction();
}
@Override
public Result get(Get get) throws IOException {
- return null;
+ return tTable.get(tx, get);
}
@Override
public void put(Put put) throws IOException {
+ tTable.put(tx, put, addShadowCells);
}
@Override
public void delete(Delete delete) throws IOException {
+ tTable.delete(tx, delete);
}
@Override
public ResultScanner getScanner(Scan scan) throws IOException {
- return null;
+ scan.setTimeRange(0, Long.MAX_VALUE);
+ return tTable.getScanner(tx, scan);
}
@Override
public Configuration getConfiguration() {
- return null;
+ return tTable.getConfiguration();
}
@Override
public HTableDescriptor getTableDescriptor() throws IOException {
- return null;
+ return tTable.getTableDescriptor();
}
@Override
public boolean exists(Get get) throws IOException {
- return false;
+ return tTable.exists(tx, get);
}
@Override
public Result[] get(List<Get> gets) throws IOException {
- return null;
+ return tTable.get(tx, gets);
}
@Override
public ResultScanner getScanner(byte[] family) throws IOException {
- return null;
+ return tTable.getScanner(tx, family);
}
@Override
public ResultScanner getScanner(byte[] family, byte[] qualifier)
throws IOException {
- return null;
+ return tTable.getScanner(tx, family, qualifier);
}
@Override
public void put(List<Put> puts) throws IOException {
+ tTable.put(tx, puts, addShadowCells);
}
@Override
public void delete(List<Delete> deletes) throws IOException {
+ tTable.delete(tx, deletes);
}
@Override
public void close() throws IOException {
+ tTable.close();
}
@Override
public TableName getName() {
- return null;
+ byte[] name = tTable.getTableName();
+ return TableName.valueOf(name);
}
@Override
@@ -132,12 +172,16 @@ public class OmidTransactionTable implements Table {
@Override
public void batch(List<? extends Row> actions, Object[] results)
throws IOException, InterruptedException {
+ tTable.batch(tx, actions, addShadowCells);
+ Arrays.fill(results, EMPTY_RESULT_EXISTS_TRUE);
}
@Override
public Object[] batch(List<? extends Row> actions) throws IOException,
InterruptedException {
- return null;
+ Object[] results;
+ batch(actions, results = new Object[actions.size()]);
+ return results;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index d1d531a..0f10b37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -26,7 +26,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol;
public class TransactionFactory {
public enum Provider {
TEPHRA((byte)1, TephraTransactionProvider.getInstance(), true),
- OMID((byte)2, OmidTransactionProvider.getInstance(), false);
+ OMID((byte)2, OmidTransactionProvider.getInstance(), true);
private final byte code;
private final PhoenixTransactionProvider provider;
@@ -50,7 +50,7 @@ public class TransactionFactory {
}
public static Provider getDefault() {
- return TEPHRA;
+ return OMID;
}
public PhoenixTransactionProvider getTransactionProvider() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index 2049390..59e7fd3 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -22,7 +22,6 @@ import static org.apache.phoenix.query.QueryServicesOptions.withDefaults;
import org.apache.curator.shaded.com.google.common.io.Files;
import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec;
-import org.apache.phoenix.transaction.OmidTransactionProvider;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index f3faa0c..d2649e1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1106,7 +1106,7 @@ public class TestUtil {
}
return filteredData;
}
-
+
/**
* Find a random free port in localhost for binding.
* @return A port number or -1 for failure.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/phoenix-server/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-server/pom.xml b/phoenix-server/pom.xml
index 186d7a0..b4a90e9 100644
--- a/phoenix-server/pom.xml
+++ b/phoenix-server/pom.xml
@@ -122,6 +122,7 @@
<include>org.iq80.snappy:snappy</include>
<include>org.antlr:antlr*</include>
<include>org.apache.tephra:tephra*</include>
+ <include>org.apache.omid:omid*</include>
<include>com.google.code.gson:gson</include>
<include>org.jruby.joni:joni</include>
<include>org.jruby.jcodings:jcodings</include>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85011433/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fc64bd2..e817200 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,7 @@
<avatica.version>1.12.0</avatica.version>
<jettyVersion>8.1.7.v20120910</jettyVersion>
<tephra.version>0.15.0-incubating</tephra.version>
+ <omid.version>1.0.0</omid.version>
<spark.version>2.3.2</spark.version>
<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>
@@ -804,6 +805,52 @@
<artifactId>tephra-hbase-compat-1.3</artifactId>
<version>${tephra.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.omid</groupId>
+ <artifactId>omid-hbase-client-hbase1.x</artifactId>
+ <version>${omid.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.omid</groupId>
+ <artifactId>omid-hbase-coprocessor-hbase1.x</artifactId>
+ <version>${omid.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.omid</groupId>
+ <artifactId>omid-tso-server-hbase1.x</artifactId>
+ <version>${omid.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.omid</groupId>
+ <artifactId>omid-tso-server-hbase1.x</artifactId>
+ <version>${omid.version}</version>
+ <type>test-jar</type>
+ <exclusions>
+ <exclusion>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<!-- Make sure we have all the antlr dependencies -->
<dependency>