You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/03/11 00:34:11 UTC

phoenix git commit: Basic configuration of a transactional table

Repository: phoenix
Updated Branches:
  refs/heads/txn 00976e81b -> 995e352c6


Basic configuration of a transactional table


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/995e352c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/995e352c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/995e352c

Branch: refs/heads/txn
Commit: 995e352c6971fc51888a25bfd0a5b7b737eee958
Parents: 00976e8
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Mar 10 16:34:05 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Mar 10 16:34:05 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/AlterTableIT.java    | 55 ++++++++++++++++++++
 .../apache/phoenix/compile/PostDDLCompiler.java | 31 +++++++++--
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |  3 ++
 .../query/ConnectionQueryServicesImpl.java      | 44 ++++++++++++++--
 .../org/apache/phoenix/query/HTableFactory.java |  9 +++-
 .../apache/phoenix/schema/MetaDataClient.java   |  4 +-
 .../org/apache/phoenix/util/SchemaUtil.java     | 11 ++++
 7 files changed, 147 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 59698d6..9c0171f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -38,10 +38,15 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
+
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -50,6 +55,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -1988,4 +1994,53 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT {
             conn.close();
         }
     }
+    
+    /** ALTER TABLE ... SET transactional=true must fail with SET_UNSUPPORTED_PROP_ON_ALTER_TABLE. */
+    @Test
+    public void testAlterTableToBeTransactional() throws Exception {
+        createTestTable(getUrl(), "CREATE TABLE test_table (k varchar primary key)");
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            conn.createStatement().execute("ALTER TABLE test_table SET transactional=true");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE.getErrorCode(),e.getErrorCode());
+        } finally {
+            conn.close(); // previously leaked: release the connection whether or not the assertions pass
+        }
+    }
+
+    
+    /** CREATE TABLE with transactional=true, plus how it interacts with tables that already exist. */
+    @Test
+    public void testCreateTableToBeTransactional() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            conn.createStatement().execute("CREATE TABLE TEST_TRANSACTIONAL_TABLE (k varchar primary key) transactional=true");
+            PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+            HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE"));
+            assertTrue(SchemaUtil.isTransactional(htable.getTableDescriptor()));
+            assertTrue(htable instanceof TransactionAwareHTable);
+            assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+            HBaseAdmin admin = pconn.getQueryServices().getAdmin();
+            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("TXN_TEST_EXISTING"));
+            desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES));
+            admin.createTable(desc);
+            try {
+                conn.createStatement().execute("CREATE TABLE TXN_TEST_EXISTING (k varchar primary key) transactional=true");
+                fail();
+            } catch (TableAlreadyExistsException expected) { // HBase table was created above without TRANSACTIONAL
+            }
+            // stays transactional
+            conn.createStatement().execute("CREATE TABLE IF NOT EXISTS TEST_TRANSACTIONAL_TABLE (k varchar primary key)");
+            assertTrue(SchemaUtil.isTransactional(pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE")).getTableDescriptor()));
+            // stays non transactional
+            conn.createStatement().execute("CREATE TABLE IF NOT EXISTS TXN_TEST_EXISTING (k varchar primary key) transactional=true");
+            assertFalse(SchemaUtil.isTransactional(pconn.getQueryServices().getTable(Bytes.toBytes("TXN_TEST_EXISTING")).getTableDescriptor()));
+        } finally {
+            conn.close(); // previously leaked: release the connection even when an assertion above fails
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 0c586f0..6b61039 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -42,10 +42,10 @@ import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
-import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.collect.Lists;
@@ -65,7 +65,7 @@ import com.google.common.collect.Lists;
  */
 public class PostDDLCompiler {
     private final PhoenixConnection connection;
-    private final StatementContext context; // bogus context
+    private final Scan scan;
 
     public PostDDLCompiler(PhoenixConnection connection) {
         this(connection, new Scan());
@@ -73,13 +73,36 @@ public class PostDDLCompiler {
 
     public PostDDLCompiler(PhoenixConnection connection, Scan scan) {
         this.connection = connection;
-        this.context = new StatementContext(new PhoenixStatement(connection), scan);
+        this.scan = scan; // kept as a field: compile() passes it to the StatementContext it builds on each call
         scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, QueryConstants.TRUE);
     }
 
     public MutationPlan compile(final List<TableRef> tableRefs, final byte[] emptyCF, final byte[] projectCF, final List<PColumn> deleteList,
             final long timestamp) throws SQLException {
-        
+        PhoenixStatement statement = new PhoenixStatement(connection);
+        final StatementContext context = new StatementContext(
+                statement, 
+                new ColumnResolver() {
+
+                    @Override
+                    public List<TableRef> getTables() {
+                        return tableRefs;
+                    }
+
+                    @Override
+                    public TableRef resolveTable(String schemaName, String tableName) throws SQLException {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public ColumnRef resolveColumn(String schemaName, String tableName, String colName)
+                            throws SQLException {
+                        throw new UnsupportedOperationException();
+                    }
+                    
+                },
+                scan,
+                new SequenceManager(statement));
         return new MutationPlan() {
             
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 154fef7..400c921 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -266,6 +266,9 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     private static final String TENANT_POS_SHIFT = "TENANT_POS_SHIFT";
     private static final byte[] TENANT_POS_SHIFT_BYTES = Bytes.toBytes(TENANT_POS_SHIFT);
 
+    public static final String TRANSACTIONAL = "TRANSACTIONAL"; // HTableDescriptor attribute name checked by SchemaUtil.isTransactional()
+    public static final byte[] TRANSACTIONAL_BYTES = Bytes.toBytes(TRANSACTIONAL);
+
     private final PhoenixConnection connection;
     private final ResultSet emptyResultSet;
     public static final int MAX_LOCAL_SI_VERSION_DISALLOW = VersionUtil.encodeVersion("0", "98", "8");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index b149b92..8a8e072 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -42,6 +42,8 @@ import java.util.concurrent.TimeoutException;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -152,6 +154,7 @@ import org.apache.phoenix.util.UpgradeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
@@ -292,9 +295,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         try {
             return HBaseFactoryProvider.getHTableFactory().getTable(tableName, connection, getExecutor());
         } catch (org.apache.hadoop.hbase.TableNotFoundException e) {
-            byte[][] schemaAndTableName = new byte[2][];
-            SchemaUtil.getVarChars(tableName, schemaAndTableName);
-            throw new TableNotFoundException(Bytes.toString(schemaAndTableName[0]), Bytes.toString(schemaAndTableName[1]));
+            throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(tableName), SchemaUtil.getTableNameFromFullName(tableName));
         } catch (IOException e) {
         	throw new SQLException(e);
         }
@@ -703,6 +704,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null);
                 }
             }
+            
+            if (SchemaUtil.isTransactional(descriptor) && !descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
+                descriptor.addCoprocessor(TransactionProcessor.class.getName(), null, priority - 10, null);
+            }
         } catch (IOException e) {
             throw ServerUtil.parseServerException(e);
         }
@@ -862,6 +867,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     return existingDesc;
                 }
 
+                // Don't allow TRANSACTIONAL attribute to change, as we may have issued
+                // a CREATE TABLE IF NOT EXISTS and be updating the metadata.
+                String existingTxnal = existingDesc.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL);
+                String newTxnal = newDesc.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL);
+                if (!Objects.equal(existingTxnal, newTxnal)) {
+                    if (existingTxnal == null) {
+                        newDesc.remove(PhoenixDatabaseMetaData.TRANSACTIONAL);
+                    } else {
+                        newDesc.setValue(PhoenixDatabaseMetaData.TRANSACTIONAL, existingTxnal);
+                    }
+                }
                 modifyTable(tableName, newDesc, true);
                 return newDesc;
             }
