You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/10/29 03:12:18 UTC
phoenix git commit: PHOENIX-1821 Implement mechanism to convert a
table from non transactional to transactional
Repository: phoenix
Updated Branches:
refs/heads/txn 4c7f0cbb4 -> 4f454a755
PHOENIX-1821 Implement mechanism to convert a table from non transactional to transactional
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4f454a75
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4f454a75
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4f454a75
Branch: refs/heads/txn
Commit: 4f454a755f0215f1d58c7e557f09af465c193608
Parents: 4c7f0cb
Author: James Taylor <ja...@apache.org>
Authored: Wed Oct 28 19:11:49 2015 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Wed Oct 28 19:11:49 2015 -0700
----------------------------------------------------------------------
.../apache/phoenix/end2end/AlterTableIT.java | 42 ++-
.../org/apache/phoenix/tx/TransactionIT.java | 152 +++++++++-
.../phoenix/exception/SQLExceptionCode.java | 2 +-
.../apache/phoenix/jdbc/PhoenixConnection.java | 6 +-
.../query/ConnectionQueryServicesImpl.java | 289 +++++++++++++------
.../query/ConnectionlessQueryServicesImpl.java | 4 +-
.../query/DelegateConnectionQueryServices.java | 4 +-
.../apache/phoenix/query/MetaDataMutated.java | 2 +-
.../apache/phoenix/schema/MetaDataClient.java | 51 +++-
.../apache/phoenix/schema/PMetaDataImpl.java | 4 +-
.../org/apache/phoenix/schema/PTableImpl.java | 4 +-
.../apache/phoenix/schema/TableProperty.java | 2 +-
12 files changed, 407 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 3004bd6..d871cda 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -38,8 +38,6 @@ import java.util.Collections;
import java.util.Map;
import java.util.Properties;
-import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
-
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
@@ -63,6 +61,9 @@ import org.apache.phoenix.util.SchemaUtil;
import org.junit.BeforeClass;
import org.junit.Test;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
+
/**
*
* A lot of tests in this class test HBase level properties. As a result,
@@ -1994,23 +1995,6 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT {
}
@Test
- public void testAlterTableToBeTransactional() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- String ddl = "CREATE TABLE test_table (k varchar primary key)";
- createTestTable(getUrl(), ddl);
-
- try {
- ddl = "ALTER TABLE test_table SET transactional=true";
- conn.createStatement().execute(ddl);
- fail();
- } catch (SQLException e) {
- assertEquals(SQLExceptionCode.CANNOT_ALTER_PROPERTY.getErrorCode(),e.getErrorCode());
- }
- }
-
-
- @Test
public void testCreateTableToBeTransactional() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -2022,19 +2006,31 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT {
assertTrue(table.isTransactional());
assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+ try {
+ ddl = "ALTER TABLE TEST_TRANSACTIONAL_TABLE SET transactional=false";
+ conn.createStatement().execute(ddl);
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode());
+ }
+
HBaseAdmin admin = pconn.getQueryServices().getAdmin();
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("TXN_TEST_EXISTING"));
desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES));
admin.createTable(desc);
+ ddl = "CREATE TABLE TXN_TEST_EXISTING (k varchar primary key) transactional=true";
+ conn.createStatement().execute(ddl);
+ assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf("TXN_TEST_EXISTING")).getValue(TxConstants.READ_NON_TX_DATA));
+
+ // Should be ok, as HBase metadata should match existing metadata.
+ ddl = "CREATE TABLE IF NOT EXISTS TEST_TRANSACTIONAL_TABLE (k varchar primary key)";
try {
- ddl = "CREATE TABLE TXN_TEST_EXISTING (k varchar primary key) transactional=true";
conn.createStatement().execute(ddl);
fail();
} catch (SQLException e) {
- assertEquals(SQLExceptionCode.MAY_NOT_MAP_TO_EXISTING_TABLE_AS_TRANSACTIONAL.getErrorCode(), e.getErrorCode());
+ assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode());
}
- // Should be ok, as HBase metadata should match existing metadata.
- ddl = "CREATE TABLE IF NOT EXISTS TEST_TRANSACTIONAL_TABLE (k varchar primary key)";
+ ddl += " transactional=true";
conn.createStatement().execute(ddl);
table = pconn.getTable(new PTableKey(null, "TEST_TRANSACTIONAL_TABLE"));
htable = pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE"));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 52b5b5f..373ea99 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -10,7 +10,6 @@
package org.apache.phoenix.tx;
import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -22,26 +21,25 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Map;
-import java.util.Properties;
+import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.apache.phoenix.end2end.Shadower;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
-import com.google.common.collect.Maps;
-
public class TransactionIT extends BaseHBaseManagedTimeIT {
private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE;
@@ -119,8 +117,6 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
conn1.commit();
// verify rows are deleted after commit
- // FIXME: this is failing, I think because Tephra isn't handling deletes like we need it to
- // TODO: confirm this works once we get the patch from Gary.
rs = conn1.createStatement().executeQuery(selectSQL);
assertFalse(rs.next());
}
@@ -245,5 +241,133 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
conn.createStatement().execute("ALTER TABLE " + FULL_TABLE_NAME + " SET IMMUTABLE_ROWS=true");
testRowConflicts();
}
-
+
+ @Test
+ public void testNonTxToTxTable() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE NON_TX_TABLE(k INTEGER PRIMARY KEY, v VARCHAR)");
+ conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (1)");
+ conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (2, 'a')");
+ conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (3, 'b')");
+ conn.commit();
+
+ conn.createStatement().execute("CREATE INDEX IDX ON NON_TX_TABLE(v)");
+
+ // Reset empty column value to an empty value like it is pre-transactions
+ /** TODO: when TEPHRA-143 is fixed, comment this back in
+ HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("NON_TX_TABLE"));
+ List<Put>puts = Lists.newArrayList(new Put(PInteger.INSTANCE.toBytes(1)), new Put(PInteger.INSTANCE.toBytes(2)), new Put(PInteger.INSTANCE.toBytes(3)));
+ for (Put put : puts) {
+ put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY);
+ }
+ htable.put(puts);
+ **/
+
+ conn.createStatement().execute("ALTER TABLE NON_TX_TABLE SET TRANSACTIONAL=true");
+
+ HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("NON_TX_TABLE"));
+ assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+ htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("IDX"));
+ assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+
+ conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (4, 'c')");
+ ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM NON_TX_TABLE WHERE v IS NULL");
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertFalse(rs.next());
+ conn.commit();
+
+ conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (5, 'd')");
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM NON_TX_TABLE");
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(2,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(3,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(4,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(5,rs.getInt(1));
+ assertFalse(rs.next());
+ conn.rollback();
+
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM NON_TX_TABLE");
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(2,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(3,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(4,rs.getInt(1));
+ assertFalse(rs.next());
+
+ /* TODO: this should succeed too (with SELECT going through index), but doesn't. Try again after TEPHRA-143 is fixed.
+ * It might be the case that we're still using an empty value for indexes.
+ conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (5, 'd')");
+ rs = conn.createStatement().executeQuery("SELECT k FROM NON_TX_TABLE");
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(2,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(3,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(4,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(5,rs.getInt(1));
+ assertFalse(rs.next());
+ conn.rollback();
+
+ rs = conn.createStatement().executeQuery("SELECT k FROM NON_TX_TABLE");
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(2,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(3,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(4,rs.getInt(1));
+ assertFalse(rs.next());
+ */
+ }
+
+ @Test
+ public void testNonTxToTxTableFailure() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ // Put table in SYSTEM schema to prevent attempts to update the cache after we disable SYSTEM.CATALOG
+ conn.createStatement().execute("CREATE TABLE SYSTEM.NON_TX_TABLE(k INTEGER PRIMARY KEY, v VARCHAR)");
+ conn.createStatement().execute("UPSERT INTO SYSTEM.NON_TX_TABLE VALUES (1)");
+ conn.commit();
+ // Reset empty column value to an empty value like it is pre-transactions
+ HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM.NON_TX_TABLE"));
+ Put put = new Put(PInteger.INSTANCE.toBytes(1));
+ put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY);
+ htable.put(put);
+
+ HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+ try {
+ // This will succeed initially in updating the HBase metadata, but then will fail when
+ // the update of the SYSTEM.CATALOG table is attempted, exercising the code to restore
+ // the coprocessors back to the non transactional ones.
+ conn.createStatement().execute("ALTER TABLE SYSTEM.NON_TX_TABLE SET TRANSACTIONAL=true");
+ fail();
+ } catch (SQLException e) {
+ assertTrue(e.getMessage().contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled"));
+ } finally {
+ admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+ admin.close();
+ }
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT k FROM SYSTEM.NON_TX_TABLE WHERE v IS NULL");
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertFalse(rs.next());
+
+ htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM.NON_TX_TABLE"));
+ assertFalse(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index db50f83..9925f75 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -249,7 +249,7 @@ public enum SQLExceptionCode {
DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1056, "43A13", "Default column family not allowed on VIEW or shared INDEX"),
ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL(1070, "44A01", "Only tables may be declared as transactional"),
- MAY_NOT_MAP_TO_EXISTING_TABLE_AS_TRANSACTIONAL(1071, "44A02", "An existing HBase table may not be mapped to as a transactional table"),
+ TX_MAY_NOT_SWITCH_TO_NON_TX(1071, "44A02", "A transactional table may not be switched to non transactional"),
STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls must be true when a table is transactional"),
CANNOT_START_TRANSACTION_WITH_SCN_SET(1073, "44A04", "Cannot start a transaction on a connection with SCN set"),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index e1c8dea..36679f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -776,11 +776,11 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
}
@Override
- public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long resolvedTime)
+ public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime)
throws SQLException {
- metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, resolvedTime);
+ metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime);
//Cascade through to connectionQueryServices too
- getQueryServices().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, resolvedTime);
+ getQueryServices().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime);
return metaData;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 13a0559..2f9e251 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.GuardedBy;
import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.TxConstants;
import co.cask.tephra.distributed.PooledClientProvider;
import co.cask.tephra.distributed.TransactionServiceClient;
import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
@@ -163,7 +164,6 @@ import org.apache.twill.zookeeper.ZKClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Objects;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
@@ -562,12 +562,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
@Override
- public PMetaData addColumn(final PName tenantId, final String tableName, final List<PColumn> columns, final long tableTimeStamp, final long tableSeqNum, final boolean isImmutableRows, final boolean isWalDisabled, final boolean isMultitenant, final boolean storeNulls, final long resolvedTime) throws SQLException {
+ public PMetaData addColumn(final PName tenantId, final String tableName, final List<PColumn> columns, final long tableTimeStamp, final long tableSeqNum, final boolean isImmutableRows, final boolean isWalDisabled, final boolean isMultitenant, final boolean storeNulls, final boolean isTransactional, final long resolvedTime) throws SQLException {
return metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() {
@Override
public PMetaData mutate(PMetaData metaData) throws SQLException {
try {
- return metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, resolvedTime);
+ return metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime);
} catch (TableNotFoundException e) {
// The DROP TABLE may have been processed first, so just ignore.
return metaData;
@@ -706,7 +706,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null);
}
- boolean isTransactional = Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name()));
+ boolean isTransactional =
+ Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) ||
+ Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); // For ALTER TABLE
// TODO: better encapsulation for this
// Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
// Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use
@@ -718,8 +720,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (!descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
descriptor.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, priority, null);
}
+ // For alter table, remove non transactional index coprocessor
+ if (descriptor.hasCoprocessor(Indexer.class.getName())) {
+ descriptor.removeCoprocessor(Indexer.class.getName());
+ }
} else {
if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
+ // If an exception occurred during alter table, transition back to non transactional
+ if (descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
+ descriptor.removeCoprocessor(PhoenixTransactionalIndexer.class.getName());
+ }
Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
@@ -761,8 +771,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
- if (isTransactional && !descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
- descriptor.addCoprocessor(TransactionProcessor.class.getName(), null, priority - 10, null);
+ if (isTransactional) {
+ if (!descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
+ descriptor.addCoprocessor(TransactionProcessor.class.getName(), null, priority - 10, null);
+ }
+ } else {
+ // If an exception occurred during alter table, transition back to non transactional
+ if (descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
+ descriptor.removeCoprocessor(TransactionProcessor.class.getName());
+ }
}
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
@@ -922,21 +939,25 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (!modifyExistingMetaData) {
return existingDesc; // Caller already knows that no metadata was changed
}
+ boolean willBeTx = Boolean.TRUE.equals(props.get(TableProperty.TRANSACTIONAL.name()));
+ // If mapping an existing table as transactional, set property so that existing
+ // data is correctly read.
+ if (willBeTx) {
+ newDesc.setValue(TxConstants.READ_NON_TX_DATA, Boolean.TRUE.toString());
+ } else {
+ // If we think we're creating a non transactional table when it's already
+ // transactional, don't allow.
+ if (existingDesc.hasCoprocessor(TransactionProcessor.class.getName())) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX)
+ .setSchemaName(SchemaUtil.getSchemaNameFromFullName(tableName))
+ .setTableName(SchemaUtil.getTableNameFromFullName(tableName)).build().buildException();
+ }
+ newDesc.remove(TxConstants.READ_NON_TX_DATA);
+ }
if (existingDesc.equals(newDesc)) {
return null; // Indicate that no metadata was changed
}
- // Don't allow TRANSACTIONAL attribute to change, as we may have issued
- // a CREATE TABLE IF NOT EXISTS and be updating the metadata.
- String existingTxnal = existingDesc.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL);
- String newTxnal = newDesc.getValue(PhoenixDatabaseMetaData.TRANSACTIONAL);
- if (!Objects.equal(existingTxnal, newTxnal)) {
- if (existingTxnal == null) {
- newDesc.remove(PhoenixDatabaseMetaData.TRANSACTIONAL);
- } else {
- newDesc.setValue(PhoenixDatabaseMetaData.TRANSACTIONAL, existingTxnal);
- }
- }
modifyTable(tableName, newDesc, true);
return newDesc;
}
@@ -1263,22 +1284,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
byte[] tableName = physicalTableName != null ? physicalTableName : SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
boolean localIndexTable = Boolean.TRUE.equals(tableProps.remove(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME));
- HTableDescriptor tableDescriptor = null;
if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && physicalTableName == null)) {
// For views this will ensure that metadata already exists
// For tables and indexes, this will create the metadata if it doesn't already exist
- tableDescriptor = ensureTableCreated(tableName, tableType, tableProps, families, splits, true);
- // This means the HTable already existed and is transactional which is an error case for now,
- // as the timestamps are likely not scaled and the table may have delete markers (which isn't
- // handled by Tephra currently). It's possible that we could allow this, but only allow queries
- // after a major compaction and some conversion process runs.
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- boolean isTransactional = MetaDataUtil.isTransactional(m, kvBuilder, ptr);
- if (tableDescriptor != null && isTransactional) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAY_NOT_MAP_TO_EXISTING_TABLE_AS_TRANSACTIONAL)
- .setSchemaName(Bytes.toString(schemaBytes)).setTableName(Bytes.toString(tableBytes))
- .build().buildException();
- }
+ ensureTableCreated(tableName, tableType, tableProps, families, splits, true);
}
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
if (tableType == PTableType.INDEX) { // Index on view
@@ -1530,12 +1539,156 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public MetaDataMutationResult addColumn(final List<Mutation> tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> stmtProperties, Set<String> colFamiliesForPColumnsToBeAdded) throws SQLException {
List<Pair<byte[], Map<String, Object>>> families = new ArrayList<>(stmtProperties.size());
Map<String, Object> tableProps = new HashMap<String, Object>();
+ Set<HTableDescriptor> tableDescriptors = Collections.emptySet();
+ boolean nonTxToTx = false;
HTableDescriptor tableDescriptor = separateAndValidateProperties(table, stmtProperties, colFamiliesForPColumnsToBeAdded, families, tableProps);
- SQLException sqlE = null;
if (tableDescriptor != null) {
+ tableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size());
+ tableDescriptors.add(tableDescriptor);
+ nonTxToTx = Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA));
+ /*
+ * If the table was transitioned from non transactional to transactional, we need
+ * to also transition the index tables.
+ */
+ if (nonTxToTx) {
+ updateDescriptorForTx(table, tableProps, tableDescriptor, Boolean.TRUE.toString(), tableDescriptors);
+ }
+ }
+
+ boolean success = false;
+ boolean metaDataUpdated = !tableDescriptors.isEmpty();
+ boolean pollingNeeded = !(!tableProps.isEmpty() && families.isEmpty() && colFamiliesForPColumnsToBeAdded.isEmpty());
+ MetaDataMutationResult result = null;
+ try {
+ sendHBaseMetaData(tableDescriptors, pollingNeeded);
+
+ // Special case for call during drop table to ensure that the empty column family exists.
+ // In this case, we only include the table header row, as until we add schemaBytes and tableBytes
+ // as args to this function, we have no way of getting them in this case.
+ // TODO: change to if (tableMetaData.isEmpty()) once we pass through schemaBytes and tableBytes
+ // Also, could be used to update property values on ALTER TABLE t SET prop=xxx
+ if ((tableMetaData.isEmpty()) || (tableMetaData.size() == 1 && tableMetaData.get(0).isEmpty())) {
+ return new MetaDataMutationResult(MutationCode.NO_OP, System.currentTimeMillis(), table);
+ }
+ byte[][] rowKeyMetaData = new byte[3][];
+ PTableType tableType = table.getType();
+
+ Mutation m = tableMetaData.get(0);
+ byte[] rowKey = m.getRow();
+ SchemaUtil.getVarChars(rowKey, rowKeyMetaData);
+ byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+ byte[] schemaBytes = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+ byte[] tableBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+ byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
+
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ result = metaDataCoprocessorExec(tableKey,
+ new Batch.Call<MetaDataService, MetaDataResponse>() {
+ @Override
+ public MetaDataResponse call(MetaDataService instance) throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<MetaDataResponse> rpcCallback =
+ new BlockingRpcCallback<MetaDataResponse>();
+ AddColumnRequest.Builder builder = AddColumnRequest.newBuilder();
+ for (Mutation m : tableMetaData) {
+ MutationProto mp = ProtobufUtil.toProto(m);
+ builder.addTableMetadataMutations(mp.toByteString());
+ }
+
+ instance.addColumn(controller, builder.build(), rpcCallback);
+ if(controller.getFailedOn() != null) {
+ throw controller.getFailedOn();
+ }
+ return rpcCallback.get();
+ }
+ });
+
+ if (result.getMutationCode() == MutationCode.COLUMN_NOT_FOUND || result.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS) { // Success
+ success = true;
+ // Flush the table if transitioning DISABLE_WAL from TRUE to FALSE
+ if ( MetaDataUtil.getMutationValue(m,PhoenixDatabaseMetaData.DISABLE_WAL_BYTES, kvBuilder, ptr)
+ && Boolean.FALSE.equals(PBoolean.INSTANCE.toObject(ptr))) {
+ flushTable(table.getPhysicalName().getBytes());
+ }
+
+ if (tableType == PTableType.TABLE) {
+ // If we're changing MULTI_TENANT to true or false, create or drop the view index table
+ if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){
+ long timestamp = MetaDataUtil.getClientTimeStamp(m);
+ if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()))) {
+ this.ensureViewIndexTableCreated(table, timestamp);
+ } else {
+ this.ensureViewIndexTableDropped(table.getPhysicalName().getBytes(), timestamp);
+ }
+ }
+ }
+ }
+ } finally {
+ // If we weren't successful with our metadata update
+ // and we've already pushed the HBase metadata changes to the server
+ // and we've tried to go from non transactional to transactional
+ // then we must undo the metadata change otherwise the table will
+ // no longer function correctly.
+ // Note that if this fails, we're in a corrupt state.
+ if (!success && metaDataUpdated && nonTxToTx) {
+ tableDescriptors.clear();
+ tableDescriptors.add(tableDescriptor);
+ tableProps.remove(TxConstants.READ_NON_TX_DATA);
+ updateDescriptorForTx(table, tableProps, tableDescriptor, null, tableDescriptors);
+ sendHBaseMetaData(tableDescriptors, pollingNeeded);
+ }
+ }
+ return result;
+ }
+ private void updateDescriptorForTx(PTable table, Map<String, Object> tableProps, HTableDescriptor tableDescriptor,
+ String txValue, Set<HTableDescriptor> descriptorsToUpdate) throws SQLException {
+ HBaseAdmin admin = null;
+ byte[] physicalTableName = table.getPhysicalName().getBytes();
+ try {
+ admin = new HBaseAdmin(config);
+ setTransactional(tableDescriptor, table.getType(), txValue, tableProps);
+ Map<String, Object> indexTableProps;
+ if (txValue == null) {
+ indexTableProps = Collections.<String,Object>emptyMap();
+ } else {
+ indexTableProps = Maps.newHashMapWithExpectedSize(1);
+ indexTableProps.put(TxConstants.READ_NON_TX_DATA, Boolean.valueOf(txValue));
+ }
+ for (PTable index : table.getIndexes()) {
+ HTableDescriptor indexDescriptor = admin.getTableDescriptor(index.getPhysicalName().getBytes());
+ descriptorsToUpdate.add(indexDescriptor);
+ setTransactional(indexDescriptor, index.getType(), txValue, indexTableProps);
+ }
+ try {
+ HTableDescriptor indexDescriptor = admin.getTableDescriptor(MetaDataUtil.getViewIndexPhysicalName(physicalTableName));
+ descriptorsToUpdate.add(indexDescriptor);
+ setTransactional(indexDescriptor, PTableType.INDEX, txValue, indexTableProps);
+ } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
+ // Ignore, as we may never have created a view index table
+ }
+ try {
+ HTableDescriptor indexDescriptor = admin.getTableDescriptor(MetaDataUtil.getLocalIndexPhysicalName(physicalTableName));
+ descriptorsToUpdate.add(indexDescriptor);
+ setTransactional(indexDescriptor, PTableType.INDEX, txValue, indexTableProps);
+ } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
+ // Ignore, as we may never have created a local index table
+ }
+ } catch (IOException e) {
+ throw ServerUtil.parseServerException(e);
+ } finally {
+ try {
+ if (admin != null) admin.close();
+ } catch (IOException e) {
+ logger.warn("Could not close admin",e);
+ }
+ }
+ }
+
+ private void sendHBaseMetaData(Set<HTableDescriptor> tableDescriptors, boolean pollingNeeded) throws SQLException {
+ SQLException sqlE = null;
+ for (HTableDescriptor descriptor : tableDescriptors) {
try {
- boolean pollingNotNeeded = (!tableProps.isEmpty() && families.isEmpty() && colFamiliesForPColumnsToBeAdded.isEmpty());
- modifyTable(table.getPhysicalName().getBytes(), tableDescriptor, !pollingNotNeeded);
+ modifyTable(descriptor.getName(), descriptor, pollingNeeded);
} catch (IOException e) {
sqlE = ServerUtil.parseServerException(e);
} catch (InterruptedException e) {
@@ -1550,68 +1703,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
}
-
- // Special case for call during drop table to ensure that the empty column family exists.
- // In this, case we only include the table header row, as until we add schemaBytes and tableBytes
- // as args to this function, we have no way of getting them in this case.
- // TODO: change to if (tableMetaData.isEmpty()) once we pass through schemaBytes and tableBytes
- // Also, could be used to update property values on ALTER TABLE t SET prop=xxx
- if ((tableMetaData.isEmpty()) || (tableMetaData.size() == 1 && tableMetaData.get(0).isEmpty())) {
- return new MetaDataMutationResult(MutationCode.NO_OP, System.currentTimeMillis(), table);
- }
- byte[][] rowKeyMetaData = new byte[3][];
- PTableType tableType = table.getType();
-
- Mutation m = tableMetaData.get(0);
- byte[] rowKey = m.getRow();
- SchemaUtil.getVarChars(rowKey, rowKeyMetaData);
- byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
- byte[] schemaBytes = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
- byte[] tableBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
- byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
-
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- MetaDataMutationResult result = metaDataCoprocessorExec(tableKey,
- new Batch.Call<MetaDataService, MetaDataResponse>() {
- @Override
- public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
- BlockingRpcCallback<MetaDataResponse> rpcCallback =
- new BlockingRpcCallback<MetaDataResponse>();
- AddColumnRequest.Builder builder = AddColumnRequest.newBuilder();
- for (Mutation m : tableMetaData) {
- MutationProto mp = ProtobufUtil.toProto(m);
- builder.addTableMetadataMutations(mp.toByteString());
- }
-
- instance.addColumn(controller, builder.build(), rpcCallback);
- if(controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
- return rpcCallback.get();
- }
- });
-
- if (result.getMutationCode() == MutationCode.COLUMN_NOT_FOUND) { // Success
- // Flush the table if transitioning DISABLE_WAL from TRUE to FALSE
- if ( MetaDataUtil.getMutationValue(m,PhoenixDatabaseMetaData.DISABLE_WAL_BYTES, kvBuilder, ptr)
- && Boolean.FALSE.equals(PBoolean.INSTANCE.toObject(ptr))) {
- flushTable(table.getPhysicalName().getBytes());
- }
-
- if (tableType == PTableType.TABLE) {
- // If we're changing MULTI_TENANT to true or false, create or drop the view index table
- if (MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES, kvBuilder, ptr)){
- long timestamp = MetaDataUtil.getClientTimeStamp(m);
- if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()))) {
- this.ensureViewIndexTableCreated(table, timestamp);
- } else {
- this.ensureViewIndexTableDropped(table.getPhysicalName().getBytes(), timestamp);
- }
- }
- }
+ }
+ private void setTransactional(HTableDescriptor tableDescriptor, PTableType tableType, String txValue, Map<String, Object> tableProps) throws SQLException {
+ if (txValue == null) {
+ tableDescriptor.remove(TxConstants.READ_NON_TX_DATA);
+ } else {
+ tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, txValue);
}
- return result;
+ this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps);
}
private HTableDescriptor separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, List<Pair<byte[], Map<String, Object>>> families, Map<String, Object> tableProps) throws SQLException {
@@ -1648,6 +1747,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// Even though TTL is really a HColumnProperty we treat it specially.
// We enforce that all column families have the same TTL.
commonFamilyProps.put(propName, prop.getSecond());
+ } else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && Boolean.TRUE.equals(propValue)) {
+ tableProps.put(TxConstants.READ_NON_TX_DATA, propValue);
}
} else {
if (isHColumnProperty(propName)) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 3f31141..4dbb257 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -173,8 +173,8 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
@Override
public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp,
- long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long resolvedTime) throws SQLException {
- return metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, resolvedTime);
+ long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException {
+ return metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 460a879..5b32f3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -85,8 +85,8 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
@Override
public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp,
- long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long resolvedTime) throws SQLException {
- return getDelegate().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, resolvedTime);
+ long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException {
+ return getDelegate().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional, resolvedTime);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
index c78d0b6..1ad3d8b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
@@ -37,6 +37,6 @@ public interface MetaDataMutated {
PMetaData addTable(PTable table, long resolvedTime) throws SQLException;
PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException;
PMetaData removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException;
- PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long resolvedTime) throws SQLException;
+ PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException;
PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index b2982e4..1da3295 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -2040,12 +2040,12 @@ public class MetaDataClient {
return mutationCode;
}
- private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta) throws SQLException {
- return incrementTableSeqNum(table, expectedType, columnCountDelta, null, null, null, null);
+ private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional) throws SQLException {
+ return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, null, null, null, null);
}
private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta,
- Boolean isImmutableRows, Boolean disableWAL, Boolean isMultiTenant, Boolean storeNulls)
+ Boolean isTransactional, Boolean isImmutableRows, Boolean disableWAL, Boolean isMultiTenant, Boolean storeNulls)
throws SQLException {
String schemaName = table.getSchemaName().getString();
String tableName = table.getTableName().getString();
@@ -2077,6 +2077,9 @@ public class MetaDataClient {
if (storeNulls != null) {
mutateBooleanProperty(tenantId, schemaName, tableName, STORE_NULLS, storeNulls);
}
+ if (isTransactional != null) {
+ mutateBooleanProperty(tenantId, schemaName, tableName, TRANSACTIONAL, isTransactional);
+ }
return seqNum;
}
@@ -2110,6 +2113,7 @@ public class MetaDataClient {
Boolean multiTenantProp = null;
Boolean disableWALProp = null;
Boolean storeNullsProp = null;
+ Boolean isTransactionalProp = null;
ListMultimap<String,Pair<String,Object>> stmtProperties = statement.getProps();
Map<String, List<Pair<String, Object>>> properties = new HashMap<>(stmtProperties.size());
@@ -2134,6 +2138,8 @@ public class MetaDataClient {
disableWALProp = (Boolean)prop.getSecond();
} else if (propName.equals(STORE_NULLS)) {
storeNullsProp = (Boolean)prop.getSecond();
+ } else if (propName.equals(TRANSACTIONAL)) {
+ isTransactionalProp = (Boolean)prop.getSecond();
}
}
}
@@ -2141,6 +2147,7 @@ public class MetaDataClient {
}
boolean retried = false;
boolean changingPhoenixTableProperty = false;
+ boolean nonTxToTx = false;
while (true) {
ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
table = resolver.getTables().get(0).getTable();
@@ -2192,6 +2199,23 @@ public class MetaDataClient {
changingPhoenixTableProperty = true;
}
}
+ Boolean isTransactional = null;
+ if (isTransactionalProp != null) {
+ if (isTransactionalProp.booleanValue() != table.isTransactional()) {
+ isTransactional = isTransactionalProp;
+ // We can only go one way: from non transactional to transactional
+ // Going the other way would require rewriting the cell timestamps
+ // and doing a major compaction to get rid of any Tephra specific
+ // delete markers.
+ if (!isTransactional) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX)
+ .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+ }
+ timeStamp = TransactionUtil.getTableTimestamp(connection, isTransactional);
+ changingPhoenixTableProperty = true;
+ nonTxToTx = true;
+ }
+ }
int numPkColumnsAdded = 0;
PreparedStatement colUpsert = connection.prepareStatement(INSERT_COLUMN);
@@ -2266,10 +2290,12 @@ public class MetaDataClient {
if (Boolean.FALSE.equals(isImmutableRows) && !table.getIndexes().isEmpty()) {
int hbaseVersion = connection.getQueryServices().getLowestClusterHBaseVersion();
if (hbaseVersion < PhoenixDatabaseMetaData.MUTABLE_SI_VERSION_THRESHOLD) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES).setSchemaName(schemaName).setTableName(tableName).build().buildException();
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES)
+ .setSchemaName(schemaName).setTableName(tableName).build().buildException();
}
if (connection.getQueryServices().hasInvalidIndexConfiguration() && !table.isTransactional()) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG).setSchemaName(schemaName).setTableName(tableName).build().buildException();
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG)
+ .setSchemaName(schemaName).setTableName(tableName).build().buildException();
}
}
if (Boolean.TRUE.equals(multiTenant)) {
@@ -2277,16 +2303,18 @@ public class MetaDataClient {
}
}
- if (numPkColumnsAdded>0 && !table.getIndexes().isEmpty()) {
+ if (!table.getIndexes().isEmpty() && (numPkColumnsAdded>0 || nonTxToTx)) {
for (PTable index : table.getIndexes()) {
- incrementTableSeqNum(index, index.getType(), 1);
+ // TODO: verify master has fix for multiple index columns added and unit test
+ incrementTableSeqNum(index, index.getType(), numPkColumnsAdded, nonTxToTx ? Boolean.TRUE : null);
}
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
connection.rollback();
}
long seqNum = table.getSequenceNumber();
if (changingPhoenixTableProperty || columnDefs.size() > 0) {
- seqNum = incrementTableSeqNum(table, statement.getTableType(), 1, isImmutableRows, disableWAL, multiTenant, storeNulls);
+ // TODO: verify master has fix for multiple data columns added and unit test
+ seqNum = incrementTableSeqNum(table, statement.getTableType(), columnDefs.size(), isTransactional, isImmutableRows, disableWAL, multiTenant, storeNulls);
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
connection.rollback();
}
@@ -2336,6 +2364,7 @@ public class MetaDataClient {
disableWAL == null ? table.isWALDisabled() : disableWAL,
multiTenant == null ? table.isMultiTenant() : multiTenant,
storeNulls == null ? table.getStoreNulls() : storeNulls,
+ isTransactional == null ? table.isTransactional() : isTransactional,
TransactionUtil.getResolvedTime(connection, result));
}
// Delete rows in view index if we haven't dropped it already
@@ -2516,7 +2545,8 @@ public class MetaDataClient {
}
}
if(!indexColumnsToDrop.isEmpty()) {
- incrementTableSeqNum(index, index.getType(), -1);
+ // TODO: verify master has fix for multiple index columns dropped and unit test
+ incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size(), null);
dropColumnMutations(index, indexColumnsToDrop, tableMetaData);
}
@@ -2525,7 +2555,8 @@ public class MetaDataClient {
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
connection.rollback();
- long seqNum = incrementTableSeqNum(table, statement.getTableType(), -1);
+ // TODO: verify master has fix for multiple data columns dropped and unit test
+ long seqNum = incrementTableSeqNum(table, statement.getTableType(), -tableColumnsToDrop.size(), null);
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
connection.rollback();
// Force table header to be first in list
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index 8cfbb18..ae92677 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -285,7 +285,7 @@ public class PMetaDataImpl implements PMetaData {
}
@Override
- public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long resolvedTime) throws SQLException {
+ public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long resolvedTime) throws SQLException {
PTableRef oldTableRef = metaData.get(new PTableKey(tenantId, tableName));
if (oldTableRef == null) {
return this;
@@ -299,7 +299,7 @@ public class PMetaDataImpl implements PMetaData {
newColumns.addAll(oldColumns);
newColumns.addAll(columnsToAdd);
}
- PTable newTable = PTableImpl.makePTable(oldTableRef.getTable(), tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, isMultitenant, storeNulls);
+ PTable newTable = PTableImpl.makePTable(oldTableRef.getTable(), tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional);
return addTable(newTable, resolvedTime);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index b78a904..fcd25f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -220,12 +220,12 @@ public class PTableImpl implements PTable {
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.isTransactional(), table.getTableStats());
}
- public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException {
+ public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional) throws SQLException {
return new PTableImpl(
table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
- isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.isTransactional(), table.getTableStats());
+ isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), isTransactional, table.getTableStats());
}
public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4f454a75/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 3c96405..091b929 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -48,7 +48,7 @@ public enum TableProperty {
STORE_NULLS(PhoenixDatabaseMetaData.STORE_NULLS, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false),
- TRANSACTIONAL(PhoenixDatabaseMetaData.TRANSACTIONAL, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, false, CANNOT_ALTER_PROPERTY, false),
+ TRANSACTIONAL(PhoenixDatabaseMetaData.TRANSACTIONAL, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false),
;