You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/05/10 18:04:03 UTC
[04/46] phoenix git commit: initial version of Tephra implementation
initial version of Tephra implementation
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cea251cf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cea251cf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cea251cf
Branch: refs/heads/omid
Commit: cea251cfcb9699a90d10dfe82626c264b9016bc4
Parents: acfc9d5
Author: Ohad Shacham <oh...@yahoo-inc.com>
Authored: Tue Feb 14 15:57:23 2017 +0200
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Tue Feb 14 15:57:23 2017 +0200
----------------------------------------------------------------------
.../transaction/OmidTransactionContext.java | 8 +-
.../transaction/OmidTransactionTable.java | 8 +-
.../transaction/PhoenixTransactionContext.java | 10 +-
.../transaction/PhoenixTransactionalTable.java | 5 -
.../transaction/TephraTransactionContext.java | 256 +++++++++++++++++--
.../transaction/TephraTransactionTable.java | 79 +++---
6 files changed, 265 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index bc5b05b..937ac14 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -26,13 +26,7 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
}
@Override
- public void abort(SQLException e) throws SQLException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void checkpoint() throws SQLException {
+ public void checkpoint(boolean hasUncommittedData) throws SQLException { // Omid stub: body is empty, hasUncommittedData is not used yet
// TODO Auto-generated method stub
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
index f15fdd3..725fe16 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
@@ -15,7 +15,7 @@ import org.apache.hadoop.hbase.client.Scan;
public class OmidTransactionTable implements PhoenixTransactionalTable {
- public OmidTransactionTable(PhoenixTransactionContext ctx) {
+ public OmidTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) { // Omid stub: neither ctx nor hTable is stored or used yet
// TODO Auto-generated constructor stub
}
@@ -99,12 +99,6 @@ public class OmidTransactionTable implements PhoenixTransactionalTable {
}
@Override
- public HTableInterface getHTable() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public void setAutoFlush(boolean autoFlush) {
// TODO Auto-generated method stub
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index f07640e..87b68f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -29,18 +29,10 @@ public interface PhoenixTransactionContext {
public void abort() throws SQLException;
/**
- * Rollback a transaction
- *
- * @param e
- * @throws SQLException
- */
- public void abort(SQLException e) throws SQLException;
-
- /**
* Create a checkpoint in a transaction as defined in [TEPHRA-96]
* @throws SQLException
*/
- public void checkpoint() throws SQLException;
+ public void checkpoint(boolean hasUncommittedData) throws SQLException; // in TephraTransactionContext: true takes an actual checkpoint; visibility becomes SNAPSHOT_EXCLUDE_CURRENT either way
/**
* Commit DDL to guarantee that no transaction started before create index
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
index ff2632c..3a43068 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
@@ -101,11 +101,6 @@ public interface PhoenixTransactionalTable {
public void delete(List<Delete> deletes) throws IOException;
/**
- * Return the underling htable
- */
- public HTableInterface getHTable();
-
- /**
* Delegates to {@link HTable#setAutoFlush(boolean autoFlush)}
*/
public void setAutoFlush(boolean autoFlush);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index 17c70f0..81c9fd1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -1,83 +1,285 @@
package org.apache.phoenix.transaction;
import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PTable;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionAware;
+import org.apache.tephra.TransactionConflictException;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.Transaction.VisibilityLevel;
+import org.apache.tephra.visibility.FenceWait;
+import org.apache.tephra.visibility.VisibilityFence;
+
+import com.google.common.collect.Lists;
public class TephraTransactionContext implements PhoenixTransactionContext {
- @Override
- public void begin() throws SQLException {
- // TODO Auto-generated method stub
+ private final List<TransactionAware> txAwares; // TransactionAwares tracked locally; only added to when txContext is null (threadSafe mode)
+ private final TransactionContext txContext; // Tephra context used by begin/commit/abort; null in threadSafe mode
+ private Transaction tx; // transaction taken from the parent context in threadSafe mode; also refreshed by checkpoint()
+ private TransactionSystemClient txServiceClient; // obtained from the connection's query services in the constructor
+ private TransactionFailureException e; // failure saved by commit(), handed to txContext.abort(e) by the next abort()
- }
+ public TephraTransactionContext(PhoenixTransactionContext ctx, PhoenixConnection connection, boolean threadSafe) { // builds a context that shares state with ctx
- @Override
- public void commit() throws SQLException {
- // TODO Auto-generated method stub
+ this.txServiceClient = connection.getQueryServices().getTransactionSystemClient(); // TODO Should be wrapped for Omid side usage
+ assert(ctx instanceof TephraTransactionContext); // NOTE(review): fails for ctx == null when run with -ea, although the else-branch below handles a null ctx
+ TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx;
+
+ if (threadSafe) { // threadSafe: reuse ctx's Transaction and track TransactionAwares locally, with no TransactionContext
+ this.tx = tephraTransactionContext.getTransaction(); // NOTE(review): NullPointerException if ctx is null
+ this.txAwares = Lists.newArrayList();
+ this.txContext = null;
+ } else {
+ this.txAwares = Collections.emptyList(); // nothing is added here in this mode: TransactionAwares go to txContext instead
+ if (ctx == null) {
+ this.txContext = new TransactionContext(txServiceClient); // no parent: create a fresh Tephra context
+ } else {
+ this.txContext = tephraTransactionContext.getContext(); // share the parent's Tephra context
+ }
+ }
+
+ this.e = null;
}
@Override
- public void abort() throws SQLException {
- // TODO Auto-generated method stub
+ public void begin() throws SQLException { // starts a transaction through txContext
+ if (txContext == null) { // no Tephra context (threadSafe mode): report it instead of starting
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
+ }
+ try {
+ txContext.start();
+ } catch (TransactionFailureException e) { // convert the Tephra failure to a SQLException, keeping it as root cause
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
+ .setMessage(e.getMessage())
+ .setRootCause(e)
+ .build().buildException();
+ }
}
@Override
- public void abort(SQLException e) throws SQLException {
- // TODO Auto-generated method stub
-
+ public void commit() throws SQLException { // finishes the transaction through txContext
+ try {
+ assert(txContext != null); // NOTE(review): only checked with -ea; otherwise a null txContext surfaces as a NullPointerException on the next line
+ txContext.finish();
+ } catch (TransactionFailureException e) {
+ this.e = e; // saved so that a following abort() can pass it to txContext.abort(e)
+ if (e instanceof TransactionConflictException) { // conflicts are reported with their own SQL error code
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION)
+ .setMessage(e.getMessage())
+ .setRootCause(e)
+ .build().buildException();
+ }
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
+ .setMessage(e.getMessage())
+ .setRootCause(e)
+ .build().buildException();
+ }
}
@Override
- public void checkpoint() throws SQLException {
- // TODO Auto-generated method stub
+ public void abort() throws SQLException { // aborts the transaction through txContext
+ try {
+ if (e != null) { // a previous commit() failed: abort with that failure as the cause
+ txContext.abort(e); // NOTE(review): txContext is not null-checked here (it is null in threadSafe mode) - confirm callers never abort in that mode
+ e = null;
+ } else {
+ txContext.abort();
+ }
+ } catch (TransactionFailureException e) { // this e shadows the field
+ this.e = null; // the saved failure is also cleared when the abort itself fails
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
+ .setMessage(e.getMessage())
+ .setRootCause(e)
+ .build().buildException();
+ }
+ }
+ @Override
+ public void checkpoint(boolean hasUncommittedData) throws SQLException { // optionally checkpoints, then sets visibility to SNAPSHOT_EXCLUDE_CURRENT
+ if (hasUncommittedData) { // checkpoint only when the caller reports uncommitted data
+ try {
+ if (txContext == null) {
+ tx = txServiceClient.checkpoint(tx); // threadSafe mode: checkpoint directly through the client
+ } else {
+ assert(txContext != null); // always true in this branch
+ txContext.checkpoint();
+ tx = txContext.getCurrentTransaction();
+ }
+ } catch (TransactionFailureException e) {
+ throw new SQLException(e); // NOTE(review): unlike begin/commit/abort, no SQLExceptionCode is attached here
+ }
+ }
+
+ if (txContext == null) {
+ tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); // applied even when no checkpoint was taken above
+ }
+ else {
+ assert(txContext != null); // always true in this branch
+ txContext.getCurrentTransaction().setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+ }
}
@Override
public void commitDDLFence(PTable dataTable) throws SQLException,
InterruptedException, TimeoutException {
- // TODO Auto-generated method stub
-
+ byte[] key = dataTable.getName().getBytes(); // the fence key is the table name's bytes
+ try {
+ FenceWait fenceWait = VisibilityFence.prepareWait(key, txServiceClient);
+ fenceWait.await(10000, TimeUnit.MILLISECONDS); // hard-coded wait of 10000 ms
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before converting to a SQLException
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
+ } catch (TimeoutException | TransactionFailureException e) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE) // NOTE(review): e is not set as root cause here, so the underlying failure is lost; consider .setRootCause(e)
+ .setSchemaName(dataTable.getSchemaName().getString())
+ .setTableName(dataTable.getTableName().getString())
+ .build().buildException();
+ }
}
@Override
public void markDMLFence(PTable table) {
- // TODO Auto-generated method stub
-
+ byte[] logicalKey = table.getName().getBytes();
+ TransactionAware logicalTxAware = VisibilityFence.create(logicalKey); // fence keyed by the table's name
+ if (this.txContext == null) { // threadSafe mode: keep the fence in the local list
+ this.txAwares.add(logicalTxAware);
+ } else {
+ this.txContext.addTransactionAware(logicalTxAware);
+ }
+ byte[] physicalKey = table.getPhysicalName().getBytes();
+ if (Bytes.compareTo(physicalKey, logicalKey) != 0) { // add a second fence only when the physical name differs from the logical one
+ TransactionAware physicalTxAware = VisibilityFence.create(physicalKey);
+ if (this.txContext == null) {
+ this.txAwares.add(physicalTxAware);
+ } else {
+ this.txContext.addTransactionAware(physicalTxAware);
+ }
+ }
}
@Override
public void join(PhoenixTransactionContext ctx) {
- // TODO Auto-generated method stub
+ assert(ctx instanceof TephraTransactionContext); // only checked with -ea; the cast below fails otherwise for a foreign type
+ TephraTransactionContext tephraContext = (TephraTransactionContext) ctx;
+
+ tephraContext.getAwares(); // NOTE(review): result is discarded and getAwares() only returns the list, so this call has no effect
+ if (txContext != null) { // hand each of ctx's locally tracked TransactionAwares to the Tephra context
+ for (TransactionAware txAware : tephraContext.getAwares()) {
+ txContext.addTransactionAware(txAware);
+ }
+ } else {
+ txAwares.addAll(tephraContext.getAwares()); // threadSafe mode: merge into the local list
+ }
}
- @Override
+ @Override
public boolean isTransactionRunning() {
- // TODO Auto-generated method stub
+ if (this.txContext != null) {
+ return (this.txContext.getCurrentTransaction() != null) ? true : false; // running iff the Tephra context has a current transaction
+ }
+
+ if (this.tx != null) { // no Tephra context: a held Transaction counts as running
+ return true;
+ }
+
return false;
}
@Override
public void reset() {
- // TODO Auto-generated method stub
-
+ tx = null; // only the local state is reset; txContext is final and is not touched
+ txAwares.clear();
}
@Override
public long getTransactionId() {
- // TODO Auto-generated method stub
- return 0;
+ if (this.txContext != null) {
+ return txContext.getCurrentTransaction().getTransactionId(); // NOTE(review): NullPointerException if there is no current transaction (isTransactionRunning() treats that as possible) - confirm callers
+ }
+
+ if (tx != null) {
+ return tx.getTransactionId();
+ }
+
+ return HConstants.LATEST_TIMESTAMP; // fallback when neither txContext nor tx is set
}
@Override
public long getReadPointer() {
- // TODO Auto-generated method stub
- return 0;
+ if (this.txContext != null) {
+ return txContext.getCurrentTransaction().getReadPointer(); // NOTE(review): NullPointerException if there is no current transaction - confirm callers
+ }
+
+ if (tx != null) {
+ return tx.getReadPointer();
+ }
+
+ return (-1); // fallback when neither txContext nor tx is set
}
+ /**
+ * TephraTransactionContext specific functions
+ */
+
+ Transaction getTransaction() { // package-private; read by the constructor when copying from a parent context
+ return this.tx;
+ }
+
+ TransactionContext getContext() { // package-private; may return null (threadSafe mode)
+ return this.txContext;
+ }
+
+ List<TransactionAware> getAwares() { // returns the internal list itself, not a copy
+ return txAwares;
+ }
+
+ void addTransactionAware(TransactionAware txAware) { // registers txAware with txContext if present, otherwise with the local list
+ if (this.txContext != null) {
+ txContext.addTransactionAware(txAware);
+ } else if (this.tx != null) {
+ txAwares.add(txAware);
+ } // NOTE(review): when both txContext and tx are null, txAware is silently dropped
+ }
+
+ // For testing
+ public long getWritePointer() {
+ if (this.txContext != null) {
+ return txContext.getCurrentTransaction().getWritePointer(); // NOTE(review): NullPointerException if there is no current transaction
+ }
+
+ if (tx != null) {
+ return tx.getWritePointer();
+ }
+
+ return HConstants.LATEST_TIMESTAMP; // fallback when neither txContext nor tx is set
+ }
+
+ // For testing
+ public VisibilityLevel getVisibilityLevel() {
+ if (this.txContext != null) {
+ return txContext.getCurrentTransaction().getVisibilityLevel(); // NOTE(review): NullPointerException if there is no current transaction
+ }
+
+ if (tx != null) {
+ return tx.getVisibilityLevel();
+ }
+
+ return null; // no transaction available
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
index 0d788c1..c5ba33f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
@@ -12,132 +12,119 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.tephra.hbase.TransactionAwareHTable;
public class TephraTransactionTable implements PhoenixTransactionalTable {
- public TephraTransactionTable(PhoenixTransactionContext ctx) {
- // TODO Auto-generated constructor stub
+ private TransactionAwareHTable transactionAwareHTable; // wrapper around the HTableInterface passed in; every method below forwards to it
+
+ private TephraTransactionContext tephraTransactionContext; // context this table registered itself with; only used in the constructor
+
+ public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) { // wraps hTable and registers the wrapper with ctx
+
+ assert(ctx instanceof TephraTransactionContext); // NOTE(review): only checked with -ea; a null ctx otherwise fails with a NullPointerException at the addTransactionAware call below
+
+ tephraTransactionContext = (TephraTransactionContext) ctx;
+
+ transactionAwareHTable = new TransactionAwareHTable(hTable);
+
+ tephraTransactionContext.addTransactionAware(transactionAwareHTable); // lets the context manage this table as a TransactionAware
}
@Override
public Result get(Get get) throws IOException {
- // TODO Auto-generated method stub
- return null;
+ return transactionAwareHTable.get(get); // plain delegation, as in all methods that follow
}
@Override
public void put(Put put) throws IOException {
- // TODO Auto-generated method stub
-
+ transactionAwareHTable.put(put);
}
@Override
public void delete(Delete delete) throws IOException {
- // TODO Auto-generated method stub
-
+ transactionAwareHTable.delete(delete);
}
@Override
public ResultScanner getScanner(Scan scan) throws IOException {
- // TODO Auto-generated method stub
- return null;
+ return transactionAwareHTable.getScanner(scan);
}
@Override
public byte[] getTableName() {
- // TODO Auto-generated method stub
- return null;
+ return transactionAwareHTable.getTableName();
}
@Override
public Configuration getConfiguration() {
- // TODO Auto-generated method stub
- return null;
+ return transactionAwareHTable.getConfiguration();
}
@Override
public HTableDescriptor getTableDescriptor() throws IOException {
- // TODO Auto-generated method stub
- return null;
+ return transactionAwareHTable.getTableDescriptor();
}
@Override
public boolean exists(Get get) throws IOException {
- // TODO Auto-generated method stub
- return false;
+ return transactionAwareHTable.exists(get);
}
@Override
public Result[] get(List<Get> gets) throws IOException {
- // TODO Auto-generated method stub
- return null;
+ return transactionAwareHTable.get(gets);
}
@Override
public ResultScanner getScanner(byte[] family) throws IOException {
- // TODO Auto-generated method stub
- return null;
+ return transactionAwareHTable.getScanner(family);
}
@Override
public ResultScanner getScanner(byte[] family, byte[] qualifier)
throws IOException {
- // TODO Auto-generated method stub
- return null;
+ return transactionAwareHTable.getScanner(family, qualifier);
}
@Override
public void put(List<Put> puts) throws IOException {
- // TODO Auto-generated method stub
-
+ transactionAwareHTable.put(puts);
}
@Override
public void delete(List<Delete> deletes) throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public HTableInterface getHTable() {
- // TODO Auto-generated method stub
- return null;
+ transactionAwareHTable.delete(deletes);
}
@Override
public void setAutoFlush(boolean autoFlush) {
- // TODO Auto-generated method stub
-
+ transactionAwareHTable.setAutoFlush(autoFlush);
}
@Override
public boolean isAutoFlush() {
- // TODO Auto-generated method stub
- return false;
+ return transactionAwareHTable.isAutoFlush();
}
@Override
public long getWriteBufferSize() {
- // TODO Auto-generated method stub
- return 0;
+ return transactionAwareHTable.getWriteBufferSize();
}
@Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
- // TODO Auto-generated method stub
-
+ transactionAwareHTable.setWriteBufferSize(writeBufferSize);
}
@Override
public void flushCommits() throws IOException {
- // TODO Auto-generated method stub
-
+ transactionAwareHTable.flushCommits();
}
@Override
public void close() throws IOException {
- // TODO Auto-generated method stub
-
+ transactionAwareHTable.close(); // closes the wrapper; NOTE(review): presumably also closes the wrapped hTable - confirm against Tephra
}
}