@@ -1188,10 +1204,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         byte[] tableName = physicalTableName != null ? physicalTableName : SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
         boolean localIndexTable = Boolean.TRUE.equals(tableProps.remove(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME));
 
+        HTableDescriptor tableDescriptor = null;
         if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && physicalTableName == null)) {
             // For views this will ensure that metadata already exists
             // For tables and indexes, this will create the metadata if it doesn't already exist
-            ensureTableCreated(tableName, tableType, tableProps, families, splits, true);
+            tableDescriptor = ensureTableCreated(tableName, tableType, tableProps, families, splits, true);
         }
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         if (tableType == PTableType.INDEX) { // Index on view
@@ -1246,6 +1263,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         return rpcCallback.get();
                     }
         });
+        // A mismatch means the HTable already existed with a different TRANSACTIONAL setting than the one
+        // requested, which is an error unless IF NOT EXISTS was supplied (which the caller will check).
+        Object isTransactional = tableProps.get(PhoenixDatabaseMetaData.TRANSACTIONAL);
+        if (tableDescriptor != null && Boolean.TRUE.equals(isTransactional) != SchemaUtil.isTransactional(tableDescriptor)) {
+            return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, result.getMutationTime(), result.getTable());
+        }
         return result;
     }
 
@@ -1447,6 +1470,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         SQLException sqlE = null;
         if (tableDescriptor != null) {
             try {
+                if (SchemaUtil.hasTransactional(tableDescriptor)) {
+                   throw new SQLExceptionInfo.Builder(
+                                        SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE)
+                                        .setMessage(PhoenixDatabaseMetaData.TRANSACTIONAL)
+                                        .setSchemaName(table.getSchemaName().getString())
+                                        .setTableName(table.getTableName().getString()).build().buildException();                
+                }
                 boolean pollingNotNeeded = (!tableProps.isEmpty() && families.isEmpty() && colFamiliesForPColumnsToBeAdded.isEmpty());
                 modifyTable(table.getPhysicalName().getBytes(), tableDescriptor, !pollingNotNeeded);
             } catch (IOException e) {
@@ -1988,6 +2018,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     
     @Override
     public MutationState updateData(MutationPlan plan) throws SQLException {
+        // NOTE(review): for a transactional physical table this returns a new MutationState and never calls plan.execute() -- presumably a placeholder; confirm.
+        PTable table = plan.getContext().getCurrentTable().getTable();
+        HTableDescriptor desc = this.getTableDescriptor(table.getPhysicalName().getBytes());
+        if (SchemaUtil.isTransactional(desc)) {
+            return new MutationState(1, plan.getConnection());
+        }
         return plan.execute();
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java
index 7a10683..4e1c089 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java
@@ -20,8 +20,11 @@ package org.apache.phoenix.query;
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.util.SchemaUtil;
 
 /**
  * Creates clients to access HBase tables.
@@ -47,7 +50,11 @@ public interface HTableFactory {
     static class HTableFactoryImpl implements HTableFactory {
         @Override
         public HTableInterface getTable(byte[] tableName, HConnection connection, ExecutorService pool) throws IOException {
-            return connection.getTable(tableName, pool);
+            HTableInterface htable = connection.getTable(tableName, pool);
+            if (SchemaUtil.isTransactional(htable.getTableDescriptor())) {
+                return new TransactionAwareHTable(htable);
+            }
+            return htable;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index e133433..b2b1bc9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1686,7 +1686,9 @@ public class MetaDataClient {
             MutationCode code = result.getMutationCode();
             switch(code) {
             case TABLE_ALREADY_EXISTS:
-                addTableToCache(result);
+                if (result.getTable() != null) { // Can happen for transactional table that already exists as HBase table
+                    addTableToCache(result);
+                }
                 if (!statement.ifNotExists()) {
                     throw new TableAlreadyExistsException(schemaName, tableName, result.getTable());
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index c9574e3..a94e8ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -33,6 +33,7 @@ import java.util.Properties;
 
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -370,6 +371,16 @@ public class SchemaUtil {
                 .getName().getBytesPtr();
     }
 
+    public static boolean isTransactional(HTableDescriptor descriptor) {
+        // True only when the TRANSACTIONAL attribute is present and equals "true", ignoring case.
+        return Boolean.parseBoolean(Bytes.toString(descriptor.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES)));
+    }
+        
+    public static boolean hasTransactional(HTableDescriptor descriptor) {
+        // Presence check only: any stored TRANSACTIONAL value counts, whatever it is.
+        return descriptor.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES) != null;
+    }
+        
     public static boolean isMetaTable(byte[] tableName) {
         return Bytes.compareTo(tableName, SYSTEM_CATALOG_NAME_BYTES) == 0;
     }