You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/03/11 00:34:11 UTC
phoenix git commit: Basic configuration of a transactional table
Repository: phoenix
Updated Branches:
refs/heads/txn 00976e81b -> 995e352c6
Basic configuration of a transactional table
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/995e352c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/995e352c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/995e352c
Branch: refs/heads/txn
Commit: 995e352c6971fc51888a25bfd0a5b7b737eee958
Parents: 00976e8
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Mar 10 16:34:05 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Mar 10 16:34:05 2015 -0700
----------------------------------------------------------------------
.../apache/phoenix/end2end/AlterTableIT.java | 55 ++++++++++++++++++++
.../apache/phoenix/compile/PostDDLCompiler.java | 31 +++++++++--
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 3 ++
.../query/ConnectionQueryServicesImpl.java | 44 ++++++++++++++--
.../org/apache/phoenix/query/HTableFactory.java | 9 +++-
.../apache/phoenix/schema/MetaDataClient.java | 4 +-
.../org/apache/phoenix/util/SchemaUtil.java | 11 ++++
7 files changed, 147 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 59698d6..9c0171f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -38,10 +38,15 @@ import java.util.Collections;
import java.util.Map;
import java.util.Properties;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
+
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -50,6 +55,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -1988,4 +1994,53 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT {
conn.close();
}
}
+
+ @Test
+ public void testAlterTableToBeTransactional() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String ddl = "CREATE TABLE test_table (k varchar primary key)";
+ createTestTable(getUrl(), ddl);
+
+ try {
+ ddl = "ALTER TABLE test_table SET transactional=true";
+ conn.createStatement().execute(ddl);
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE.getErrorCode(),e.getErrorCode());
+ }
+ }
+
+
+ @Test
+ public void testCreateTableToBeTransactional() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String ddl = "CREATE TABLE TEST_TRANSACTIONAL_TABLE (k varchar primary key) transactional=true";
+ conn.createStatement().execute(ddl);
+ PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+ HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE"));
+ assertTrue(SchemaUtil.isTransactional(htable.getTableDescriptor()));
+ assertTrue(htable instanceof TransactionAwareHTable);
+ assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+
+ HBaseAdmin admin = pconn.getQueryServices().getAdmin();
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("TXN_TEST_EXISTING"));
+ desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES));
+ admin.createTable(desc);
+ try {
+ ddl = "CREATE TABLE TXN_TEST_EXISTING (k varchar primary key) transactional=true";
+ conn.createStatement().execute(ddl);
+ fail();
+ } catch (TableAlreadyExistsException e) {
+ }
+ // stays transactional
+ ddl = "CREATE TABLE IF NOT EXISTS TEST_TRANSACTIONAL_TABLE (k varchar primary key)";
+ conn.createStatement().execute(ddl);
+ assertTrue(SchemaUtil.isTransactional(pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE")).getTableDescriptor()));
+ // stays non transactional
+ ddl = "CREATE TABLE IF NOT EXISTS TXN_TEST_EXISTING (k varchar primary key) transactional=true";
+ conn.createStatement().execute(ddl);
+ assertFalse(SchemaUtil.isTransactional(pconn.getQueryServices().getTable(Bytes.toBytes("TXN_TEST_EXISTING")).getTableDescriptor()));
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 0c586f0..6b61039 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -42,10 +42,10 @@ import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
-import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ScanUtil;
import com.google.common.collect.Lists;
@@ -65,7 +65,7 @@ import com.google.common.collect.Lists;
*/
public class PostDDLCompiler {
private final PhoenixConnection connection;
- private final StatementContext context; // bogus context
+ private final Scan scan;
public PostDDLCompiler(PhoenixConnection connection) {
this(connection, new Scan());
@@ -73,13 +73,36 @@ public class PostDDLCompiler {
public PostDDLCompiler(PhoenixConnection connection, Scan scan) {
this.connection = connection;
- this.context = new StatementContext(new PhoenixStatement(connection), scan);
+ this.scan = scan;
scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, QueryConstants.TRUE);
}
public MutationPlan compile(final List<TableRef> tableRefs, final byte[] emptyCF, final byte[] projectCF, final List<PColumn> deleteList,
final long timestamp) throws SQLException {
-
+ PhoenixStatement statement = new PhoenixStatement(connection);
+ final StatementContext context = new StatementContext(
+ statement,
+ new ColumnResolver() {
+
+ @Override
+ public List<TableRef> getTables() {
+ return tableRefs;
+ }
+
+ @Override
+ public TableRef resolveTable(String schemaName, String tableName) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ColumnRef resolveColumn(String schemaName, String tableName, String colName)
+ throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ },
+ scan,
+ new SequenceManager(statement));
return new MutationPlan() {
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 154fef7..400c921 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -266,6 +266,9 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
private static final String TENANT_POS_SHIFT = "TENANT_POS_SHIFT";
private static final byte[] TENANT_POS_SHIFT_BYTES = Bytes.toBytes(TENANT_POS_SHIFT);
+ public static final String TRANSACTIONAL = "TRANSACTIONAL";
+ public static final byte[] TRANSACTIONAL_BYTES = Bytes.toBytes(TRANSACTIONAL);
+
private final PhoenixConnection connection;
private final ResultSet emptyResultSet;
public static final int MAX_LOCAL_SI_VERSION_DISALLOW = VersionUtil.encodeVersion("0", "98", "8");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index b149b92..8a8e072 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -42,6 +42,8 @@ import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.GuardedBy;
+import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -152,6 +154,7 @@ import org.apache.phoenix.util.UpgradeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Objects;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
@@ -292,9 +295,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
return HBaseFactoryProvider.getHTableFactory().getTable(tableName, connection, getExecutor());
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
- byte[][] schemaAndTableName = new byte[2][];
- SchemaUtil.getVarChars(tableName, schemaAndTableName);
- throw new TableNotFoundException(Bytes.toString(schemaAndTableName[0]), Bytes.toString(schemaAndTableName[1]));
+ throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(tableName), SchemaUtil.getTableNameFromFullName(tableName));
} catch (IOException e) {
throw new SQLException(e);
}
@@ -703,6 +704,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null);
}
}
+
+ if (SchemaUtil.isTransactional(descriptor) && !descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
+ descriptor.addCoprocessor(TransactionProcessor.class.getName(), null, priority - 10, null);
+ }
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
}
@@ -862,6 +867,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return existingDesc;
}
+ // Don't allow TRANSACTIONAL attribute to change, as we may have issued
+ // a CREATE TABLE IF NOT EXISTS and be updating the metadata.
+ String existingTxnal = existingDesc.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL);
+ String newTxnal = newDesc.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL);
+ if (!Objects.equal(existingTxnal, newTxnal)) {
+ if (existingTxnal == null) {
+ newDesc.remove(PhoenixDatabaseMetaData.TRANSACTIONAL);
+ } else {
+ newDesc.setValue(PhoenixDatabaseMetaData.TRANSACTIONAL, existingTxnal);
+ }
+ }
modifyTable(tableName, newDesc, true);
return newDesc;
}
@@ -1188,10 +1204,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
byte[] tableName = physicalTableName != null ? physicalTableName : SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
boolean localIndexTable = Boolean.TRUE.equals(tableProps.remove(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME));
+ HTableDescriptor tableDescriptor = null;
if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && physicalTableName == null)) {
// For views this will ensure that metadata already exists
// For tables and indexes, this will create the metadata if it doesn't already exist
- ensureTableCreated(tableName, tableType, tableProps, families, splits, true);
+ tableDescriptor = ensureTableCreated(tableName, tableType, tableProps, families, splits, true);
}
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
if (tableType == PTableType.INDEX) { // Index on view
@@ -1246,6 +1263,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return rpcCallback.get();
}
});
+ // This means the HTable already existed and is transactional which is an
+ // error case unless IF NOT EXISTS was supplied (which the caller will check).
+ Object isTransactional = tableProps.get(PhoenixDatabaseMetaData.TRANSACTIONAL);
+ if (tableDescriptor != null && Boolean.TRUE.equals(isTransactional) != SchemaUtil.isTransactional(tableDescriptor)) {
+ return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, result.getMutationTime(), result.getTable());
+ }
return result;
}
@@ -1447,6 +1470,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
SQLException sqlE = null;
if (tableDescriptor != null) {
try {
+ if (SchemaUtil.hasTransactional(tableDescriptor)) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE)
+ .setMessage(PhoenixDatabaseMetaData.TRANSACTIONAL)
+ .setSchemaName(table.getSchemaName().getString())
+ .setTableName(table.getTableName().getString()).build().buildException();
+ }
boolean pollingNotNeeded = (!tableProps.isEmpty() && families.isEmpty() && colFamiliesForPColumnsToBeAdded.isEmpty());
modifyTable(table.getPhysicalName().getBytes(), tableDescriptor, !pollingNotNeeded);
} catch (IOException e) {
@@ -1988,6 +2018,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
@Override
public MutationState updateData(MutationPlan plan) throws SQLException {
+ PTable table = plan.getContext().getCurrentTable().getTable();
+ HTableDescriptor desc = this.getTableDescriptor(table.getPhysicalName().getBytes());
+ if (SchemaUtil.isTransactional(desc)) {
+ return new MutationState(1, plan.getConnection());
+ }
+
return plan.execute();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java
index 7a10683..4e1c089 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/HTableFactory.java
@@ -20,8 +20,11 @@ package org.apache.phoenix.query;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.util.SchemaUtil;
/**
* Creates clients to access HBase tables.
@@ -47,7 +50,11 @@ public interface HTableFactory {
static class HTableFactoryImpl implements HTableFactory {
@Override
public HTableInterface getTable(byte[] tableName, HConnection connection, ExecutorService pool) throws IOException {
- return connection.getTable(tableName, pool);
+ HTableInterface htable = connection.getTable(tableName, pool);
+ if (SchemaUtil.isTransactional(htable.getTableDescriptor())) {
+ return new TransactionAwareHTable(htable);
+ }
+ return htable;
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index e133433..b2b1bc9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1686,7 +1686,9 @@ public class MetaDataClient {
MutationCode code = result.getMutationCode();
switch(code) {
case TABLE_ALREADY_EXISTS:
- addTableToCache(result);
+ if (result.getTable() != null) { // Can happen for transactional table that already exists as HBase table
+ addTableToCache(result);
+ }
if (!statement.ifNotExists()) {
throw new TableAlreadyExistsException(schemaName, tableName, result.getTable());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/995e352c/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index c9574e3..a94e8ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -33,6 +33,7 @@ import java.util.Properties;
import javax.annotation.Nullable;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;
@@ -370,6 +371,16 @@ public class SchemaUtil {
.getName().getBytesPtr();
}
+ public static boolean isTransactional(HTableDescriptor descriptor) {
+ byte[] isTransactional = descriptor.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES);
+ return (isTransactional != null && Boolean.TRUE.toString().equalsIgnoreCase(Bytes.toString(isTransactional)));
+ }
+
+ public static boolean hasTransactional(HTableDescriptor descriptor) {
+ byte[] isTransactional = descriptor.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES);
+ return (isTransactional != null);
+ }
+
public static boolean isMetaTable(byte[] tableName) {
return Bytes.compareTo(tableName, SYSTEM_CATALOG_NAME_BYTES) == 0;
}