You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/05/10 18:04:03 UTC

[04/46] phoenix git commit: initial version of Tephra implementation

initial version of Tephra implementation


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cea251cf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cea251cf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cea251cf

Branch: refs/heads/omid
Commit: cea251cfcb9699a90d10dfe82626c264b9016bc4
Parents: acfc9d5
Author: Ohad Shacham <oh...@yahoo-inc.com>
Authored: Tue Feb 14 15:57:23 2017 +0200
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Tue Feb 14 15:57:23 2017 +0200

----------------------------------------------------------------------
 .../transaction/OmidTransactionContext.java     |   8 +-
 .../transaction/OmidTransactionTable.java       |   8 +-
 .../transaction/PhoenixTransactionContext.java  |  10 +-
 .../transaction/PhoenixTransactionalTable.java  |   5 -
 .../transaction/TephraTransactionContext.java   | 256 +++++++++++++++++--
 .../transaction/TephraTransactionTable.java     |  79 +++---
 6 files changed, 265 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index bc5b05b..937ac14 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -26,13 +26,7 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
     }
 
     @Override
-    public void abort(SQLException e) throws SQLException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void checkpoint() throws SQLException {
+    public void checkpoint(boolean hasUncommittedData) throws SQLException {
         // TODO Auto-generated method stub
 
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
index f15fdd3..725fe16 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
@@ -15,7 +15,7 @@ import org.apache.hadoop.hbase.client.Scan;
 
 public class OmidTransactionTable implements PhoenixTransactionalTable {
 
-    public OmidTransactionTable(PhoenixTransactionContext ctx) {
+    public OmidTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
         // TODO Auto-generated constructor stub
     }
 
@@ -99,12 +99,6 @@ public class OmidTransactionTable implements PhoenixTransactionalTable {
     }
 
     @Override
-    public HTableInterface getHTable() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
     public void setAutoFlush(boolean autoFlush) {
         // TODO Auto-generated method stub
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index f07640e..87b68f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -29,18 +29,10 @@ public interface PhoenixTransactionContext {
     public void abort() throws SQLException;
 
     /**
-     * Rollback a transaction
-     *
-     * @param e
-     * @throws SQLException
-     */
-    public void abort(SQLException e) throws SQLException;
-
-    /**
      * Create a checkpoint in a transaction as defined in [TEPHRA-96]
      * @throws SQLException
      */
-    public void checkpoint() throws SQLException;
+    public void checkpoint(boolean hasUncommittedData) throws SQLException;
 
     /**
      * Commit DDL to guarantee that no transaction started before create index

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
index ff2632c..3a43068 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
@@ -101,11 +101,6 @@ public interface PhoenixTransactionalTable {
     public void delete(List<Delete> deletes) throws IOException;
 
     /**
-     * Return the underling htable
-     */
-    public HTableInterface getHTable();
-
-    /**
      * Delegates to {@link HTable#setAutoFlush(boolean autoFlush)}
      */
     public void setAutoFlush(boolean autoFlush);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index 17c70f0..81c9fd1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -1,83 +1,285 @@
 package org.apache.phoenix.transaction;
 
 import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionAware;
+import org.apache.tephra.TransactionConflictException;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.Transaction.VisibilityLevel;
+import org.apache.tephra.visibility.FenceWait;
+import org.apache.tephra.visibility.VisibilityFence;
+
+import com.google.common.collect.Lists;
 
 public class TephraTransactionContext implements PhoenixTransactionContext {
 
-    @Override
-    public void begin() throws SQLException {
-        // TODO Auto-generated method stub
+    private final List<TransactionAware> txAwares;
+    private final TransactionContext txContext;
+    private Transaction tx;
+    private TransactionSystemClient txServiceClient;
+    private TransactionFailureException e;
 
-    }
+    public TephraTransactionContext(PhoenixTransactionContext ctx, PhoenixConnection connection, boolean threadSafe) {
 
-    @Override
-    public void commit() throws SQLException {
-        // TODO Auto-generated method stub
+        this.txServiceClient = connection.getQueryServices().getTransactionSystemClient(); // TODO Should be wrapped for Omid side usage
 
+        assert(ctx instanceof TephraTransactionContext);
+        TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx;
+
+        if (threadSafe) {
+            this.tx = tephraTransactionContext.getTransaction();
+            this.txAwares = Lists.newArrayList();
+            this.txContext = null;
+        } else {
+            this.txAwares = Collections.emptyList();
+            if (ctx == null) {
+                this.txContext = new TransactionContext(txServiceClient);
+            } else {
+                this.txContext = tephraTransactionContext.getContext();
+            }
+        }
+
+        this.e = null;
     }
 
     @Override
-    public void abort() throws SQLException {
-        // TODO Auto-generated method stub
+    public void begin() throws SQLException {
+        if (txContext == null) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
+        }
 
+        try {
+            txContext.start();
+        } catch (TransactionFailureException e) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
+            .setMessage(e.getMessage())
+            .setRootCause(e)
+            .build().buildException();
+        }
     }
 
     @Override
-    public void abort(SQLException e) throws SQLException {
-        // TODO Auto-generated method stub
-
+    public void commit() throws SQLException {
+        try {
+            assert(txContext != null);
+            txContext.finish();
+        } catch (TransactionFailureException e) {
+            this.e = e;
+            if (e instanceof TransactionConflictException) { 
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION)
+                    .setMessage(e.getMessage())
+                    .setRootCause(e)
+                    .build().buildException();
+            }
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
+                .setMessage(e.getMessage())
+                .setRootCause(e)
+                .build().buildException();
+        }
     }
 
     @Override
-    public void checkpoint() throws SQLException {
-        // TODO Auto-generated method stub
+    public void abort() throws SQLException {
+        try {
+            if (e != null) {
+                txContext.abort(e);
+                e = null;
+            } else {
+                txContext.abort();
+            }
+        } catch (TransactionFailureException e) {
+            this.e = null;
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
+                .setMessage(e.getMessage())
+                .setRootCause(e)
+                .build().buildException();
+        }
+    }
 
+    @Override
+    public void checkpoint(boolean hasUncommittedData) throws SQLException {
+        if (hasUncommittedData) {
+            try {
+                if (txContext == null) {
+                    tx = txServiceClient.checkpoint(tx);
+                }  else {
+                    assert(txContext != null);
+                    txContext.checkpoint();
+                    tx = txContext.getCurrentTransaction();
+                }
+            } catch (TransactionFailureException e) {
+                throw new SQLException(e);
+            }
+        }
+  
+        if (txContext == null) {
+            tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+        }
+        else {
+            assert(txContext != null);
+            txContext.getCurrentTransaction().setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+        }
     }
 
     @Override
     public void commitDDLFence(PTable dataTable) throws SQLException,
             InterruptedException, TimeoutException {
-        // TODO Auto-generated method stub
-
+        byte[] key = dataTable.getName().getBytes();
+        try {
+            FenceWait fenceWait = VisibilityFence.prepareWait(key, txServiceClient);
+            fenceWait.await(10000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
+        } catch (TimeoutException | TransactionFailureException e) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
+            .setSchemaName(dataTable.getSchemaName().getString())
+            .setTableName(dataTable.getTableName().getString())
+            .build().buildException();
+        }
     }
 
     @Override
     public void markDMLFence(PTable table) {
-        // TODO Auto-generated method stub
-
+        byte[] logicalKey = table.getName().getBytes();
+        TransactionAware logicalTxAware = VisibilityFence.create(logicalKey);
+        if (this.txContext == null) {
+            this.txAwares.add(logicalTxAware);
+        } else {
+            this.txContext.addTransactionAware(logicalTxAware);
+        }
+        byte[] physicalKey = table.getPhysicalName().getBytes();
+        if (Bytes.compareTo(physicalKey, logicalKey) != 0) {
+            TransactionAware physicalTxAware = VisibilityFence.create(physicalKey);
+            if (this.txContext == null) {
+                this.txAwares.add(physicalTxAware);
+            } else {
+                this.txContext.addTransactionAware(physicalTxAware);
+            }
+        }
     }
 
     @Override
     public void join(PhoenixTransactionContext ctx) {
-        // TODO Auto-generated method stub
+        assert(ctx instanceof TephraTransactionContext);
+        TephraTransactionContext tephraContext = (TephraTransactionContext) ctx;
+
+        tephraContext.getAwares();
 
+        if (txContext != null) {
+            for (TransactionAware txAware : tephraContext.getAwares()) {
+                txContext.addTransactionAware(txAware);
+            }
+        } else {
+            txAwares.addAll(tephraContext.getAwares());
+        }
     }
 
-       @Override
+    @Override
     public boolean isTransactionRunning() {
-        // TODO Auto-generated method stub
+        if (this.txContext != null) {
+            return (this.txContext.getCurrentTransaction() != null) ? true : false;
+        }
+
+        if (this.tx != null) {
+            return true;
+        }
+
         return false;
     }
 
     @Override
     public void reset() {
-        // TODO Auto-generated method stub
-
+        tx = null;
+        txAwares.clear();
     }
 
     @Override
     public long getTransactionId() {
-        // TODO Auto-generated method stub
-        return 0;
+        if (this.txContext != null) {
+            return txContext.getCurrentTransaction().getTransactionId();
+        } 
+
+        if (tx != null) {
+            return tx.getTransactionId();
+        }
+
+        return HConstants.LATEST_TIMESTAMP;
     }
 
     @Override
     public long getReadPointer() {
-        // TODO Auto-generated method stub
-        return 0;
+        if (this.txContext != null) {
+            return txContext.getCurrentTransaction().getReadPointer();
+        } 
+
+        if (tx != null) {
+            return tx.getReadPointer();
+        }
+
+        return (-1);
     }
 
+   /**
+    * TephraTransactionContext specific functions
+    */
+
+    Transaction getTransaction() {
+        return this.tx;
+    }
+
+    TransactionContext getContext() {
+        return this.txContext;
+    }
+
+    List<TransactionAware> getAwares() {
+        return txAwares;
+    }
+
+    void addTransactionAware(TransactionAware txAware) {
+        if (this.txContext != null) {
+            txContext.addTransactionAware(txAware);
+        } else if (this.tx != null) {
+            txAwares.add(txAware);
+        }
+    }
+
+    // For testing
+    public long getWritePointer() {
+        if (this.txContext != null) {
+            return txContext.getCurrentTransaction().getWritePointer();
+        } 
+
+        if (tx != null) {
+            return tx.getWritePointer();
+        }
+
+        return HConstants.LATEST_TIMESTAMP;
+    }
+
+    // For testing
+    public VisibilityLevel getVisibilityLevel() {
+        if (this.txContext != null) {
+            return txContext.getCurrentTransaction().getVisibilityLevel();
+        } 
+
+        if (tx != null) {
+            return tx.getVisibilityLevel();
+        }
+
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cea251cf/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
index 0d788c1..c5ba33f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
@@ -12,132 +12,119 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.tephra.hbase.TransactionAwareHTable;
 
 public class TephraTransactionTable implements PhoenixTransactionalTable {
 
-    public TephraTransactionTable(PhoenixTransactionContext ctx) {
-        // TODO Auto-generated constructor stub
+    private TransactionAwareHTable transactionAwareHTable;
+    
+    private TephraTransactionContext tephraTransactionContext;
+    
+    public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
+
+        assert(ctx instanceof TephraTransactionContext);
+
+        tephraTransactionContext = (TephraTransactionContext) ctx;
+
+        transactionAwareHTable = new TransactionAwareHTable(hTable);
+
+        tephraTransactionContext.addTransactionAware(transactionAwareHTable);
     }
 
     @Override
     public Result get(Get get) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
+        return transactionAwareHTable.get(get);
     }
 
     @Override
     public void put(Put put) throws IOException {
-        // TODO Auto-generated method stub
-
+        transactionAwareHTable.put(put);
     }
 
     @Override
     public void delete(Delete delete) throws IOException {
-        // TODO Auto-generated method stub
-
+        transactionAwareHTable.delete(delete);
     }
 
     @Override
     public ResultScanner getScanner(Scan scan) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
+        return transactionAwareHTable.getScanner(scan);
     }
 
     @Override
     public byte[] getTableName() {
-        // TODO Auto-generated method stub
-        return null;
+        return transactionAwareHTable.getTableName();
     }
 
     @Override
     public Configuration getConfiguration() {
-        // TODO Auto-generated method stub
-        return null;
+        return transactionAwareHTable.getConfiguration();
     }
 
     @Override
     public HTableDescriptor getTableDescriptor() throws IOException {
-        // TODO Auto-generated method stub
-        return null;
+        return transactionAwareHTable.getTableDescriptor();
     }
 
     @Override
     public boolean exists(Get get) throws IOException {
-        // TODO Auto-generated method stub
-        return false;
+        return transactionAwareHTable.exists(get);
     }
 
     @Override
     public Result[] get(List<Get> gets) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
+        return transactionAwareHTable.get(gets);
     }
 
     @Override
     public ResultScanner getScanner(byte[] family) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
+        return transactionAwareHTable.getScanner(family);
     }
 
     @Override
     public ResultScanner getScanner(byte[] family, byte[] qualifier)
             throws IOException {
-        // TODO Auto-generated method stub
-        return null;
+        return transactionAwareHTable.getScanner(family, qualifier);
     }
 
     @Override
     public void put(List<Put> puts) throws IOException {
-        // TODO Auto-generated method stub
-
+        transactionAwareHTable.put(puts);
     }
 
     @Override
     public void delete(List<Delete> deletes) throws IOException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public HTableInterface getHTable() {
-        // TODO Auto-generated method stub
-        return null;
+        transactionAwareHTable.delete(deletes);
     }
 
     @Override
     public void setAutoFlush(boolean autoFlush) {
-        // TODO Auto-generated method stub
-
+        transactionAwareHTable.setAutoFlush(autoFlush);
     }
 
     @Override
     public boolean isAutoFlush() {
-        // TODO Auto-generated method stub
-        return false;
+        return transactionAwareHTable.isAutoFlush();
     }
 
     @Override
     public long getWriteBufferSize() {
-        // TODO Auto-generated method stub
-        return 0;
+        return transactionAwareHTable.getWriteBufferSize();
     }
 
     @Override
     public void setWriteBufferSize(long writeBufferSize) throws IOException {
-        // TODO Auto-generated method stub
-
+        transactionAwareHTable.setWriteBufferSize(writeBufferSize);
     }
 
     @Override
     public void flushCommits() throws IOException {
-        // TODO Auto-generated method stub
-
+        transactionAwareHTable.flushCommits();
     }
 
     @Override
     public void close() throws IOException {
-        // TODO Auto-generated method stub
-
+        transactionAwareHTable.close();
     }
 
 }