You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by go...@apache.org on 2022/02/02 22:23:47 UTC
[phoenix] branch master updated: PHOENIX-6622 Transform monitor
This is an automated email from the ASF dual-hosted git repository.
gokcen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new e67582d PHOENIX-6622 Transform monitor
e67582d is described below
commit e67582dc2f4ab4e50ee9d26d5c6b3662d29c3e83
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Thu Nov 18 10:51:14 2021 -0800
PHOENIX-6622 Transform monitor
* Implement TransformMonitor to monitor transform tool result and do orchestration
Signed-off-by: Gokcen Iskender <go...@gmail.com>
---
.../org/apache/phoenix/end2end/AggregateIT.java | 5 +-
.../apache/phoenix/end2end/BaseAggregateIT.java | 6 +-
.../end2end/LogicalTableNameExtendedIT.java | 16 +
.../phoenix/end2end/ParallelStatsDisabledIT.java | 8 +-
.../apache/phoenix/end2end/QueryWithOffsetIT.java | 6 +-
.../phoenix/end2end/index/SingleCellIndexIT.java | 44 ++
.../end2end/{index => transform}/TransformIT.java | 181 +++++-
.../transform/TransformMonitorExtendedIT.java | 171 ++++++
.../end2end/transform/TransformMonitorIT.java | 675 +++++++++++++++++++++
.../end2end/{ => transform}/TransformToolIT.java | 240 +++++---
.../org/apache/phoenix/compile/FromCompiler.java | 3 +-
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 2 -
.../phoenix/coprocessor/TaskRegionObserver.java | 12 +-
.../coprocessor/tasks/TransformMonitorTask.java | 216 +++++++
.../apache/phoenix/exception/SQLExceptionCode.java | 9 +-
.../phoenix/expression/RowKeyColumnExpression.java | 4 +
.../org/apache/phoenix/index/IndexMaintainer.java | 92 ++-
.../org/apache/phoenix/jdbc/PhoenixConnection.java | 7 +-
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 2 +-
.../index/IndexVerificationOutputRepository.java | 11 +-
.../index/IndexVerificationResultRepository.java | 11 +-
.../index/PhoenixIndexImportDirectReducer.java | 5 +-
.../transform/PhoenixTransformReducer.java | 8 +-
.../phoenix/mapreduce/transform/TransformTool.java | 194 ++++--
.../mapreduce/util/PhoenixConfigurationUtil.java | 24 +-
.../phoenix/query/ConnectionQueryServicesImpl.java | 4 +-
.../org/apache/phoenix/query/QueryConstants.java | 10 +-
.../apache/phoenix/schema/ColumnMetaDataOps.java | 11 +-
.../phoenix/schema/IndexNotFoundException.java | 2 +-
.../org/apache/phoenix/schema/MetaDataClient.java | 45 +-
.../org/apache/phoenix/schema/PMetaDataImpl.java | 5 +-
.../java/org/apache/phoenix/schema/PTable.java | 25 +-
.../java/org/apache/phoenix/schema/PTableImpl.java | 1 -
.../phoenix/schema/TableNotFoundException.java | 12 +-
.../org/apache/phoenix/schema/TableProperty.java | 14 +
.../java/org/apache/phoenix/schema/task/Task.java | 15 +-
.../schema/transform/SystemTransformRecord.java | 41 +-
.../apache/phoenix/schema/transform/Transform.java | 388 +++++++++---
.../schema/transform/TransformMaintainer.java | 68 +--
.../java/org/apache/phoenix/util/SchemaUtil.java | 10 +
.../java/org/apache/phoenix/util/UpgradeUtil.java | 49 +-
.../java/org/apache/phoenix/query/BaseTest.java | 2 +-
42 files changed, 2326 insertions(+), 328 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java
index fbb16d0..5d1a64f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java
@@ -172,7 +172,8 @@ public class AggregateIT extends BaseAggregateIT {
assertTrue(rs.next());
assertEquals(0,rs.getInt(1));
initAvgGroupTable(conn, tableName, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + "=20 ");
- testAvgGroupByOrderPreserving(conn, tableName, 13);
+ // When ParallelStats are disabled it is 4. When enabled it is 13
+ testAvgGroupByOrderPreserving(conn, tableName, 4);
rs = executeQuery(conn, queryBuilder);
assertTrue(rs.next());
assertEquals(13,rs.getInt(1));
@@ -185,7 +186,7 @@ public class AggregateIT extends BaseAggregateIT {
rs = executeQuery(conn, queryBuilder);
assertTrue(rs.next());
assertEquals(13,rs.getInt(1));
- testAvgGroupByOrderPreserving(conn, tableName, 13);
+ testAvgGroupByOrderPreserving(conn, tableName, 4);
conn.createStatement().execute("ALTER TABLE " + tableName + " SET " + PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + "=100");
testAvgGroupByOrderPreserving(conn, tableName, 6);
conn.createStatement().execute("ALTER TABLE " + tableName + " SET " + PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + "=null");
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseAggregateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseAggregateIT.java
index 5b83e39..01e92c6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseAggregateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseAggregateIT.java
@@ -17,6 +17,9 @@
*/
package org.apache.phoenix.end2end;
+import static org.apache.phoenix.end2end.ParallelStatsDisabledIT.executeQuery;
+import static org.apache.phoenix.end2end.ParallelStatsDisabledIT.executeQueryThrowsException;
+import static org.apache.phoenix.end2end.ParallelStatsDisabledIT.validateQueryPlan;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -569,7 +572,8 @@ public abstract class BaseAggregateIT extends ParallelStatsDisabledIT {
explainPlanAttributes.getServerAggregate());
TestUtil.analyzeTable(conn, tableName);
List<KeyRange> splits = TestUtil.getAllSplits(conn, tableName);
- assertEquals(nGuidePosts, splits.size());
+ // nGuideposts when stats are enabled, 4 when disabled
+ assertEquals(4, splits.size());
}
@Test
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
index 73eddfb..496d2bd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryConstants;
@@ -275,12 +276,16 @@ public class LogicalTableNameExtendedIT extends LogicalTableNameBaseIT {
String schemaName = "S_" + generateUniqueName();
String tableName = "TBL_" + generateUniqueName();
String viewName = "VW1_" + generateUniqueName();
+ String viewName2 = "VW2_" + generateUniqueName();
String viewIndexName1 = "VWIDX1_" + generateUniqueName();
String viewIndexName2 = "VWIDX2_" + generateUniqueName();
+ String view2IndexName1 = "VW2IDX1_" + generateUniqueName();
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+ String fullViewName2 = SchemaUtil.getTableName(schemaName, viewName2);
String fullViewIndex1Name = SchemaUtil.getTableName(schemaName, viewIndexName1);
String fullViewIndex2Name = SchemaUtil.getTableName(schemaName, viewIndexName2);
+ String fullView2Index1Name = SchemaUtil.getTableName(schemaName, view2IndexName1);
String fullTableHName = schemaName + ":" + tableName;
try (Connection conn = getConnection(propsNamespace)) {
conn.setAutoCommit(true);
@@ -307,6 +312,17 @@ public class LogicalTableNameExtendedIT extends LogicalTableNameBaseIT {
assertEquals("PK10", rs.getString(2));
assertEquals("VIEW_COL2_10", rs.getString(3));
assertEquals(false, rs.next());
+
+ conn.createStatement().execute("CREATE VIEW " + fullViewName2 + " (VIEW_COL1 VARCHAR, VIEW_COL2 VARCHAR) AS SELECT * FROM "
+ + fullTableName);
+ conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + view2IndexName1 + " ON " + fullViewName2 + " (VIEW_COL1) include (VIEW_COL2)");
+ populateView(conn, fullViewName2, 20, 1);
+ rs = conn.createStatement().executeQuery("SELECT * FROM " + fullView2Index1Name + " WHERE \"0:VIEW_COL1\"='VIEW_COL1_10'");
+ assertEquals(true, rs.next());
+ assertEquals("VIEW_COL1_10", rs.getString(1));
+ assertEquals("PK10", rs.getString(2));
+ assertEquals("VIEW_COL2_10", rs.getString(3));
+ assertEquals(false, rs.next());
}
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
index cb7aad8..2ce522d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
@@ -22,6 +22,7 @@ import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserv
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.QueryBuilder;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -58,6 +59,7 @@ public abstract class ParallelStatsDisabledIT extends BaseTest {
public static synchronized void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
props.put(CompatBaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60*60)); // An hour
+ props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@@ -66,13 +68,13 @@ public abstract class ParallelStatsDisabledIT extends BaseTest {
BaseTest.freeResourcesIfBeyondThreshold();
}
- protected ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
+ public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
PreparedStatement statement = conn.prepareStatement(queryBuilder.build());
ResultSet rs = statement.executeQuery();
return rs;
}
- protected ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
+ public static ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) {
ResultSet rs = null;
try {
@@ -85,7 +87,7 @@ public abstract class ParallelStatsDisabledIT extends BaseTest {
return rs;
}
- protected void validateQueryPlan(Connection conn, QueryBuilder queryBuilder, String expectedPhoenixPlan, String expectedSparkPlan) throws SQLException {
+ public static void validateQueryPlan(Connection conn, QueryBuilder queryBuilder, String expectedPhoenixPlan, String expectedSparkPlan) throws SQLException {
if (StringUtils.isNotBlank(expectedPhoenixPlan)) {
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + queryBuilder.build());
assertEquals(expectedPhoenixPlan, QueryUtil.getExplainPlan(rs));
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
index 14045fa..c7a28f0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
@@ -174,7 +174,11 @@ public class QueryWithOffsetIT extends ParallelStatsDisabledIT {
assertEquals(offset, explainPlanAttributes.getClientOffset()
.intValue());
if (!isSalted) {
- assertEquals("PARALLEL 5-WAY",
+ // When Parallel stats is actually disabled, it is PARALLEL 4-WAY
+ // CLIENT PARALLEL 4-WAY FULL SCAN OVER T_N000001
+ // SERVER SORTED BY [C2.V1] CLIENT MERGE SORT CLIENT OFFSET 10
+ // When enabled, it is 5-WAY
+ assertEquals("PARALLEL 4-WAY",
explainPlanAttributes.getIteratorTypeAndScanSize());
} else {
assertEquals("PARALLEL 10-WAY",
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
index 9e9dd76..d21d00a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
@@ -417,6 +417,50 @@ public class SingleCellIndexIT extends ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testPartialUpdateSingleCellTable() throws Exception {
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ String tableName = "TBL_" + generateUniqueName();
+ String idxName = "IND_" + generateUniqueName();
+
+ createTableAndIndex(conn, tableName, idxName, this.tableDDLOptions, false,3);
+ assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName);
+ assertMetadata(conn, SINGLE_CELL_ARRAY_WITH_OFFSETS, TWO_BYTE_QUALIFIERS, idxName);
+
+ // Partial update 1st row
+ String upsert = "UPSERT INTO " + tableName + " (PK1, INT_PK, V4) VALUES ('PK1',1,'UpdatedV4')";
+ conn.createStatement().execute(upsert);
+ conn.commit();
+ String selectFromData = "SELECT /*+ NO_INDEX */ PK1, INT_PK, V1, V2, V4 FROM " + tableName + " where INT_PK = 1 and V4 LIKE 'UpdatedV4'";
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectFromData);
+ String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+ assertTrue(actualExplainPlan.contains(tableName));
+
+ rs = conn.createStatement().executeQuery(selectFromData);
+ assertTrue(rs.next());
+ assertEquals("PK1", rs.getString(1));
+ assertEquals(1, rs.getInt(2));
+ assertEquals("V11", rs.getString(3));
+ assertEquals(2, rs.getInt(4));
+ assertFalse(rs.next());
+
+ String selectFromIndex = "SELECT PK1, INT_PK, V1, V2, V4 FROM " + tableName + " where V2 >= 2 and V4 = 'UpdatedV4'";
+ rs = conn.createStatement().executeQuery("EXPLAIN " + selectFromIndex);
+ actualExplainPlan = QueryUtil.getExplainPlan(rs);
+ assertTrue(actualExplainPlan.contains(idxName));
+
+ rs = conn.createStatement().executeQuery(selectFromIndex);
+ assertTrue(rs.next());
+ assertEquals("PK1", rs.getString(1));
+ assertEquals(1, rs.getInt(2));
+ assertEquals("V11", rs.getString(3));
+ assertEquals(2, rs.getInt(4));
+ assertFalse(rs.next());
+ }
+ }
+
private Connection getTenantConnection(String tenantId) throws Exception {
Properties tenantProps = PropertiesUtil.deepCopy(testProps);
tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformIT.java
similarity index 69%
rename from phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
rename to phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformIT.java
index e7aa5aa..bfdc570 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformIT.java
@@ -15,13 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.phoenix.end2end.index;
+package org.apache.phoenix.end2end.transform;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.transform.SystemTransformRecord;
@@ -37,6 +39,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_LOCAL_OR_VIEW_INDEX;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TABLE_WITH_LOCAL_INDEX;
+import static org.apache.phoenix.exception.SQLExceptionCode.VIEW_WITH_PROPERTIES;
import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -44,6 +49,7 @@ import static org.apache.phoenix.util.TestUtil.getRowCount;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -53,6 +59,7 @@ public class TransformIT extends ParallelStatsDisabledIT {
public TransformIT() {
testProps.put(QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB, "ONE_CELL_PER_COLUMN");
testProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, "0");
+ testProps.put(PhoenixConfigurationUtil.TRANSFORM_MONITOR_ENABLED, Boolean.toString(false));
}
@Before
@@ -195,13 +202,53 @@ public class TransformIT extends ParallelStatsDisabledIT {
+ " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
fail();
} catch (SQLException e) {
- assertEquals(SQLExceptionCode.CANNOT_TRANSFORM_VIEW_INDEX.getErrorCode(), e.getErrorCode());
+ assertEquals(CANNOT_TRANSFORM_LOCAL_OR_VIEW_INDEX.getErrorCode(), e.getErrorCode());
}
}
}
@Test
- public void testTransformForLiveMutations_mutatingTable() throws Exception {
+ public void testTransform_tableWithLocalIndex() throws Exception {
+ String dataTableName = "TBL_" + generateUniqueName();
+ String indexName = "LCLIDX_" + generateUniqueName();
+ String createIndexStmt = "CREATE LOCAL INDEX %s ON " + dataTableName + " (NAME) INCLUDE (ZIP) ";
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 1;
+ TransformToolIT.createTableAndUpsertRows(conn, dataTableName, numOfRows, "");
+ conn.createStatement().execute(String.format(createIndexStmt, indexName));
+
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableName);
+ try {
+ conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + dataTableName +
+ " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ fail("Cannot transform local index");
+ } catch (SQLException e) {
+ assertEquals(CANNOT_TRANSFORM_LOCAL_OR_VIEW_INDEX.getErrorCode(), e.getErrorCode());
+ }
+
+ try {
+ conn.createStatement().execute("ALTER TABLE " + dataTableName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ fail("Cannot transform table with local index");
+ } catch (SQLException e) {
+ assertEquals(CANNOT_TRANSFORM_TABLE_WITH_LOCAL_INDEX.getErrorCode(), e.getErrorCode());
+ }
+
+ }
+ }
+
+ @Test
+ public void testTransformForLiveMutations_mutatingImmutableTable() throws Exception {
+ testTransformForLiveMutations_mutatingTable(" IMMUTABLE_ROWS=TRUE");
+ }
+
+ @Test
+ public void testTransformForLiveMutations_mutatingMutableTable() throws Exception {
+ testTransformForLiveMutations_mutatingTable("");
+ }
+
+ private void testTransformForLiveMutations_mutatingTable(String tableDDL) throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
conn.setAutoCommit(true);
String schema = "S_" + generateUniqueName();
@@ -211,7 +258,7 @@ public class TransformIT extends ParallelStatsDisabledIT {
String fullIdxName = SchemaUtil.getTableName(schema, idxName);
String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
- "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+ "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) " + tableDDL;
conn.createStatement().execute(createTableSql);
String upsertStmt = "UPSERT INTO " + fullTableName + " (PK1, INT_PK, V1, V2) VALUES ('%s', %d, '%s', %d)";
@@ -286,7 +333,16 @@ public class TransformIT extends ParallelStatsDisabledIT {
}
@Test
- public void testTransformForLiveMutations_mutatingIndex() throws Exception {
+ public void testTransformForLiveMutations_mutatingMutableIndex() throws Exception {
+ testTransformForLiveMutations_mutatingIndex("");
+ }
+
+ @Test
+ public void testTransformForLiveMutations_mutatingImmutableIndex() throws Exception {
+ testTransformForLiveMutations_mutatingIndex(" IMMUTABLE_ROWS=true");
+ }
+
+ private void testTransformForLiveMutations_mutatingIndex(String tableDDL) throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
conn.setAutoCommit(true);
String schema = "S_" + generateUniqueName();
@@ -296,7 +352,7 @@ public class TransformIT extends ParallelStatsDisabledIT {
String fullIdxName = SchemaUtil.getTableName(schema, idxName);
String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
- "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+ "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) " + tableDDL;
conn.createStatement().execute(createTableSql);
// Note that index will not be built, since we create it with ASYNC
@@ -327,7 +383,16 @@ public class TransformIT extends ParallelStatsDisabledIT {
}
@Test
- public void testTransformForLiveMutations_mutatingBaseTableForView() throws Exception {
+ public void testTransformForLiveMutations_mutatingMutableBaseTableForView() throws Exception {
+ testTransformForLiveMutations_mutatingBaseTableForView("");
+ }
+
+ @Test
+ public void testTransformForLiveMutations_mutatingImmutableBaseTableForView() throws Exception {
+ testTransformForLiveMutations_mutatingBaseTableForView(" IMMUTABLE_ROWS=true");
+ }
+
+ private void testTransformForLiveMutations_mutatingBaseTableForView(String tableDDL) throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
conn.setAutoCommit(true);
String schema = "S_" + generateUniqueName();
@@ -338,7 +403,7 @@ public class TransformIT extends ParallelStatsDisabledIT {
String viewIdxName = "VWIDX_" + generateUniqueName();
String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
- "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+ "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) " + tableDDL;
conn.createStatement().execute(createTableSql);
assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullTableName);
@@ -374,6 +439,106 @@ public class TransformIT extends ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testTransformForView() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ String schema = "S_" + generateUniqueName();
+ String tableName = "TBL_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schema, tableName);
+ String viewName = "VW_" + generateUniqueName();
+ String fullViewName = SchemaUtil.getTableName(schema, viewName);
+
+ String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+ "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+ conn.createStatement().execute(createTableSql);
+
+ String createParentViewSql = "CREATE VIEW " + viewName + " ( VIEW_COL1 VARCHAR ) AS SELECT * FROM " + fullTableName;
+ conn.createStatement().execute(createParentViewSql);
+
+ try {
+ conn.createStatement().execute("ALTER TABLE " + viewName + " SET "
+ + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ fail("View transform should fail");
+ } catch (SQLException e) {
+ assertEquals(e.getErrorCode(), VIEW_WITH_PROPERTIES.getErrorCode());
+ }
+ }
+ }
+
+ @Test
+ public void testAlterNotNeedsToTransformDueToSameProps() throws Exception {
+ String schema = "S_" + generateUniqueName();
+ String tableName = "TBL_" + generateUniqueName();
+ String indexName = "IDX_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schema, tableName);
+ String fullIdxName = SchemaUtil.getTableName(schema, tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+
+ String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+ "V1 VARCHAR, V2 INTEGER, V3 INTEGER, V4 VARCHAR, V5 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) " +
+ "IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2";
+ conn.createStatement().execute(createTableSql);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, fullTableName);
+
+ // Now do an unnecessary transform
+ conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET "
+ + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schema, tableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNull(record);
+
+ String createIndexSql = "CREATE INDEX " + indexName + " ON " + fullTableName + " (PK1, INT_PK) include (V1) ASYNC";
+ conn.createStatement().execute(createIndexSql);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, fullIdxName);
+
+ conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + fullTableName +
+ " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ record = Transform.getTransformRecord(schema, indexName, tableName, null, conn.unwrap(PhoenixConnection.class));
+ assertNull(record);
+ }
+ }
+
+ @Test
+ public void testDropAfterTransform() throws Exception {
+ String schema = "S_" + generateUniqueName();
+ String tableName = "TBL_" + generateUniqueName();
+ String viewName = "VW_" + generateUniqueName();
+ String indexName = "IDX_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schema, tableName);
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+
+ String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+ "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+ conn.createStatement().execute(createTableSql);
+ assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullTableName);
+
+ String upsertStmt = "UPSERT INTO " + fullTableName + " (PK1, INT_PK, V1, V2) VALUES ('%s', %d, '%s', %d)";
+ conn.createStatement().execute(String.format(upsertStmt, "a", 1, "val1", 1));
+
+ String createParentViewSql = "CREATE VIEW " + viewName + " ( VIEW_COL1 VARCHAR ) AS SELECT * FROM " + fullTableName;
+ conn.createStatement().execute(createParentViewSql);
+
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (V2) include (V1) ");
+
+ conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + fullTableName +
+ " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ assertSystemTransform(conn.unwrap(PhoenixConnection.class), 1, schema, indexName, null);
+ conn.createStatement().execute("DROP INDEX " + indexName + " ON " + fullTableName);
+
+ conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET "
+ + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schema, tableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ conn.createStatement().execute("DROP VIEW " + viewName);
+ conn.createStatement().execute("DROP TABLE " + fullTableName);
+ }
+ }
+
+
private void assertSystemTransform(Connection conn, int rowCount, String schema, String logicalTableName, String tenantId) throws SQLException {
ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM "+
PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
new file mode 100644
index 0000000..9f1b6d4
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.transform;
+
+import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.end2end.transform.TransformMonitorIT.waitForTransformToGetToState;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class TransformMonitorExtendedIT extends BaseTest {
+ private static RegionCoprocessorEnvironment taskRegionEnvironment;
+ protected String dataTableDdl = "";
+ private Properties propsNamespace = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> serverProps = com.google.common.collect.Maps.newHashMapWithExpectedSize(2);
+ serverProps.put(CompatBaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ Integer.toString(60*60)); // An hour
+ serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
+
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+ clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
+
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+ new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+ public TransformMonitorExtendedIT() throws IOException, InterruptedException {
+ StringBuilder optionBuilder = new StringBuilder();
+ optionBuilder.append(" COLUMN_ENCODED_BYTES=0,IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN");
+ this.dataTableDdl = optionBuilder.toString();
+ propsNamespace.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
+
+ TableName taskTable = TableName.valueOf(SYSTEM_CATALOG_SCHEMA + QueryConstants.NAMESPACE_SEPARATOR + SYSTEM_TASK_TABLE);
+ taskRegionEnvironment = (RegionCoprocessorEnvironment) getUtility()
+ .getRSForFirstRegionInTable(taskTable)
+ .getRegions(taskTable)
+ .get(0).getCoprocessorHost()
+ .findCoprocessorEnvironment(TaskRegionObserver.class.getName());
+ }
+
+ @Before
+ public void setupTest() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl(), propsNamespace)) {
+ conn.setAutoCommit(true);
+ conn.createStatement().execute("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME);
+ conn.createStatement().execute("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+ }
+ }
+
+ @Test
+ public void testTransformIndexWithNamespaceEnabled() throws Exception {
+ String schemaName = "S_" + generateUniqueName();
+ String dataTableName = "TBL_" + generateUniqueName();
+ String fullDataTableName = SchemaUtil.getTableName(schemaName , dataTableName);
+ String indexName = "IDX_" + generateUniqueName();
+ String fullIndexName = SchemaUtil.getTableName(schemaName , indexName);
+ String createIndexStmt = "CREATE INDEX %s ON " + fullDataTableName + " (NAME) INCLUDE (ZIP) ";
+ try (Connection conn = DriverManager.getConnection(getUrl(), propsNamespace)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 1;
+ conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+ TransformToolIT.createTableAndUpsertRows(conn, fullDataTableName, numOfRows, dataTableDdl);
+ conn.createStatement().execute(String.format(createIndexStmt, indexName));
+ assertEquals(numOfRows, TransformMonitorIT.countRows(conn, fullIndexName));
+
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, fullIndexName);
+
+ conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + fullDataTableName +
+ " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexName, fullDataTableName, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+ TransformToolIT.upsertRows(conn, fullDataTableName, 2, 1);
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT \":ID\", \"0:ZIP\" FROM " + fullIndexName);
+ assertTrue(rs.next());
+ assertEquals("1", rs.getString(1));
+ assertEquals( 95051, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("2", rs.getString(1));
+ assertEquals( 95052, rs.getInt(2));
+ assertFalse(rs.next());
+ }
+ }
+ @Test
+ public void testTransformTableWithNamespaceEnabled() throws Exception {
+ String schemaName = "S_" + generateUniqueName();
+ String dataTableName = "TBL_" + generateUniqueName();
+ String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ try (Connection conn = DriverManager.getConnection(getUrl(), propsNamespace)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 1;
+ conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+ TransformToolIT.createTableAndUpsertRows(conn, fullDataTableName, numOfRows, dataTableDdl);
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, fullDataTableName);
+
+ conn.createStatement().execute("ALTER TABLE " + fullDataTableName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+ TransformToolIT.upsertRows(conn, fullDataTableName, 2, 1);
+ assertEquals(numOfRows+1, TransformMonitorIT.countRows(conn, fullDataTableName));
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT ID, ZIP FROM " + fullDataTableName);
+ assertTrue(rs.next());
+ assertEquals("1", rs.getString(1));
+ assertEquals( 95051, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("2", rs.getString(1));
+ assertEquals( 95052, rs.getInt(2));
+ assertFalse(rs.next());
+ }
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java
new file mode 100644
index 0000000..11baaf5
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.transform;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.end2end.IndexRebuildTaskIT.waitForTaskState;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_CREATE_TENANT_SPECIFIC_TABLE;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getRawRowCount;
+import static org.apache.phoenix.util.TestUtil.getRowCount;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TransformMonitorIT extends ParallelStatsDisabledIT {
+ private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+
+ private Properties testProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ public TransformMonitorIT() throws IOException, InterruptedException {
+ testProps.put(QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB, "ONE_CELL_PER_COLUMN");
+ testProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, "0");
+ testProps.put(QueryServices.PHOENIX_ACLS_ENABLED, "true");
+
+ TaskRegionEnvironment = (RegionCoprocessorEnvironment) getUtility()
+ .getRSForFirstRegionInTable(
+ PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+ .getRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+ .get(0).getCoprocessorHost()
+ .findCoprocessorEnvironment(TaskRegionObserver.class.getName());
+ }
+
+ @Before
+ public void setupTest() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ conn.createStatement().execute("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME);
+ conn.createStatement().execute("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+ }
+ }
+
+ private void testTransformTable(boolean createIndex, boolean createView, boolean isImmutable) throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = "TBL_" + generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String newTableName = dataTableName + "_1";
+ String newTableFullName = SchemaUtil.getTableName(schemaName, newTableName);
+ String indexName = "IDX_" + generateUniqueName();
+ String indexName2 = "IDX_" + generateUniqueName();
+ String viewName = "VW_" + generateUniqueName();
+ String viewName2 = "VW2_" + generateUniqueName();
+ String viewIdxName = "VW_IDX_" + generateUniqueName();
+ String viewIdxName2 = "VW_IDX_" + generateUniqueName();
+ String view2IdxName1 = "VW2_IDX_" + generateUniqueName();
+ String indexFullName = SchemaUtil.getTableName(schemaName, indexName);
+ String createIndexStmt = "CREATE INDEX %s ON " + dataTableFullName + " (NAME) INCLUDE (ZIP) ";
+ String createViewStmt = "CREATE VIEW %s ( VIEW_COL1 INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM " + dataTableFullName;
+ String createViewIdxSql = "CREATE INDEX %s ON " + viewName + " (VIEW_COL1) include (VIEW_COL2) ";
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 10;
+ TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, numOfRows, isImmutable? " IMMUTABLE_ROWS=true" : "");
+ if (createIndex) {
+ conn.createStatement().execute(String.format(createIndexStmt, indexName));
+ }
+
+ if (createView) {
+ conn.createStatement().execute(String.format(createViewStmt, viewName));
+ conn.createStatement().execute(String.format(createViewIdxSql, viewIdxName));
+ conn.createStatement().execute("UPSERT INTO " + viewName + "(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (1, 'uname11', 100, 'viewCol2')");
+ }
+ assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+
+ List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, null);
+ assertEquals(1, taskRecordList.size());
+ assertEquals(PTable.TaskType.TRANSFORM_MONITOR, taskRecordList.get(0).getTaskType());
+ assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+ assertEquals(dataTableName, taskRecordList.get(0).getTableName());
+
+ waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+ // Test that the PhysicalTableName is updated.
+ PTable oldTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
+ assertEquals(newTableName, oldTable.getPhysicalName(true).getString());
+
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, dataTableFullName);
+
+ ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ long newRowCount = countRows(conn, newTableFullName);
+ assertEquals(getRawRowCount(cqs.getTable(Bytes.toBytes(dataTableFullName))), newRowCount);
+
+ if (createIndex) {
+ assertEquals(newRowCount, countRows(conn, indexFullName));
+ int additionalRows = 2;
+ // Upsert new rows to new table. Note that after transform is complete, we are using the new table
+ TransformToolIT.upsertRows(conn, dataTableFullName, (int)newRowCount+1, additionalRows);
+ assertEquals(newRowCount+additionalRows, countRows(conn, indexFullName));
+ assertEquals(newRowCount, getRawRowCount(cqs.getTable(Bytes.toBytes(dataTableFullName))));
+
+ // Create another index on the new table and count
+ Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ TableName hTableName = TableName.valueOf(dataTableFullName);
+ admin.disableTable(hTableName);
+ admin.deleteTable(hTableName);
+ conn.createStatement().execute(String.format(createIndexStmt, indexName2));
+ assertEquals(newRowCount+additionalRows, countRows(conn, dataTableFullName));
+ assertEquals(newRowCount+additionalRows, countRows(conn, SchemaUtil.getTableName(schemaName, indexName2)));
+ } else if (createView) {
+ assertEquals(numOfRows, countRows(conn, viewName));
+ assertEquals(numOfRows, countRowsForViewIndex(conn, dataTableFullName));
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, viewName);
+ conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+
+ int additionalRows = 2;
+ // Upsert new rows to new table. Note that after transform is complete, we are using the new table
+ TransformToolIT.upsertRows(conn, viewName, (int)newRowCount+1, additionalRows);
+ assertEquals(newRowCount+additionalRows, getRowCount(conn, viewName));
+ assertEquals(newRowCount+additionalRows, countRowsForViewIndex(conn, dataTableFullName));
+
+ // Drop view index and create another on the new table and count
+ conn.createStatement().execute("DROP INDEX " + viewIdxName + " ON " + viewName);
+ Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ TableName hTableName = TableName.valueOf(dataTableFullName);
+ admin.disableTable(hTableName);
+ admin.deleteTable(hTableName);
+ conn.createStatement().execute(String.format(createViewIdxSql, viewIdxName2));
+ assertEquals(newRowCount+additionalRows, countRowsForViewIndex(conn, dataTableFullName));
+
+ // Create another view and have a new index on top
+ conn.createStatement().execute(String.format(createViewStmt, viewName2));
+ conn.createStatement().execute(String.format(createViewIdxSql, view2IdxName1));
+ assertEquals((newRowCount+additionalRows)*2, countRowsForViewIndex(conn, dataTableFullName));
+
+ conn.createStatement().execute("UPSERT INTO " + viewName2 + "(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (100, 'uname100', 1000, 'viewCol100')");
+ ResultSet rs = conn.createStatement().executeQuery("SELECT VIEW_COL2, NAME FROM " + viewName2 + " WHERE VIEW_COL1=1000");
+ assertTrue(rs.next());
+ assertEquals("viewCol100", rs.getString(1));
+ assertEquals("uname100", rs.getString(2));
+ assertFalse(rs.next());
+ }
+
+ }
+ }
+
+ public static int countRows(Connection conn, String tableFullName) throws SQLException {
+ ResultSet count = conn.createStatement().executeQuery("select /*+ NO_INDEX*/ count(*) from " + tableFullName);
+ count.next();
+ int numRows = count.getInt(1);
+ return numRows;
+ }
+
+ protected int countRowsForViewIndex(Connection conn, String baseTable) throws IOException, SQLException {
+ String viewIndexTableName = MetaDataUtil.getViewIndexPhysicalName(baseTable);
+ ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
+
+ Table indexHTable = queryServices.getTable(Bytes.toBytes(viewIndexTableName));
+ // If there are multiple indexes on this view, this will return rows for others as well. For 1 view index, it is fine.
+ return getUtility().countRows(indexHTable);
+ }
+
+ @Test
+ public void testTransformMonitor_mutableTableWithoutIndex() throws Exception {
+ testTransformTable(false, false, false);
+ }
+
+ @Test
+ public void testTransformMonitor_immutableTableWithoutIndex() throws Exception {
+ testTransformTable(false, false, true);
+ }
+
+ @Test
+ public void testTransformMonitor_immutableTableWithIndex() throws Exception {
+ testTransformTable(true, false, true);
+ }
+
+ @Test
+ public void testTransformMonitor_pausedTransform() throws Exception {
+ testTransformMonitor_checkStates(PTable.TransformStatus.PAUSED, PTable.TaskStatus.COMPLETED);
+ }
+
+ @Test
+ public void testTransformMonitor_completedTransform() throws Exception {
+ testTransformMonitor_checkStates(PTable.TransformStatus.COMPLETED, PTable.TaskStatus.COMPLETED);
+ }
+
+ @Test
+ public void testTransformMonitor_failedTransform() throws Exception {
+ testTransformMonitor_checkStates(PTable.TransformStatus.FAILED, PTable.TaskStatus.FAILED);
+ }
+
+ private void testTransformMonitor_checkStates(PTable.TransformStatus transformStatus, PTable.TaskStatus taskStatus) throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ SystemTransformRecord.SystemTransformBuilder transformBuilder = new SystemTransformRecord.SystemTransformBuilder();
+ String logicalTableName = generateUniqueName();
+ transformBuilder.setLogicalTableName(logicalTableName);
+ transformBuilder.setTransformStatus(transformStatus.name());
+ transformBuilder.setNewPhysicalTableName(logicalTableName + "_1");
+ Transform.upsertTransform(transformBuilder.build(), conn.unwrap(PhoenixConnection.class));
+
+ TaskRegionObserver.SelfHealingTask task =
+ new TaskRegionObserver.SelfHealingTask(
+ TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+
+ Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+ Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(conn.unwrap(PhoenixConnection.class))
+ .setTaskType(PTable.TaskType.TRANSFORM_MONITOR)
+ .setTenantId(null)
+ .setSchemaName(null)
+ .setTableName(logicalTableName)
+ .setTaskStatus(PTable.TaskStatus.CREATED.toString())
+ .setData(null)
+ .setPriority(null)
+ .setStartTs(startTs)
+ .setEndTs(null)
+ .build());
+ task.run();
+
+ waitForTaskState(conn, PTable.TaskType.TRANSFORM_MONITOR, logicalTableName, taskStatus);
+ }
+ }
+
+ @Test
+ public void testTransformMonitor_pauseAndResumeTransform() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ TransformToolIT.pauseTableTransform(schemaName, dataTableName, conn, "");
+
+ List<String> args = TransformToolIT.getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, true, false, false);
+
+ // This run resumes transform and TransformMonitor task runs and completes it
+ TransformToolIT.runTransformTool(args.toArray(new String[0]), 0);
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, null);
+ assertEquals(1, taskRecordList.size());
+ assertEquals(PTable.TaskType.TRANSFORM_MONITOR, taskRecordList.get(0).getTaskType());
+ assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+ assertEquals(dataTableName, taskRecordList.get(0).getTableName());
+
+ waitForTaskState(conn, PTable.TaskType.TRANSFORM_MONITOR, dataTableName, PTable.TaskStatus.COMPLETED);
+ }
+ }
+
+ @Test
+ public void testTransformMonitor_mutableTableWithIndex() throws Exception {
+ testTransformTable(true, false, false);
+ }
+
+ @Test
+ public void testTransformMonitor_tableWithViews() throws Exception {
+ testTransformTable(false, true, false);
+ }
+
+ @Test
+ public void testTransformMonitor_index() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = "TBL_" + generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexName = "IDX_" + generateUniqueName();
+ String indexFullName = SchemaUtil.getTableName(schemaName, indexName);
+ String newTableFullName = indexFullName + "_1";
+ String createIndexStmt = "CREATE INDEX " + indexName + " ON " + dataTableFullName + " (ZIP) INCLUDE (NAME) ";
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 10;
+ TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, numOfRows, "");
+ conn.createStatement().execute(createIndexStmt);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexFullName);
+
+ conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + dataTableFullName +
+ " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+
+ List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, null);
+ assertEquals(1, taskRecordList.size());
+ assertEquals(PTable.TaskType.TRANSFORM_MONITOR, taskRecordList.get(0).getTaskType());
+ assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+ assertEquals(indexName, taskRecordList.get(0).getTableName());
+
+ waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+ // Test that the PhysicalTableName is updated.
+ PTable oldTable = PhoenixRuntime.getTableNoCache(conn, indexFullName);
+ assertEquals(indexName+"_1", oldTable.getPhysicalName(true).getString());
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, newTableFullName);
+ ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ long newRowCount = countRows(conn, newTableFullName);
+ assertEquals(getRawRowCount(cqs.getTable(Bytes.toBytes(indexFullName))), newRowCount);
+ }
+ }
+
+ @Test
+ public void testTransformTableWithTenantViews() throws Exception {
+ String tenantId = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String viewTenantName = "TENANTVW_" + generateUniqueName();
+ String createTblStr = "CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL,ID INTEGER NOT NULL"
+ + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true";
+ String createViewStr = "CREATE VIEW %s (VIEW_COL1 VARCHAR) AS SELECT * FROM %s";
+
+ String upsertQueryStr = "UPSERT INTO %s (TENANT_ID, ID, NAME, VIEW_COL1) VALUES('%s' , %d, '%s', '%s')";
+
+ Properties props = PropertiesUtil.deepCopy(testProps);
+ Connection connGlobal = null;
+ Connection connTenant = null;
+ try {
+ connGlobal = DriverManager.getConnection(getUrl(), props);
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ connTenant = DriverManager.getConnection(getUrl(), props);
+ connTenant.setAutoCommit(true);
+ String tableStmtGlobal = String.format(createTblStr, dataTableName);
+ connGlobal.createStatement().execute(tableStmtGlobal);
+ assertMetadata(connGlobal, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableName);
+
+ String viewStmtTenant = String.format(createViewStr, viewTenantName, dataTableName);
+ connTenant.createStatement().execute(viewStmtTenant);
+
+ // TODO: Fix this as part of implementing TransformTool so that the tenant view rows could be read from the tool
+// connTenant.createStatement()
+// .execute(String.format(upsertQueryStr, viewTenantName, tenantId, 1, "x", "xx"));
+ try {
+ connTenant.createStatement().execute("ALTER TABLE " + dataTableName
+ + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ fail("Tenant connection cannot do alter");
+ } catch (SQLException e) {
+ assertEquals(CANNOT_CREATE_TENANT_SPECIFIC_TABLE.getErrorCode(), e.getErrorCode());
+ }
+ connGlobal.createStatement().execute("ALTER TABLE " + dataTableName
+ + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord tableRecord = Transform.getTransformRecord(null, dataTableName, null, null, connGlobal.unwrap(PhoenixConnection.class));
+ assertNotNull(tableRecord);
+
+ waitForTransformToGetToState(connGlobal.unwrap(PhoenixConnection.class), tableRecord, PTable.TransformStatus.COMPLETED);
+
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, viewTenantName, tenantId, 2, "y", "yy"));
+
+ ResultSet rs = connTenant.createStatement().executeQuery("SELECT /*+ NO_INDEX */ VIEW_COL1 FROM " + viewTenantName);
+ assertTrue(rs.next());
+// assertEquals("xx", rs.getString(1));
+// assertTrue(rs.next());
+ assertEquals("yy", rs.getString(1));
+ assertFalse(rs.next());
+ } finally {
+ if (connGlobal != null) {
+ connGlobal.close();
+ }
+ if (connTenant != null) {
+ connTenant.close();
+ }
+ }
+ }
+
+ @Test
+ public void testTransformAlreadyTransformedIndex() throws Exception {
+ String dataTableName = "TBL_" + generateUniqueName();
+ String indexName = "IDX_" + generateUniqueName();
+ String createIndexStmt = "CREATE INDEX %s ON " + dataTableName + " (NAME) INCLUDE (ZIP) ";
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 1;
+ TransformToolIT.createTableAndUpsertRows(conn, dataTableName, numOfRows, "");
+ conn.createStatement().execute(String.format(createIndexStmt, indexName));
+ assertEquals(numOfRows, countRows(conn, indexName));
+
+ assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexName);
+
+ conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + dataTableName +
+ " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(null, indexName, dataTableName, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+ TransformToolIT.upsertRows(conn, dataTableName, 2, 1);
+
+ // Removing this so that we are sure that we are not picking up the old transform record.
+ Transform.removeTransformRecord(record, conn.unwrap(PhoenixConnection.class));
+ conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + dataTableName +
+ " ACTIVE SET IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=0");
+ record = Transform.getTransformRecord(null, indexName, dataTableName, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+ TransformToolIT.upsertRows(conn, dataTableName, 3, 1);
+ assertEquals(numOfRows + 2, countRows(conn, indexName));
+ assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, record.getNewPhysicalTableName());
+ ResultSet rs = conn.createStatement().executeQuery("SELECT \":ID\", \"0:ZIP\" FROM " + indexName);
+ assertTrue(rs.next());
+ assertEquals("1", rs.getString(1));
+ assertEquals( 95051, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("2", rs.getString(1));
+ assertEquals( 95052, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("3", rs.getString(1));
+ assertEquals( 95053, rs.getInt(2));
+ assertFalse(rs.next());
+ }
+ }
+
+ @Test
+ public void testTransformAlreadyTransformedTable() throws Exception {
+ String dataTableName = "TBL_" + generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 1;
+ String stmString1 =
+ "CREATE TABLE IF NOT EXISTS " + dataTableName
+ + " (ID INTEGER NOT NULL, CITY_PK VARCHAR NOT NULL, NAME_PK VARCHAR NOT NULL,NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID, CITY_PK, NAME_PK)) ";
+ conn.createStatement().execute(stmString1);
+
+ String upsertQuery = "UPSERT INTO %s VALUES(%d, '%s', '%s', '%s', %d)";
+
+ // insert rows
+ conn.createStatement().execute(String.format(upsertQuery, dataTableName, 1, "city1", "name1", "uname1", 95051));
+
+ assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableName);
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ SystemTransformRecord record = Transform.getTransformRecord(null, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+ conn.createStatement().execute(String.format(upsertQuery, dataTableName, 2, "city2", "name2", "uname2", 95052));
+
+ assertEquals(numOfRows+1, countRows(conn, dataTableName));
+
+ // Make sure that we are not accessing the original table. We are supposed to read from the new table above
+ Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ TableName hTableName = TableName.valueOf(dataTableName);
+ admin.disableTable(hTableName);
+ admin.deleteTable(hTableName);
+
+ // Removing this so that we are sure that we are not picking up the old transform record.
+ Transform.removeTransformRecord(record, conn.unwrap(PhoenixConnection.class));
+ conn.createStatement().execute("ALTER TABLE " + dataTableName +
+ " SET IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=0");
+ record = Transform.getTransformRecord(null, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+
+ waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+
+ assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, record.getNewPhysicalTableName());
+ conn.createStatement().execute(String.format(upsertQuery, dataTableName, 3, "city3", "name3", "uname3", 95053));
+ assertEquals(numOfRows+2, countRows(conn, dataTableName));
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT ID, ZIP, NAME, NAME_PK, CITY_PK FROM " + dataTableName);
+ assertTrue(rs.next());
+ assertEquals("1", rs.getString(1));
+ assertEquals( 95051, rs.getInt(2));
+ assertEquals( "uname1", rs.getString(3));
+ assertEquals( "name1", rs.getString(4));
+ assertEquals( "city1", rs.getString(5));
+ assertTrue(rs.next());
+ assertEquals("2", rs.getString(1));
+ assertEquals( 95052, rs.getInt(2));
+ assertEquals( "uname2", rs.getString(3));
+ assertEquals( "name2", rs.getString(4));
+ assertEquals( "city2", rs.getString(5));
+ assertTrue(rs.next());
+ assertEquals("3", rs.getString(1));
+ assertEquals( 95053, rs.getInt(2));
+ assertEquals( "uname3", rs.getString(3));
+ assertEquals( "name3", rs.getString(4));
+ assertEquals( "city3", rs.getString(5));
+ assertFalse(rs.next());
+ }
+ }
+
+ public void testDifferentClientAccessTransformedTable(boolean isImmutable) throws Exception {
+ String dataTableName = "TBL_" + generateUniqueName();
+ try (Connection conn1 = DriverManager.getConnection(getUrl(), testProps)) {
+ conn1.setAutoCommit(true);
+ int numOfRows = 1;
+ TransformToolIT.createTableAndUpsertRows(conn1, dataTableName, numOfRows, isImmutable? " IMMUTABLE_ROWS=true" : "");
+
+ conn1.createStatement().execute("ALTER TABLE " + dataTableName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(null, dataTableName, null, null, conn1.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ waitForTransformToGetToState(conn1.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+
+ // A connection does transform and another connection doesn't try to upsert into old table
+ String url2 = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries";
+ try (Connection conn2 = DriverManager.getConnection(url2, PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+ conn2.setAutoCommit(true);
+ TransformToolIT.upsertRows(conn2, dataTableName, 2, 1);
+
+ ResultSet rs = conn2.createStatement().executeQuery("SELECT ID, NAME, ZIP FROM " + dataTableName);
+ assertTrue(rs.next());
+ assertEquals("1", rs.getString(1));
+ assertEquals("uname1", rs.getString(2));
+ assertEquals( 95051, rs.getInt(3));
+ assertTrue(rs.next());
+ assertEquals("2", rs.getString(1));
+ assertEquals("uname2", rs.getString(2));
+ assertEquals( 95052, rs.getInt(3));
+ assertFalse(rs.next());
+ }
+ }
+ }
+
+ @Test
+ public void testDifferentClientAccessTransformedTable_mutable() throws Exception {
+ // A connection does transform and another connection doesn't try to upsert into old table
+ testDifferentClientAccessTransformedTable(false);
+ }
+
+ @Test
+ public void testDifferentClientAccessTransformedTable_immutable() throws Exception {
+ // A connection does transform and another connection doesn't try to upsert into old table
+ testDifferentClientAccessTransformedTable(true);
+ }
+
+ @Test
+ public void testTransformTable_cutoverNotAuto() throws Exception {
+ // Transform index and see it is not auto cutover
+ String schemaName = generateUniqueName();
+ String dataTableName = "TBL_" + generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ TransformMonitorTask.disableTransformMonitorTask(true);
+ conn.setAutoCommit(true);
+ int numOfRows = 1;
+ TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, numOfRows, "");
+ assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+
+ // Wait for task to fail
+ waitForTaskState(conn, PTable.TaskType.TRANSFORM_MONITOR, dataTableName, PTable.TaskStatus.FAILED);
+ } finally {
+ TransformMonitorTask.disableTransformMonitorTask(false);
+ }
+ }
+
+ @Test
+ public void testTransformMonitor_tableWithViews_OnOldAndNew() throws Exception {
+ // Create view before and after transform with different select statements and check
+ String schemaName = "S_" + generateUniqueName();
+ String dataTableName = "TBL_" + generateUniqueName();
+ String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String view1 = "VW1_" + generateUniqueName();
+ String view2 = "VW2_" + generateUniqueName();
+ String createTblStr = "CREATE TABLE %s (ID INTEGER NOT NULL, PK1 VARCHAR NOT NULL"
+ + ", NAME VARCHAR CONSTRAINT PK_1 PRIMARY KEY (ID, PK1)) ";
+ String createViewStr = "CREATE VIEW %s (VIEW_COL1 VARCHAR) AS SELECT * FROM %s WHERE NAME='%s'";
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)){
+ conn.setAutoCommit(true);
+ conn.createStatement().execute(String.format(createTblStr, fullDataTableName));
+
+ int numOfRows=2;
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullDataTableName);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+ for (int i=1; i <= numOfRows; i++) {
+ stmt1.setInt(1, i);
+ stmt1.setString(2, "pk" + i);
+ stmt1.setString(3, "name"+ i);
+ stmt1.execute();
+ }
+ conn.createStatement().execute(String.format(createViewStr, view1, fullDataTableName, "name1"));
+
+ assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, fullDataTableName);
+
+ conn.createStatement().execute("ALTER TABLE " + fullDataTableName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+ conn.createStatement().execute(String.format(createViewStr, view2, fullDataTableName, "name2"));
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + view2);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("pk2", rs.getString(2));
+ assertFalse(rs.next());
+ rs = conn.createStatement().executeQuery("SELECT * FROM " + view1);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("pk1", rs.getString(2));
+ assertFalse(rs.next());
+ }
+ }
+
+ public static void waitForTransformToGetToState(PhoenixConnection conn, SystemTransformRecord record, PTable.TransformStatus status) throws InterruptedException, SQLException {
+ int maxTries = 200, nTries = 0;
+ String lastStatus = "";
+ do {
+ if (status.name().equals(record.getTransformStatus())) {
+ return;
+ }
+ Thread.sleep(500);
+ record = Transform.getTransformRecord(record.getSchemaName(), record.getLogicalTableName(), record.getLogicalParentName(), record.getTenantId(), conn);
+ lastStatus = record.getTransformStatus();
+ } while (++nTries < maxTries);
+ try {
+ SingleCellIndexIT.dumpTable("SYSTEM.TASK");
+ } catch (Exception e) {
+
+ }
+ fail("Ran out of time waiting for transform state to become " + status + " but it was " + lastStatus);
+ }
+
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
similarity index 82%
rename from phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
rename to phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
index 9d87569..b8b39c7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
@@ -15,20 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.phoenix.end2end;
+package org.apache.phoenix.end2end.transform;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
+import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.end2end.index.SingleCellIndexIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.transform.TransformTool;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
@@ -38,8 +43,11 @@ import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +58,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -76,13 +85,23 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class TransformToolIT extends ParallelStatsDisabledIT{
+@RunWith(Parameterized.class)
+public class TransformToolIT extends ParallelStatsDisabledIT {
private static final Logger LOGGER = LoggerFactory.getLogger(TransformToolIT.class);
private final String tableDDLOptions;
- // TODO test with immutable
- private boolean mutable = true;
- public TransformToolIT() {
+ @Parameterized.Parameters(
+ name = "mutable={0}")
+ public static synchronized Collection<Object[]> data() {
+ List<Object[]> list = Lists.newArrayListWithExpectedSize(2);
+ boolean[] Booleans = new boolean[]{true, false};
+ for (boolean mutable : Booleans) {
+ list.add(new Object[]{mutable});
+ }
+ return list;
+ }
+
+ public TransformToolIT(boolean mutable) {
StringBuilder optionBuilder = new StringBuilder();
optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=NONE ");
if (!mutable) {
@@ -93,6 +112,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
@BeforeClass
public static synchronized void setup() throws Exception {
+ TransformMonitorTask.disableTransformMonitorTask(true);
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5));
@@ -100,29 +120,40 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
serverProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+ serverProps.put(PhoenixConfigurationUtil.TRANSFORM_MONITOR_ENABLED, Boolean.FALSE.toString());
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+ clientProps.put(PhoenixConfigurationUtil.TRANSFORM_MONITOR_ENABLED, Boolean.FALSE.toString());
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
new ReadOnlyProps(clientProps.entrySet().iterator()));
}
+ @AfterClass
+ public static synchronized void cleanup() {
+ TransformMonitorTask.disableTransformMonitorTask(false);
+ }
+
private void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows) throws SQLException {
createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
}
- private void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows, String tableOptions) throws SQLException {
+ public static void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows, String tableOptions) throws SQLException {
String stmString1 =
"CREATE TABLE IF NOT EXISTS " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+ tableOptions;
conn.createStatement().execute(stmString1);
+ upsertRows(conn, dataTableFullName, 1, numOfRows);
+ conn.commit();
+ }
+
+ public static void upsertRows(Connection conn, String dataTableFullName, int startIdx, int numOfRows) throws SQLException {
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
// insert rows
- for (int i = 1; i <= numOfRows; i++) {
+ for (int i = startIdx; i < startIdx+numOfRows; i++) {
IndexToolIT.upsertRow(stmt1, i);
}
- conn.commit();
}
@Test
public void testTransformTable() throws Exception {
@@ -134,7 +165,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
int numOfRows = 2;
- createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
" SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
@@ -146,13 +177,9 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
null, null, null, false, false, false, false,false);
runTransformTool(args.toArray(new String[0]), 0);
record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
- assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
+ assertTransformStatusOrPartial(PTable.TransformStatus.PENDING_CUTOVER, record);
assertEquals(getRowCount(conn, dataTableFullName), getRowCount(conn,newTableFullName));
- // Test that the PhysicalTableName is updated.
- PTable oldTable = PhoenixRuntime.getTable(conn, dataTableFullName);
- assertEquals(dataTableName+"_1", oldTable.getPhysicalName(true).getString());
-
String sql = "SELECT ID, NAME, ZIP FROM %s ";
ResultSet rs1 = conn.createStatement().executeQuery(String.format(sql, dataTableFullName));
ResultSet rs2 = conn.createStatement().executeQuery(String.format(sql, newTableFullName));
@@ -176,7 +203,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
- createTableAndUpsertRows(conn, dataTableFullName, 2);
+ createTableAndUpsertRows(conn, dataTableFullName, 2, tableDDLOptions);
conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
" SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
@@ -193,10 +220,10 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
}
}
- private void pauseTableTransform(String schemaName, String dataTableName, Connection conn) throws Exception {
+ public static void pauseTableTransform(String schemaName, String dataTableName, Connection conn, String tableDDLOptions) throws Exception {
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
- createTableAndUpsertRows(conn, dataTableFullName, 2);
+ createTableAndUpsertRows(conn, dataTableFullName, 2, tableDDLOptions);
conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
" SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
@@ -219,7 +246,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
- pauseTableTransform(schemaName, dataTableName, conn);
+ pauseTableTransform(schemaName, dataTableName, conn, tableDDLOptions);
}
}
@@ -230,13 +257,13 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
- pauseTableTransform(schemaName, dataTableName, conn);
+ pauseTableTransform(schemaName, dataTableName, conn, tableDDLOptions);
List<String> args = getArgList(schemaName, dataTableName, null,
null, null, null, false, false, true, false, false);
runTransformTool(args.toArray(new String[0]), 0);
SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
- assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
+ assertTransformStatusOrPartial(PTable.TransformStatus.PENDING_CUTOVER, record);
}
}
@@ -328,11 +355,11 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
conn.setAutoCommit(true);
String dataDDL =
"CREATE TABLE " + dataTableFullName + "(\n"
- + "ID VARCHAR(5) NOT NULL PRIMARY KEY,\n"
+ + "ID CHAR(5) NOT NULL PRIMARY KEY,\n"
+ "\"info\".CAR_NUM VARCHAR(18) NULL,\n"
+ "\"test\".CAR_NUM VARCHAR(18) NULL,\n"
+ "\"info\".CAP_DATE VARCHAR NULL,\n" + "\"info\".ORG_ID BIGINT NULL,\n"
- + "\"test\".ORG_NAME VARCHAR(255) NULL\n" + ") IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES = 0";
+ + "\"test\".ORG_NAME VARCHAR(255) NULL\n" + ") IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=NONE ";
conn.createStatement().execute(dataDDL);
// insert data
@@ -390,7 +417,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
int numOfRows = 2;
- createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON " + dataTableFullName + " (NAME) INCLUDE (ZIP)");
SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexTableFullName);
conn.createStatement().execute("ALTER INDEX " + indexTableName + " ON " + dataTableFullName +
@@ -404,13 +431,9 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
runTransformTool(args.toArray(new String[0]), 0);
record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
- assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
+ assertTransformStatusOrPartial(PTable.TransformStatus.PENDING_CUTOVER, record);
assertEquals(getRowCount(conn, indexTableFullName), getRowCount(conn, indexTableFullName + "_1"));
- // Test that the PhysicalTableName is updated.
- PTable oldTable = PhoenixRuntime.getTable(conn, indexTableFullName);
- assertEquals(indexTableName+"_1", oldTable.getPhysicalName(true).getString());
-
String sql = "SELECT \":ID\", \"0:NAME\", \"0:ZIP\" FROM %s ORDER BY \":ID\"";
ResultSet rs1 = conn.createStatement().executeQuery(String.format(sql, indexTableFullName));
ResultSet rs2 = conn.createStatement().executeQuery(String.format(sql, indexTableFullName + "_1"));
@@ -439,7 +462,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
int numOfRows = 0;
- createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
String createParentViewSql = "CREATE VIEW " + parentViewName + " ( PARENT_VIEW_COL1 VARCHAR ) AS SELECT * FROM " + dataTableFullName;
conn.createStatement().execute(createParentViewSql);
@@ -508,14 +531,12 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
}
@Test
- public void testTransformVerifiedForTransactionalTable() throws Exception {
- // TODO: Column encoding is not supported for OMID. Have omid test for other type of transforms.
- // For now, we don't support transforms other than storage and column encoding type change.
- //testVerifiedForTransactionalTable("OMID");
- testVerifiedForTransactionalTable("TEPHRA");
+ public void testTransformFailedForTransactionalTable() throws Exception {
+ testTransactionalTableCannotTransform("OMID");
+ testTransactionalTableCannotTransform("TEPHRA");
}
- private void testVerifiedForTransactionalTable(String provider) throws Exception{
+ private void testTransactionalTableCannotTransform(String provider) throws Exception{
String tableOptions = tableDDLOptions + " ,TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + provider + "'";
String schemaName = generateUniqueName();
@@ -529,41 +550,17 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableOptions);
SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
- conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
- " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
- SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
- assertNotNull(record);
- assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
-
+ try {
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ } catch (SQLException ex) {
+ assertEquals(SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE.getErrorCode(), ex.getErrorCode());
+ }
+ // Even when we run TransformTool again, verified bit is not cleared but the empty column stays as is
List<String> args = getArgList(schemaName, dataTableName, null,
null, null, null, false, false, false, false, false);
- runTransformTool(args.toArray(new String[0]), 0);
- assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
-
-
- String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
- PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-
- // Run again to check that VERIFIED row still remains verified
- runTransformTool(args.toArray(new String[0]), 0);
- assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
- assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
-
- // We will have two rows with empty col = 'x' since there is no IndexRegionObserver for transactional table
- IndexToolIT.upsertRow(stmt1, ++numOfRows);
- IndexToolIT.upsertRow(stmt1, ++numOfRows);
-
- assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
- assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
- assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), EMPTY_COLUMN_VALUE_BYTES));
- // Even when we run TransformTool again, verified bit is not cleared but the empty column stays as is
- args = getArgList(schemaName, dataTableName, null,
- null, null, null, false, false, false, false, false);
- runTransformTool(args.toArray(new String[0]), 0);
- assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
- assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
- assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), EMPTY_COLUMN_VALUE_BYTES));
+ runTransformTool(args.toArray(new String[0]), -1);
}
}
@@ -577,7 +574,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
int numOfRows = 2;
- createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
@@ -647,7 +644,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
int numOfRows2= 2;
String dataTableName2 = generateUniqueName();
String dataTableFullName2 = SchemaUtil.getTableName(schemaName, dataTableName2);
- createTableAndUpsertRows(conn, dataTableFullName2, numOfRows2);
+ createTableAndUpsertRows(conn, dataTableFullName2, numOfRows2, tableDDLOptions);
conn.createStatement().execute("ALTER TABLE " + dataTableFullName2 +
" SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
record = Transform.getTransformRecord(schemaName, dataTableName2, null, null, conn.unwrap(PhoenixConnection.class));
@@ -687,7 +684,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
conn.setAutoCommit(true);
int numOfRows = 1;
int numOfRowsInNewTbl = 0;
- createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
@@ -746,6 +743,109 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
}
}
+ @Test
+ public void testTransformVerify_VerifyOnlyShouldNotChangeTransformState() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = "IDX_" + generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 1;
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON " + dataTableFullName + " (NAME) INCLUDE (ZIP)");
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexTableFullName);
+ conn.createStatement().execute("ALTER INDEX " + indexTableName + " ON " + dataTableFullName +
+ " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+
+ List<String> args = getArgList(schemaName, dataTableName, indexTableName,
+ null, null, null, false, false, false, false, false);
+ args.add("-v");
+ args.add(IndexTool.IndexVerifyType.ONLY.getValue());
+ TransformTool transformTool = runTransformTool(args.toArray(new String[0]), 0);
+ // No change
+ assertEquals(PTable.TransformStatus.CREATED.toString(), record.getTransformStatus());
+
+ // Now test data table
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, false, false, false);
+ args.add("-v");
+ args.add(IndexTool.IndexVerifyType.ONLY.getValue());
+ runTransformTool(args.toArray(new String[0]), 0);
+ assertEquals(PTable.TransformStatus.CREATED.toString(), record.getTransformStatus());
+ assertEquals(0, getRowCount(conn, record.getNewPhysicalTableName()));
+ }
+ }
+
+ @Test
+ public void testTransformVerify_ForceCutover() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = "IDX_" + generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 1;
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON " + dataTableFullName + " (NAME) INCLUDE (ZIP)");
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexTableFullName);
+ conn.createStatement().execute("ALTER INDEX " + indexTableName + " ON " + dataTableFullName +
+ " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ List<String> args = getArgList(schemaName, dataTableName, indexTableName,
+ null, null, null, false, false, false, false, false);
+ args.add("-fco");
+ runTransformTool(args.toArray(new String[0]), 0);
+
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertTransformStatusOrPartial(PTable.TransformStatus.COMPLETED, record);
+ PTable pOldIndexTable = PhoenixRuntime.getTableNoCache(conn, indexTableFullName);
+ assertEquals(SchemaUtil.getTableNameFromFullName(record.getNewPhysicalTableName()),
+ pOldIndexTable.getPhysicalName(true).getString());
+
+ // Now test data table
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, false, false, false);
+ args.add("-fco");
+ runTransformTool(args.toArray(new String[0]), 0);
+
+ record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertTransformStatusOrPartial(PTable.TransformStatus.COMPLETED, record);
+ PTable pOldTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
+ assertEquals(SchemaUtil.getTableNameFromFullName(record.getNewPhysicalTableName()),
+ pOldTable.getPhysicalName(true).getString());
+ }
+ }
+
+ public static void assertTransformStatusOrPartial(PTable.TransformStatus expectedStatus, SystemTransformRecord systemTransformRecord) {
+ if (systemTransformRecord.getTransformStatus().equals(expectedStatus.name())) {
+ return;
+ }
+
+ assertEquals(true, systemTransformRecord.getTransformType().toString().contains("PARTIAL"));
+ }
+
public static List<String> getArgList(String schemaName, String dataTable, String indxTable, String tenantId,
Long startTime, Long endTime,
boolean shouldAbort, boolean shouldPause, boolean shouldResume, boolean isPartial,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 35d69aa..c5899e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -787,6 +787,7 @@ public class FromCompiler {
tableNode.getName().equals(mutatingTableName)) {
alwaysHitServer = true;
}
+
try {
MetaDataMutationResult result = client.updateCache(tenantId, schemaName, tableName, alwaysHitServer);
timeStamp = TransactionUtil.getResolvedTimestamp(connection, result);
@@ -816,8 +817,8 @@ public class FromCompiler {
MetaDataMutationResult result = client.updateCache(schemaName, tableName);
if (result.wasUpdated()) {
timeStamp = TransactionUtil.getResolvedTimestamp(connection, result);
- theTable = result.getTable();
}
+ theTable = result.getTable();
}
if (theTable == null) {
throw new TableNotFoundException(schemaName, tableName, timeStamp);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index a5cd7b4..59961af 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -235,8 +235,6 @@ import org.apache.phoenix.schema.metrics.MetricsMetadataSource;
import org.apache.phoenix.schema.metrics.MetricsMetadataSourceFactory;
import org.apache.phoenix.schema.task.SystemTaskParams;
import org.apache.phoenix.schema.task.Task;
-import org.apache.phoenix.schema.transform.SystemTransformRecord;
-import org.apache.phoenix.schema.transform.Transform;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index 4687fe1..f5ae114 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -72,6 +72,7 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
private static Map<TaskType, String> classMap = ImmutableMap.<TaskType, String>builder()
.put(TaskType.DROP_CHILD_VIEWS, "org.apache.phoenix.coprocessor.tasks.DropChildViewsTask")
.put(TaskType.INDEX_REBUILD, "org.apache.phoenix.coprocessor.tasks.IndexRebuildTask")
+ .put(TaskType.TRANSFORM_MONITOR, "org.apache.phoenix.coprocessor.tasks.TransformMonitorTask")
.build();
public enum TaskResultCode {
@@ -186,17 +187,20 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
// if current status is already Started, check if we need to re-run.
// Task can be async and already Started before.
TaskResult result = null;
- if (taskRecord.getStatus() != null && taskRecord.getStatus().equals(PTable.TaskStatus.STARTED.toString())) {
+ if (taskRecord.getStatus() != null
+ && taskRecord.getStatus().equals(PTable.TaskStatus.STARTED.toString())) {
result = (TaskResult) checkCurretResult.invoke(obj, taskRecord);
}
- if (result == null) {
+ if (result == null || taskRecord.getStatus().equals(PTable.TaskStatus.RETRY.toString())) {
// reread task record. There might be async setting of task status
taskRecord =
Task.queryTaskTable(connForTask, taskRecord.getTimeStamp(),
taskRecord.getSchemaName(), taskRecord.getTableName(),
taskType, taskRecord.getTenantId(), null).get(0);
- if (taskRecord.getStatus() != null && !taskRecord.getStatus().equals(PTable.TaskStatus.CREATED.toString())) {
+ if (taskRecord.getStatus() != null
+ && !taskRecord.getStatus().equals(PTable.TaskStatus.CREATED.toString())
+ && !taskRecord.getStatus().equals(PTable.TaskStatus.RETRY.toString())) {
continue;
}
@@ -277,7 +281,7 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
.setPriority(taskRecord.getPriority())
.setStartTs(taskRecord.getTimeStamp())
.setEndTs(endTs)
- .setAccessCheckEnabled(true)
+ .setAccessCheckEnabled(false)
.build());
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/TransformMonitorTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/TransformMonitorTask.java
new file mode 100644
index 0000000..c71380d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/TransformMonitorTask.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.transform.TransformTool;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_MONITOR_ENABLED;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_RETRY_COUNT;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_MONITOR_ENABLED;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_RETRY_COUNT_VALUE;
+
+/**
+ * Task runs periodically to monitor and orchestrate ongoing transforms in System.Transform table.
+ *
+ */
+public class TransformMonitorTask extends BaseTask {
+ public static final String DEFAULT = "IndexName";
+
+ public static final Logger LOGGER = LoggerFactory.getLogger(TransformMonitorTask.class);
+
+ private static boolean isDisabled = false;
+
+ // Called from testong
+ @VisibleForTesting
+ public static void disableTransformMonitorTask(boolean disabled) {
+ isDisabled = disabled;
+ }
+
+ @Override
+ public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+ Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+ Configuration configuration = HBaseConfiguration.addHbaseResources(conf);
+ boolean transformMonitorEnabled = configuration.getBoolean(TRANSFORM_MONITOR_ENABLED, DEFAULT_TRANSFORM_MONITOR_ENABLED);
+ if (!transformMonitorEnabled || isDisabled) {
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "TransformMonitor is disabled");
+ }
+
+ try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)){
+ SystemTransformRecord systemTransformRecord = Transform.getTransformRecord(taskRecord.getSchemaName(),
+ taskRecord.getTableName(), null, taskRecord.getTenantId(), conn);
+ if (systemTransformRecord == null) {
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
+ "No transform record is found");
+ }
+ String tableName = SchemaUtil.getTableName(systemTransformRecord.getSchemaName(),
+ systemTransformRecord.getLogicalTableName());
+
+ if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.CREATED.name())) {
+ LOGGER.info("Transform is created, starting the TransformTool ", tableName);
+ // Kick a TransformTool run, it will already update transform record status and job id
+ TransformTool transformTool = TransformTool.runTransformTool(systemTransformRecord, conf, false, null, null, false, false);
+ if (transformTool == null) {
+ // This is not a map/reduce error. There must be some unexpected issue. So, retrying will not solve the underlying issue.
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "TransformTool run failed. Check the parameters.");
+ }
+ } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name())) {
+ LOGGER.info("Transform is completed, TransformMonitor is done ", tableName);
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, "");
+ } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PENDING_CUTOVER.name())
+ && !PTable.TransformType.isPartialTransform(systemTransformRecord.getTransformType())) {
+ LOGGER.info("Transform is pending cutover ", tableName);
+ Transform.doCutover(conn, systemTransformRecord);
+
+ PTable.TransformType partialTransform = PTable.TransformType.getPartialTransform(systemTransformRecord.getTransformType());
+ if (partialTransform != null) {
+ // Update transform to be partial
+ SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(systemTransformRecord);
+ builder.setTransformType(partialTransform);
+ // Decrement retry count since TransformTool will increment it. Should we set it to 0?
+ builder.setTransformRetryCount(systemTransformRecord.getTransformRetryCount()-1);
+ Transform.upsertTransform(builder.build(), conn);
+
+ // Fix unverified rows. Running partial transform will make the transform status go back to started
+ long startFromTs = 0;
+ if (systemTransformRecord.getTransformLastStateTs() != null) {
+ startFromTs = systemTransformRecord.getTransformLastStateTs().getTime()-1;
+ }
+ TransformTool.runTransformTool(systemTransformRecord, conf, true, startFromTs, null, true, false);
+
+ // In the future, if we are changing the PK structure, we need to run indextools as well
+ } else {
+ // No partial transform needed so, we update state of the transform
+ LOGGER.warn("No partial type of the transform is found. Completing the transform ", tableName);
+ Transform.updateTransformRecord(conn, systemTransformRecord, PTable.TransformStatus.COMPLETED);
+ }
+ } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.STARTED.name()) ||
+ (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PENDING_CUTOVER.name())
+ && PTable.TransformType.isPartialTransform(systemTransformRecord.getTransformType()))) {
+ LOGGER.info(systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.STARTED.name()) ?
+ "Transform is started, we will monitor ": "Partial transform is going on, we will monitor" , tableName);
+ // Monitor the job of transform tool and decide to retry
+ String jobId = systemTransformRecord.getTransformJobId();
+ if (jobId != null) {
+ Cluster cluster = new Cluster(configuration);
+
+ Job job = cluster.getJob(org.apache.hadoop.mapreduce.JobID.forName(jobId));
+ if (job == null) {
+ LOGGER.warn(String.format("Transform job with Id=%s is not found", jobId));
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SKIPPED, "The job cannot be found");
+ }
+ if (job != null && job.isComplete()) {
+ if (job.isSuccessful()) {
+ LOGGER.warn("TransformTool job is successful. Transform should have been in a COMPLETED state "
+ + taskRecord.getTableName());
+ } else {
+ // Retry TransformTool run
+ int maxRetryCount = configuration.getInt(TRANSFORM_RETRY_COUNT_VALUE, DEFAULT_TRANSFORM_RETRY_COUNT);
+ if (systemTransformRecord.getTransformRetryCount() < maxRetryCount) {
+ // Retry count will be incremented in TransformTool
+ TransformTool.runTransformTool(systemTransformRecord, conf, false, null, null, false, true);
+ }
+ }
+ }
+ }
+ } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.FAILED.name())) {
+ String str = "Transform is marked as failed because either TransformTool is run on the foreground and failed " +
+ "or it is run as async but there is something wrong with the TransformTool parameters";
+ LOGGER.error(str);
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str);
+ } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PAUSED.name())) {
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS,
+ "Transform is paused. No need to monitor");
+ } else {
+ String str = "Transform status is not known " + systemTransformRecord.getString();
+ LOGGER.error(str);
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str);
+ }
+
+ // Update task status to RETRY so that it is retried
+ Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(conn)
+ .setTaskType(taskRecord.getTaskType())
+ .setTenantId(taskRecord.getTenantId())
+ .setSchemaName(taskRecord.getSchemaName())
+ .setTableName(taskRecord.getTableName())
+ .setTaskStatus(PTable.TaskStatus.RETRY.toString())
+ .setData(taskRecord.getData())
+ .setPriority(taskRecord.getPriority())
+ .setStartTs(taskRecord.getTimeStamp())
+ .setEndTs(null)
+ .setAccessCheckEnabled(true)
+ .build());
+ return null;
+ }
+ catch (Throwable t) {
+ LOGGER.warn("Exception while running transform monitor task. " +
+ "It will be retried in the next system task table scan : " +
+ taskRecord.getSchemaName() + "." + taskRecord.getTableName() +
+ " with tenant id " + (taskRecord.getTenantId() == null ? " IS NULL" : taskRecord.getTenantId()) +
+ " and data " + taskRecord.getData(), t);
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, t.toString());
+ }
+ }
+
+ public static void addTransformMonitorTask(PhoenixConnection connection, Configuration configuration, SystemTransformRecord systemTransformRecord,
+ PTable.TaskStatus taskStatus, Timestamp startTimestamp, Timestamp endTimestamp) throws IOException {
+ boolean transformMonitorEnabled = configuration.getBoolean(TRANSFORM_MONITOR_ENABLED, DEFAULT_TRANSFORM_MONITOR_ENABLED);
+ if (!transformMonitorEnabled) {
+ LOGGER.warn("TransformMonitor is not enabled. Monitoring/retrying TransformTool and doing cutover will not be done automatically");
+ return;
+ }
+
+ Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(connection)
+ .setTaskType(PTable.TaskType.TRANSFORM_MONITOR)
+ .setTenantId(systemTransformRecord.getTenantId())
+ .setSchemaName(systemTransformRecord.getSchemaName())
+ .setTableName(systemTransformRecord.getLogicalTableName())
+ .setTaskStatus(taskStatus.toString())
+ .setStartTs(startTimestamp)
+ .setEndTs(endTimestamp)
+ .setAccessCheckEnabled(true)
+ .build());
+ }
+
+ @Override
+ public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord)
+ throws Exception {
+ // We don't need to check MR job result here since the job itself changes task state.
+ return null;
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 72303a3..f65db43 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -566,7 +566,14 @@ public enum SQLExceptionCode {
CANNOT_TRANSFORM_ALREADY_TRANSFORMING_TABLE(910, "43M21",
"Cannot transform an index or a table who is already going through a transform."),
- CANNOT_TRANSFORM_VIEW_INDEX(911, "43M22", "Cannot transform a view index. Consider creating a new view index.");
+
+ CANNOT_TRANSFORM_LOCAL_OR_VIEW_INDEX(911, "43M22", "Cannot transform a view index or a local index. For view index, consider creating a new view index."),
+
+ CANNOT_TRANSFORM_TABLE_WITH_LOCAL_INDEX(912, "43M23", "Cannot transform a table with a local index."),
+
+ CANNOT_TRANSFORM_TABLE_WITH_APPEND_ONLY_SCHEMA(913, "43M24", "Cannot transform a table with append-only schema."),
+
+ CANNOT_TRANSFORM_TRANSACTIONAL_TABLE(914, "43M25", "Cannot transform a transactional table.");
private final int errorCode;
private final String sqlState;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java
index cfa027f..b6c41b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java
@@ -25,9 +25,13 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.visitor.ExpressionVisitor;
import org.apache.phoenix.schema.PDatum;
import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE;
/**
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index fbb5182..a465d98 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -1058,15 +1058,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey, ts);
return buildUpdateMutation(kvBuilder, valueGetter, dataRowKeyPtr, ts, regionStartKey, regionEndKey,
indexRowKey, this.getEmptyKeyValueFamily(), coveredColumnsMap,
- indexEmptyKeyValueRef, indexWALDisabled, dataImmutableStorageScheme, immutableStorageScheme, encodingScheme, verified);
+ indexEmptyKeyValueRef, indexWALDisabled, dataImmutableStorageScheme, immutableStorageScheme, encodingScheme, dataEncodingScheme, verified);
}
public static Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts,
byte[] regionStartKey, byte[] regionEndKey, byte[] destRowKey, ImmutableBytesPtr emptyKeyValueCFPtr,
Map<ColumnReference, ColumnReference> coveredColumnsMap,
ColumnReference destEmptyKeyValueRef, boolean destWALDisabled,
- ImmutableStorageScheme srcImmutableStroageScheme, ImmutableStorageScheme destImmutableStorageScheme,
- QualifierEncodingScheme destEncodingScheme, boolean verified) throws IOException {
+ ImmutableStorageScheme srcImmutableStorageScheme, ImmutableStorageScheme destImmutableStorageScheme,
+ QualifierEncodingScheme destEncodingScheme, QualifierEncodingScheme srcEncodingScheme, boolean verified) throws IOException {
Set<ColumnReference> coveredColumns = coveredColumnsMap.keySet();
Put put = null;
// New row being inserted: add the empty key value
@@ -1115,7 +1115,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
ColumnReference indexColRef = colRefPair.getFirst();
ColumnReference dataColRef = colRefPair.getSecond();
byte[] value = null;
- if (srcImmutableStroageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ if (srcImmutableStorageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
Expression expression = new SingleCellColumnExpression(new PDatum() {
@Override public boolean isNullable() {
return false;
@@ -1168,17 +1168,81 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr));
}
} else {
- for (ColumnReference ref : coveredColumns) {
- ColumnReference indexColRef = coveredColumnsMap.get(ref);
- ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
- ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
- ImmutableBytesWritable value = valueGetter.getLatestValue(ref, ts);
- if (value != null && value != ValueGetter.HIDDEN_BY_DELETE) {
- if (put == null) {
- put = new Put(destRowKey);
- put.setDurability(!destWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+ if (srcImmutableStorageScheme == destImmutableStorageScheme) { //both ONE_CELL
+ for (ColumnReference ref : coveredColumns) {
+ ColumnReference indexColRef = coveredColumnsMap.get(ref);
+ ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
+ ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
+ ImmutableBytesWritable value = valueGetter.getLatestValue(ref, ts);
+ if (value != null && value != ValueGetter.HIDDEN_BY_DELETE) {
+ if (put == null) {
+ put = new Put(destRowKey);
+ put.setDurability(!destWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+ }
+ put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value));
+ }
+ }
+ } else {
+ // Src is SINGLE_CELL, destination is ONE_CELL
+ Map<ImmutableBytesPtr, List<Pair<ColumnReference, ColumnReference>>> familyToColListMap = Maps.newHashMap();
+ for (ColumnReference ref : coveredColumns) {
+ ColumnReference indexColRef = coveredColumnsMap.get(ref);
+ ImmutableBytesPtr cf = new ImmutableBytesPtr(indexColRef.getFamily());
+ if (!familyToColListMap.containsKey(cf)) {
+ familyToColListMap.put(cf, Lists.<Pair<ColumnReference, ColumnReference>>newArrayList());
+ }
+ familyToColListMap.get(cf).add(Pair.newPair(indexColRef, ref));
+ }
+ // iterate over each column family and create a byte[] containing all the columns
+ for (Entry<ImmutableBytesPtr, List<Pair<ColumnReference, ColumnReference>>> entry : familyToColListMap.entrySet()) {
+ byte[] columnFamily = entry.getKey().copyBytesIfNecessary();
+ List<Pair<ColumnReference, ColumnReference>> colRefPairs = entry.getValue();
+ int maxEncodedColumnQualifier = Integer.MIN_VALUE;
+ // find the max col qualifier
+ for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) {
+ maxEncodedColumnQualifier = Math.max(maxEncodedColumnQualifier, srcEncodingScheme.decode(colRefPair.getSecond().getQualifier()));
+ }
+ // set the values of the columns
+ for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) {
+ ColumnReference indexColRef = colRefPair.getFirst();
+ ColumnReference dataColRef = colRefPair.getSecond();
+ byte[] valueBytes = null;
+ Expression expression = new SingleCellColumnExpression(new PDatum() {
+ @Override public boolean isNullable() {
+ return false;
+ }
+
+ @Override public SortOrder getSortOrder() {
+ return null;
+ }
+
+ @Override public Integer getScale() {
+ return null;
+ }
+
+ @Override public Integer getMaxLength() {
+ return null;
+ }
+
+ @Override public PDataType getDataType() {
+ return null;
+ }
+ }, dataColRef.getFamily(), dataColRef.getQualifier(), srcEncodingScheme,
+ srcImmutableStorageScheme);
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+ expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr);
+ valueBytes = ptr.copyBytesIfNecessary();
+
+ if (valueBytes != null) {
+ ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
+ ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
+ if (put == null) {
+ put = new Put(destRowKey);
+ put.setDurability(!destWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+ }
+ put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, new ImmutableBytesWritable(valueBytes)));
+ }
}
- put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value));
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index d74fe6d..5c8317d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -635,7 +635,12 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
}
public PTable getTable(PTableKey key) throws TableNotFoundException {
- return metaData.getTableRef(key).getTable();
+ PTable table = metaData.getTableRef(key).getTable();
+ // Force TableNotFoundException for the table that is going through transform
+ if (table.getTransformingNewTable() != null) {
+ throw new TableNotFoundException("Re-read the transforming table", true);
+ }
+ return table;
}
public PTableRef getTableRef(PTableKey key) throws TableNotFoundException {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 0e4bc77..28e819b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -221,7 +221,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final String TRANSFORM_JOB_ID = "JOB_ID";
public static final String TRANSFORM_RETRY_COUNT = "RETRY_COUNT";
public static final String TRANSFORM_START_TS = "START_TS";
- public static final String TRANSFORM_END_TS = "END_TS";
+ public static final String TRANSFORM_LAST_STATE_TS = "END_TS";
public static final String OLD_METADATA = "OLD_METADATA";
public static final String NEW_METADATA = "NEW_METADATA";
public static final String TRANSFORM_FUNCTION = "TRANSFORM_FUNCTION";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
index a0cd037..e5403c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
@@ -18,6 +18,8 @@
package org.apache.phoenix.mapreduce.index;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -37,6 +39,8 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.util.ByteUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
@@ -47,6 +51,7 @@ import java.util.List;
public class IndexVerificationOutputRepository implements AutoCloseable {
public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|");
+ private static final Logger LOGGER = LoggerFactory.getLogger(IndexVerificationOutputRepository.class);
private Table indexTable;
private byte[] indexName;
@@ -184,7 +189,11 @@ public class IndexVerificationOutputRepository implements AutoCloseable {
TableDescriptor tableDescriptor = TableDescriptorBuilder
.newBuilder(TableName.valueOf(OUTPUT_TABLE_NAME))
.setColumnFamily(columnDescriptor).build();
- admin.createTable(tableDescriptor);
+ try {
+ admin.createTable(tableDescriptor);
+ } catch (TableExistsException e) {
+ LOGGER.warn("Table exists, ignoring", e);
+ }
outputTable = admin.getConnection().getTable(outputTableName);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
index 9ff5ed0..7c5c92b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
@@ -18,6 +18,8 @@
package org.apache.phoenix.mapreduce.index;
import org.apache.hadoop.hbase.Cell;
+
+import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -40,12 +42,15 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.util.ByteUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
public class IndexVerificationResultRepository implements AutoCloseable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IndexVerificationResultRepository.class);
public static final String RUN_STATUS_SKIPPED = "Skipped";
public static final String RUN_STATUS_EXECUTED = "Executed";
@@ -170,7 +175,11 @@ public class IndexVerificationResultRepository implements AutoCloseable {
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(resultTableName)
.setColumnFamily(columnDescriptor).build();
- admin.createTable(tableDescriptor);
+ try {
+ admin.createTable(tableDescriptor);
+ } catch (TableExistsException e) {
+ LOGGER.warn("Table exists, ignoring", e);
+ }
resultTable = admin.getConnection().getTable(resultTableName);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 3f5462c..ed01173 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.mapreduce.transform.TransformTool;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.schema.PIndexState;
@@ -167,6 +166,10 @@ public class PhoenixIndexImportDirectReducer extends
try {
Transform.completeTransform(ConnectionUtil
.getInputConnection(context.getConfiguration()), context.getConfiguration());
+ if (PhoenixConfigurationUtil.getForceCutover(context.getConfiguration())) {
+ Transform.doForceCutover(ConnectionUtil
+ .getInputConnection(context.getConfiguration()), context.getConfiguration());
+ }
} catch (Exception e) {
LOGGER.error(" Failed to complete transform", e);
throw new RuntimeException(e.getMessage());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
index 6b5b217..bbabe88 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
@@ -19,21 +19,16 @@ package org.apache.phoenix.mapreduce.transform;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Reducer;
import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.mapreduce.index.IndexToolUtil;
import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectReducer;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.transform.Transform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
-import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
@@ -65,6 +60,9 @@ public class PhoenixTransformReducer extends
connection = ConnectionUtil.getInputConnection(context.getConfiguration())) {
// Complete full Transform and add a partial transform
Transform.completeTransform(connection, context.getConfiguration());
+ if (PhoenixConfigurationUtil.getForceCutover(context.getConfiguration())) {
+ Transform.doForceCutover(connection, context.getConfiguration());
+ }
} catch (Exception e) {
LOGGER.error(" Failed to complete transform", e);
throw new RuntimeException(e.getMessage());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
index b8829b6..1159e73 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -65,6 +66,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.task.Task;
import org.apache.phoenix.schema.transform.SystemTransformRecord;
import org.apache.phoenix.schema.transform.Transform;
import org.apache.phoenix.schema.transform.TransformMaintainer;
@@ -92,6 +94,7 @@ import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
import static org.apache.phoenix.mapreduce.index.IndexTool.createIndexToolTables;
@@ -121,6 +124,9 @@ public class TransformTool extends Configured implements Tool {
private static final Option FIX_UNVERIFIED_TRANSFORM_OPTION = new Option("fu", "fix-unverified", false,
"To fix unverified transform records");
+ private static final Option FORCE_CUTOVER_OPTION = new Option("fco", "force-cutover", false,
+ "Updated to old table to point to new table. New table will be active and reads will start serving from the new table");
+
private static final Option USE_NEW_TABLE_AS_SOURCE_OPTION =
new Option("fn", "from-new", false,
"To verify every row in the new table has a corresponding row in the old table. ");
@@ -170,10 +176,12 @@ public class TransformTool extends Configured implements Tool {
public static final String PARTIAL_TRANSFORM_NOT_APPLICABLE = "Partial transform accepts "
+ "non-zero ts set in the past as start-time(st) option and that ts must be present in SYSTEM.TRANSFORM table";
- public static final String TRANSFORM_NOT_APPLICABLE = "Transform is not applicable for local indexes or views";
+ public static final String TRANSFORM_NOT_APPLICABLE = "Transform is not applicable for local indexes or views or transactional tables";
public static final String PARTIAL_TRANSFORM_NOT_COMPATIBLE = "Can't abort/pause/resume/split during partial transform";
+ public static final String FORCE_CUTOVER_NOT_COMPATIBLE = "Force cutover is not applicable with the other parameters";
+
private static final Option VERIFY_OPTION = new Option("v", "verify", true,
"To verify every data row in the old table has a corresponding row in the new table. " +
"The accepted values are NONE, ONLY, BEFORE, AFTER, and BOTH. " +
@@ -210,6 +218,7 @@ public class TransformTool extends Configured implements Tool {
private boolean isPartialTransform;
private boolean shouldFixUnverified;
private boolean shouldUseNewTableAsSource;
+ private boolean shouldForceCutover;
private Job job;
public Long getStartTime() {
@@ -257,6 +266,7 @@ public class TransformTool extends Configured implements Tool {
options.addOption(START_TIME_OPTION);
options.addOption(END_TIME_OPTION);
options.addOption(FIX_UNVERIFIED_TRANSFORM_OPTION);
+ options.addOption(FORCE_CUTOVER_OPTION);
options.addOption(USE_NEW_TABLE_AS_SOURCE_OPTION);
options.addOption(AUTO_SPLIT_OPTION);
options.addOption(ABORT_TRANSFORM_OPTION);
@@ -302,8 +312,13 @@ public class TransformTool extends Configured implements Tool {
boolean useEndTime = cmdLine.hasOption(END_TIME_OPTION.getOpt());
shouldFixUnverified = cmdLine.hasOption(FIX_UNVERIFIED_TRANSFORM_OPTION.getOpt());
shouldUseNewTableAsSource = cmdLine.hasOption(USE_NEW_TABLE_AS_SOURCE_OPTION.getOpt());
+ shouldForceCutover = cmdLine.hasOption(FORCE_CUTOVER_OPTION.getOpt());
basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
isPartialTransform = cmdLine.hasOption(PARTIAL_TRANSFORM_OPTION.getOpt());
+ if (shouldForceCutover) {
+ LOGGER.info("TransformTool will fix the unverified rows before cutover");
+ shouldFixUnverified = true;
+ }
if (useStartTime) {
startTime = new Long(cmdLine.getOptionValue(START_TIME_OPTION.getOpt()));
}
@@ -325,6 +340,22 @@ public class TransformTool extends Configured implements Tool {
|| cmdLine.hasOption(RESUME_TRANSFORM_OPTION.getOpt()))) {
throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
}
+ if (shouldForceCutover && (isPartialTransform || useStartTime || useEndTime || shouldUseNewTableAsSource
+ || cmdLine.hasOption(AUTO_SPLIT_OPTION.getOpt()))) {
+ throw new IllegalArgumentException(FORCE_CUTOVER_NOT_COMPATIBLE);
+ }
+
+ schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+ dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+ indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+ qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+ isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+ logicalTableName = dataTable;
+ logicalParentName = null;
+ if (!Strings.isNullOrEmpty(indexTable)) {
+ logicalTableName = indexTable;
+ logicalParentName = SchemaUtil.getTableName(schemaName, dataTable);
+ }
if (isPartialTransform) {
if (!cmdLine.hasOption(START_TIME_OPTION.getOpt())) {
@@ -336,24 +367,12 @@ public class TransformTool extends Configured implements Tool {
throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_APPLICABLE);
}
if (lastTransformTime == null) {
- lastTransformTime = transformRecord.getTransformEndTs().getTime();
+ lastTransformTime = transformRecord.getTransformLastStateTs().getTime();
} else {
validateLastTransformTime();
}
}
- schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
- dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
- indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
- qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
- isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
- logicalTableName = dataTable;
- logicalParentName = null;
- if (!Strings.isNullOrEmpty(indexTable)) {
- logicalTableName = indexTable;
- logicalParentName = SchemaUtil.getTableName(schemaName, dataTable);
- }
-
pDataTable = PhoenixRuntime.getTable(
connection, SchemaUtil.getQualifiedTableName(schemaName, dataTable));
if (indexTable != null) {
@@ -392,6 +411,10 @@ public class TransformTool extends Configured implements Tool {
throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
}
+ if (argPDataTable.isTransactional()) {
+ throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
+ }
+
if (transformRecord == null){
throw new IllegalStateException("ALTER statement has not been run and the transform has not been created for this table");
}
@@ -463,24 +486,27 @@ public class TransformTool extends Configured implements Tool {
}
public Job configureJob() throws Exception {
- String jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable, (shouldFixUnverified ? "Unverified" : "Full"));
- if (shouldUseNewTableAsSource) {
- jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable, "NewTableSource_" + pNewTable.getName());
- }
if (pNewTable.isTransactional()) {
configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
- Long.toString(TransactionUtil.convertToNanoseconds(pOldTable.getTimeStamp()+1)));
- configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pDataTable.getTransactionProvider().name());
- }
- if (lastTransformTime != null) {
- PhoenixConfigurationUtil.setCurrentScnValue(configuration, lastTransformTime);
+ Long.toString(TransactionUtil.convertToNanoseconds(pOldTable.getTimeStamp() + 1)));
+ configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pNewTable.getTransactionProvider().name());
} else {
- if (endTime != null) {
- PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+ if (lastTransformTime != null) {
+ PhoenixConfigurationUtil.setCurrentScnValue(configuration, lastTransformTime);
} else {
- setCurrentScnValue(configuration, EnvironmentEdgeManager.currentTimeMillis());
+ if (endTime != null) {
+ PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+ } else {
+ setCurrentScnValue(configuration, EnvironmentEdgeManager.currentTimeMillis());
+ }
}
}
+ String jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable==null?null:pNewTable.getName(),
+ (shouldFixUnverified?"Unverified":"Full"));
+ if (shouldUseNewTableAsSource) {
+ jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable==null?null:pNewTable.getName(),
+ "NewTableSource_"+pNewTable.getName());
+ }
final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
final PostIndexDDLCompiler ddlCompiler =
@@ -553,6 +579,7 @@ public class TransformTool extends Configured implements Tool {
job.setPriority(this.jobPriority);
PhoenixMapReduceUtil.setInput(job, PhoenixServerBuildIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class,
oldTableWithSchema, "");
+
if (outputPath != null) {
FileOutputFormat.setOutputPath(job, outputPath);
}
@@ -623,7 +650,7 @@ public class TransformTool extends Configured implements Tool {
return 0;
}
} catch (Exception e) {
- LOGGER.error("Caught exception " + e + " trying to run TransformTool.");
+ LOGGER.error("Caught exception " + e + " trying to run TransformTool.", e);
return 1;
}
}
@@ -696,23 +723,20 @@ public class TransformTool extends Configured implements Tool {
}
public void updateTransformRecord(PhoenixConnection connection, PTable.TransformStatus newStatus) throws Exception {
- SystemTransformRecord transformRecord = getTransformRecord(connection);
- updateTransformRecord(connection, transformRecord, newStatus);
- }
-
- public static void updateTransformRecord(PhoenixConnection connection, SystemTransformRecord transformRecord, PTable.TransformStatus newStatus) throws Exception {
- SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
- builder.setTransformStatus(newStatus.name());
- if (newStatus == PTable.TransformStatus.COMPLETED || newStatus == PTable.TransformStatus.FAILED) {
- builder.setEndTs(new Timestamp(EnvironmentEdgeManager.currentTimeMillis()));
+ if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+ return;
}
- Transform.upsertTransform(builder.build(), connection.unwrap(PhoenixConnection.class));
+ SystemTransformRecord transformRecord = getTransformRecord(connection);
+ Transform.updateTransformRecord(connection, transformRecord, newStatus);
}
protected void updateTransformRecord(Job job) throws Exception {
if (job == null) {
return;
}
+ if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+ return;
+ }
SystemTransformRecord transformRecord = getTransformRecord(connection.unwrap(PhoenixConnection.class));
SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
builder.setTransformJobId(job.getJobID().toString());
@@ -775,22 +799,48 @@ public class TransformTool extends Configured implements Tool {
}
runTransform(args, cmdLine);
+
+ // Check if we already have a TransformMonitor task. If we do, remove those and start a new monitor
+ List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(connection, null);
+ for (Task.TaskRecord taskRecord : taskRecordList) {
+ if (taskRecord.isMatchingTask(transformRecord)) {
+ Task.deleteTask(connection.unwrap(PhoenixConnection.class), PTable.TaskType.TRANSFORM_MONITOR, taskRecord.getTimeStamp(), taskRecord.getTenantId(),
+ taskRecord.getSchemaName(), taskRecord.getTableName(), configuration.getBoolean(QueryServices.PHOENIX_ACLS_ENABLED,
+ QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED));
+ }
+ }
+
+ // start TransformMonitor
+ TransformMonitorTask.addTransformMonitorTask(connection.unwrap(PhoenixConnection.class), configuration, transformRecord,
+ PTable.TaskStatus.CREATED, new Timestamp(EnvironmentEdgeManager.currentTimeMillis()), null);
+
}
public int runTransform(String[] args, CommandLine cmdLine) throws Exception {
int status = 0;
+
updateTransformRecord(connection.unwrap(PhoenixConnection.class), PTable.TransformStatus.STARTED);
PhoenixConfigurationUtil.setIsPartialTransform(configuration, isPartialTransform);
PhoenixConfigurationUtil.setIsTransforming(configuration, true);
+ PhoenixConfigurationUtil.setForceCutover(configuration, shouldForceCutover);
if (!Strings.isNullOrEmpty(indexTable)) {
PhoenixConfigurationUtil.setTransformingTableType(configuration, IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE);
// Index table transform. Build the index
IndexTool indexTool = new IndexTool();
indexTool.setConf(configuration);
+ if (shouldForceCutover) {
+ List<String> argsList = new ArrayList<String>(Arrays.asList(args));
+ // Remove from cmdLine so that indexTool will not throw error
+ argsList.remove("-"+FORCE_CUTOVER_OPTION.getOpt());
+ argsList.remove("--"+FORCE_CUTOVER_OPTION.getLongOpt());
+ args = argsList.toArray(new String[0]);
+ }
status = indexTool.run(args);
Job job = indexTool.getJob();
- updateTransformRecord(job);
+ if (status == 0) {
+ updateTransformRecord(job);
+ }
} else {
PhoenixConfigurationUtil.setTransformingTableType(configuration, IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
if (!isPartialTransform) {
@@ -798,7 +848,9 @@ public class TransformTool extends Configured implements Tool {
}
configureJob();
status = runJob();
- updateTransformRecord(this.job);
+ if (status == 0) {
+ updateTransformRecord(this.job);
+ }
}
// Record status
@@ -855,4 +907,68 @@ public class TransformTool extends Configured implements Tool {
int result = ToolRunner.run(new TransformTool(), args);
System.exit(result);
}
+
+
+ public static TransformTool runTransformTool(SystemTransformRecord systemTransformRecord, Configuration configuration,
+ boolean isPartial, Long startTime, Long endTime, boolean shouldFixUnverified, boolean doValidation) throws Exception {
+ List<String> args = Lists.newArrayList();
+ if (!Strings.isNullOrEmpty(systemTransformRecord.getSchemaName())) {
+ args.add("--schema=" + systemTransformRecord.getSchemaName());
+ }
+ String oldTableName = systemTransformRecord.getLogicalTableName();
+ boolean isIndex = false;
+ if (!Strings.isNullOrEmpty(systemTransformRecord.getLogicalParentName())) {
+ isIndex = true;
+ args.add("--index-table=" + oldTableName);
+ args.add("--data-table=" + SchemaUtil.getTableNameFromFullName(systemTransformRecord.getLogicalParentName()));
+ } else {
+ args.add("--data-table=" + oldTableName);
+ }
+
+ args.add("-op");
+ args.add("/tmp/" + UUID.randomUUID().toString());
+
+ if (!Strings.isNullOrEmpty(systemTransformRecord.getTenantId())) {
+ args.add("-tenant");
+ args.add(systemTransformRecord.getTenantId());
+ }
+ if(startTime != null) {
+ args.add("-st");
+ args.add(String.valueOf(startTime));
+ }
+ if(endTime != null) {
+ args.add("-et");
+ args.add(String.valueOf(endTime));
+ }
+ if (isPartial) {
+ if (isIndex) {
+ // args.add("-pr");
+ } else {
+ args.add("-pt");
+ }
+ }
+ if (shouldFixUnverified) {
+ if (!isIndex) {
+ args.add("-fu");
+ }
+ }
+
+ if (doValidation) {
+ args.add("-v");
+ args.add(IndexTool.IndexVerifyType.ONLY.getValue());
+ }
+ String[] cmdArgs = args.toArray(new String[0]);
+ TransformTool tt = new TransformTool();
+ Configuration conf = new Configuration(configuration);
+ tt.setConf(conf);
+
+ LOGGER.info("Running TransformTool with {}", Arrays.toString(cmdArgs), new Exception("Stack Trace"));
+ int status = tt.run(cmdArgs);
+ LOGGER.info("TransformTool with {} status is ", Arrays.toString(cmdArgs), status);
+ if (status != 0) {
+ return null;
+ }
+ return tt;
+ }
+
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 85183cf..f3a3fd8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -97,7 +97,15 @@ public final class PhoenixConfigurationUtil {
/** For local indexes which are stored in a single separate physical table*/
public static final String PHYSICAL_TABLE_NAME = "phoenix.output.table.name" ;
-
+
+ public static final String TRANSFORM_RETRY_COUNT_VALUE = "phoenix.transform.retry.count";
+
+ public static final int DEFAULT_TRANSFORM_RETRY_COUNT = 50;
+
+ public static final String TRANSFORM_MONITOR_ENABLED = "phoenix.transform.monitor.enabled";
+
+ public static final boolean DEFAULT_TRANSFORM_MONITOR_ENABLED = true;
+
public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
public static final String INPUT_CLASS = "phoenix.input.class";
@@ -209,6 +217,9 @@ public final class PhoenixConfigurationUtil {
// Is the mapreduce used for table/index transform
public static final String IS_TRANSFORMING_VALUE = "phoenix.mr.istransforming";
+ // Is force transform cutover
+ public static final String FORCE_CUTOVER_VALUE = "phoenix.mr.force.cutover";
+
// Is the mapreduce used for table/index transform
public static final String TRANSFORMING_TABLE_TYPE = "phoenix.mr.transform.tabletype";
@@ -633,6 +644,17 @@ public final class PhoenixConfigurationUtil {
return Boolean.valueOf(configuration.get(IS_TRANSFORMING_VALUE, "false"));
}
+ public static void setForceCutover(Configuration configuration, Boolean forceCutover) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(forceCutover);
+ configuration.set(FORCE_CUTOVER_VALUE, Boolean.toString(forceCutover));
+ }
+
+ public static Boolean getForceCutover(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ return Boolean.valueOf(configuration.get(FORCE_CUTOVER_VALUE, "false"));
+ }
+
public static void setTransformingTableType(Configuration configuration,
SourceTable sourceTable) {
Preconditions.checkNotNull(configuration);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f340deb..ea25cff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4546,8 +4546,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// No tables exist matching "SYSTEM\..*", they are all already in "SYSTEM:.*"
if (tableNames.size() == 0) { return; }
// Try to move any remaining tables matching "SYSTEM\..*" into "SYSTEM:"
- if (tableNames.size() > 8) {
- LOGGER.warn("Expected 8 system tables but found " + tableNames.size() + ":" + tableNames);
+ if (tableNames.size() > 9) {
+ LOGGER.warn("Expected 9 system tables but found " + tableNames.size() + ":" + tableNames);
}
byte[] mappedSystemTable = SchemaUtil
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 8f11d41..cf203da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -157,7 +157,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TASK_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_END_TS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_LAST_STATE_TS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_FUNCTION;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_JOB_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT;
@@ -562,20 +562,20 @@ public interface QueryConstants {
TENANT_ID + " VARCHAR NULL,\n" +
TABLE_SCHEM + " VARCHAR NULL," +
LOGICAL_TABLE_NAME + " VARCHAR NOT NULL,\n" +
- NEW_PHYS_TABLE_NAME + " VARCHAR NOT NULL,\n" +
- TRANSFORM_TYPE + " INTEGER NOT NULL," +
// Non-PK columns
+ NEW_PHYS_TABLE_NAME + " VARCHAR,\n" +
+ TRANSFORM_TYPE + " INTEGER," +
LOGICAL_PARENT_NAME + " VARCHAR NULL,\n" + // If this is an index, Logical_Parent_Name is the data table name. Index name is not unique.
TRANSFORM_STATUS + " VARCHAR NULL," +
TRANSFORM_JOB_ID + " VARCHAR NULL," +
TRANSFORM_RETRY_COUNT + " INTEGER NULL," +
TRANSFORM_START_TS + " TIMESTAMP NULL," +
- TRANSFORM_END_TS + " TIMESTAMP NULL," +
+ TRANSFORM_LAST_STATE_TS + " TIMESTAMP NULL," +
OLD_METADATA + " VARCHAR NULL,\n" +
NEW_METADATA + " VARCHAR NULL,\n" +
TRANSFORM_FUNCTION + " VARCHAR NULL\n" +
"CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
- TENANT_ID + "," + TABLE_SCHEM + "," + LOGICAL_TABLE_NAME + "," + NEW_PHYS_TABLE_NAME + "," + TRANSFORM_TYPE + "))\n" +
+ TENANT_ID + "," + TABLE_SCHEM + "," + LOGICAL_TABLE_NAME + "))\n" +
HConstants.VERSIONS + "=%s,\n" +
ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
ColumnFamilyDescriptorBuilder.TTL + "=" + TRANSFORM_TABLE_TTL + ",\n" + // 10 days
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java
index 90451af..2a21a29 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java
@@ -81,9 +81,18 @@ public class ColumnMetaDataOps {
IS_ROW_TIMESTAMP +
") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ public static void addColumnMutation(PhoenixConnection connection, String tenantId, String schemaName, String tableName, PColumn column, String parentTableName, String pkName, Short keySeq, boolean isSalted) throws SQLException {
+ addColumnMutationInternal(connection, tenantId, schemaName, tableName, column, parentTableName, pkName, keySeq, isSalted);
+ }
+
public static void addColumnMutation(PhoenixConnection connection, String schemaName, String tableName, PColumn column, String parentTableName, String pkName, Short keySeq, boolean isSalted) throws SQLException {
+ addColumnMutationInternal(connection, connection.getTenantId() == null ? null : connection.getTenantId().getString()
+ , schemaName, tableName, column, parentTableName, pkName, keySeq, isSalted);
+ }
+
+ private static void addColumnMutationInternal(PhoenixConnection connection, String tenantId, String schemaName, String tableName, PColumn column, String parentTableName, String pkName, Short keySeq, boolean isSalted) throws SQLException {
try (PreparedStatement colUpsert = connection.prepareStatement(UPSERT_COLUMN)) {
- colUpsert.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
+ colUpsert.setString(1, tenantId);
colUpsert.setString(2, schemaName);
colUpsert.setString(3, tableName);
colUpsert.setString(4, column.getName().getString());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/IndexNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/IndexNotFoundException.java
index f7f0727..953933e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/IndexNotFoundException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/IndexNotFoundException.java
@@ -38,6 +38,6 @@ public class IndexNotFoundException extends TableNotFoundException {
}
public IndexNotFoundException(String schemaName, String tableName, long timestamp) {
- super(schemaName, tableName, timestamp, code);
+ super(schemaName, tableName, timestamp, code, false);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index cd9d052..c696bb4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -25,6 +25,9 @@ import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLi
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB;
import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.INDEX_NAME;
import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.REBUILD_ALL;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_LOCAL_OR_VIEW_INDEX;
+import static org.apache.phoenix.exception.SQLExceptionCode.INSUFFICIENT_MULTI_TENANT_COLUMNS;
+import static org.apache.phoenix.exception.SQLExceptionCode.PARENT_TABLE_NOT_FOUND;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
@@ -2398,11 +2401,15 @@ public class MetaDataClient {
linkStatement.setString(4, physicalName.getString());
linkStatement.setByte(5, LinkType.PHYSICAL_TABLE.getSerializedValue());
if (tableType == PTableType.VIEW) {
- PTable logicalTable = connection.getTable(new PTableKey(null, physicalName.getString()
- .replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR)));
+ if (parent.getType() == PTableType.TABLE) {
+ linkStatement.setString(4, SchemaUtil.getTableName(parent.getSchemaName().getString(),parent.getTableName().getString()));
+ linkStatement.setLong(6, parent.getSequenceNumber());
+ } else { //This is a grandchild view, find the physical base table
+ PTable logicalTable = connection.getTable(new PTableKey(null, SchemaUtil.replaceNamespaceSeparator(physicalName)));
+ linkStatement.setString(4, SchemaUtil.getTableName(logicalTable.getSchemaName().getString(),logicalTable.getTableName().getString()));
+ linkStatement.setLong(6, logicalTable.getSequenceNumber());
+ }
// Set link to logical name
- linkStatement.setString(4, SchemaUtil.getTableName(logicalTable.getSchemaName().getString(),logicalTable.getTableName().getString()));
- linkStatement.setLong(6, logicalTable.getSequenceNumber());
linkStatement.setString(7, null);
} else {
linkStatement.setLong(6, parent.getSequenceNumber());
@@ -3367,7 +3374,7 @@ public class MetaDataClient {
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
}
} catch (TableNotFoundException e) {
- if (!ifExists) {
+ if (!ifExists && !e.isThrownToForceReReadForTransformingTable()) {
if (tableType == PTableType.INDEX)
throw new IndexNotFoundException(e.getSchemaName(),
e.getTableName(), e.getTimeStamp());
@@ -3866,6 +3873,21 @@ public class MetaDataClient {
changingPhoenixTableProperty = evaluateStmtProperties(metaProperties,metaPropertiesEvaluated,table,schemaName,tableName);
boolean isTransformNeeded = Transform.checkIsTransformNeeded(metaProperties, schemaName, table, tableName, null, tenantIdToUse, connection);
+ if (isTransformNeeded) {
+ // We can add a support for these later. For now, not supported.
+ if (MetaDataUtil.hasLocalIndexTable(connection, physicalTableName.getBytes())) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_TRANSFORM_TABLE_WITH_LOCAL_INDEX)
+ .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+ }
+ if (table.isAppendOnlySchema()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_TRANSFORM_TABLE_WITH_APPEND_ONLY_SCHEMA)
+ .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+ }
+ if (table.isTransactional()) {
+ throw new SQLExceptionInfo.Builder(CANNOT_TRANSFORM_TRANSACTIONAL_TABLE)
+ .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+ }
+ }
// If changing isImmutableRows to true or it's not being changed and is already true
boolean willBeImmutableRows = Boolean.TRUE.equals(metaPropertiesEvaluated.getIsImmutableRows()) || (metaPropertiesEvaluated.getIsImmutableRows() == null && table.isImmutableRows());
@@ -4801,7 +4823,7 @@ public class MetaDataClient {
if (isTransformNeeded) {
if (indexRef.getTable().getViewIndexId() != null) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_TRANSFORM_VIEW_INDEX)
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_TRANSFORM_LOCAL_OR_VIEW_INDEX)
.setSchemaName(schemaName).setTableName(indexName).build().buildException();
}
try {
@@ -5151,6 +5173,17 @@ public class MetaDataClient {
throws SQLException {
boolean changingPhoenixTableProperty = false;
+ if (metaProperties.getImmutableRowsProp() != null) {
+ if (metaProperties.getImmutableRowsProp().booleanValue() != table.isImmutableRows()) {
+ if (table.getImmutableStorageScheme() != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_IMMUTABLE_ROWS_PROPERTY)
+ .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+ }
+ metaPropertiesEvaluated.setIsImmutableRows(metaProperties.getImmutableRowsProp());
+ changingPhoenixTableProperty = true;
+ }
+ }
+
if (metaProperties.getImmutableRowsProp() != null && table.getType() != INDEX) {
if (metaProperties.getImmutableRowsProp().booleanValue() != table.isImmutableRows()) {
metaPropertiesEvaluated.setIsImmutableRows(metaProperties.getImmutableRowsProp());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index d43d2aa..be5971a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -20,11 +20,11 @@ package org.apache.phoenix.schema;
import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
import java.sql.SQLException;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
@@ -154,7 +154,8 @@ public class PMetaDataImpl implements PMetaData {
for (PTable index : table.getIndexes()) {
metaData.put(index.getKey(), tableRefFactory.makePTableRef(index, this.timeKeeper.getCurrentTime(), resolvedTime));
}
- if (table.getPhysicalName(true) != null && !table.getPhysicalName(true).getString().equals(table.getTableName().getString())) {
+ if (table.getPhysicalName(true) != null &&
+ !Strings.isNullOrEmpty(table.getPhysicalName(true).getString()) && !table.getPhysicalName(true).getString().equals(table.getTableName().getString())) {
String physicalTableName = table.getPhysicalName(true).getString().replace(
QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR);
String physicalTableFullName = SchemaUtil.getTableName(table.getSchemaName() != null ? table.getSchemaName().getString() : null, physicalTableName);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 437ab61..e995e50 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL
import static org.apache.phoenix.util.EncodedColumnsUtil.isReservedColumnQualifier;
import java.io.DataOutputStream;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -201,7 +202,8 @@ public interface PTable extends PMetaDataEntity {
public enum TaskType {
DROP_CHILD_VIEWS((byte)1),
- INDEX_REBUILD((byte)2);
+ INDEX_REBUILD((byte)2),
+ TRANSFORM_MONITOR((byte)3);
private final byte[] byteValue;
private final byte serializedValue;
@@ -250,6 +252,11 @@ public interface PTable extends PMetaDataEntity {
return "FAILED";
}
},
+ RETRY {
+ public String toString() {
+ return "RETRY";
+ }
+ },
}
public enum TransformType {
@@ -280,6 +287,17 @@ public interface PTable extends PMetaDataEntity {
}
return TransformType.values()[serializedValue-1];
}
+ public static TransformType getPartialTransform(TransformType transformType) {
+ if (transformType == METADATA_TRANSFORM) {
+ return METADATA_TRANSFORM_PARTIAL;
+ }
+ return null;
+ }
+ public static boolean isPartialTransform(TransformType transformType){
+ List<PTable.TransformType> partials = new ArrayList<>();
+ partials.add(PTable.TransformType.METADATA_TRANSFORM_PARTIAL);
+ return partials.contains(transformType);
+ }
}
public enum TransformStatus {
@@ -293,6 +311,11 @@ public interface PTable extends PMetaDataEntity {
return "STARTED";
}
},
+ PENDING_CUTOVER {
+ public String toString() {
+ return "PENDING_CUTOVER";
+ }
+ },
COMPLETED {
public String toString() {
return "COMPLETED";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 03f0c11..4cfcdf4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -697,7 +697,6 @@ public class PTableImpl implements PTable {
Preconditions.checkNotNull(this.physicalNames);
//hasColumnsRequiringUpgrade and rowKeyOrderOptimizable are booleans and can never be
// null, so no need to check them
-
PName fullName = PNameFactory.newName(SchemaUtil.getTableName(
this.schemaName.getString(), this.tableName.getString()));
int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableNotFoundException.java
index ebc6b4d..48da43f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableNotFoundException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableNotFoundException.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.util.SchemaUtil;
public class TableNotFoundException extends MetaDataEntityNotFoundException {
private static final long serialVersionUID = 1L;
private static SQLExceptionCode code = SQLExceptionCode.TABLE_UNDEFINED;
+ private boolean thrownToForceReReadForTransformingTable = false;
private final long timestamp;
public TableNotFoundException(TableNotFoundException e, long timestamp) {
@@ -44,21 +45,28 @@ public class TableNotFoundException extends MetaDataEntityNotFoundException {
this(SchemaUtil.getSchemaNameFromFullName(tableName), SchemaUtil.getTableNameFromFullName(tableName));
}
+ public TableNotFoundException(String tableName, boolean thrownForForce) {
+ this(SchemaUtil.getSchemaNameFromFullName(tableName), SchemaUtil.getTableNameFromFullName(tableName),
+ HConstants.LATEST_TIMESTAMP, code, thrownForForce);
+ }
+
public TableNotFoundException(String schemaName, String tableName) {
this(schemaName, tableName, HConstants.LATEST_TIMESTAMP);
}
public TableNotFoundException(String schemaName, String tableName, long timestamp) {
- this(schemaName, tableName, timestamp, code);
+ this(schemaName, tableName, timestamp, code, false);
}
- public TableNotFoundException(String schemaName, String tableName, long timestamp, SQLExceptionCode code) {
+ public TableNotFoundException(String schemaName, String tableName, long timestamp, SQLExceptionCode code, boolean thrownForForceReRead) {
super(new SQLExceptionInfo.Builder(code).setSchemaName(schemaName).setTableName(tableName).build().toString(),
code.getSQLState(), code.getErrorCode(), schemaName, tableName, null);
this.timestamp = timestamp;
+ this.thrownToForceReReadForTransformingTable = thrownForForceReRead;
}
public long getTimeStamp() {
return timestamp;
}
+ public boolean isThrownToForceReReadForTransformingTable() { return thrownToForceReReadForTransformingTable;};
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 0a14dcd..d35ebc0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -203,6 +203,20 @@ public enum TableProperty {
},
+ // Same as COLUMN_ENCODED_BYTES. If we don't have this one, isPhoenixProperty returns false.
+ ENCODING_SCHEME(PhoenixDatabaseMetaData.ENCODING_SCHEME, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false, false) {
+ @Override
+ public Object getValue(Object value) {
+ return COLUMN_ENCODED_BYTES.getValue(value);
+ }
+
+ @Override
+ public Object getPTableValue(PTable table) {
+ return table.getEncodingScheme();
+ }
+
+ },
+
IMMUTABLE_STORAGE_SCHEME(PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false, false) {
@Override
public ImmutableStorageScheme getValue(Object value) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
index 6a742e2..5aebb4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
@@ -17,6 +17,9 @@
*/
package org.apache.phoenix.schema.task;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Mutation;
@@ -27,7 +30,6 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos
.TaskMetaDataService;
-import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcUtil;
import org.apache.hadoop.hbase.security.User;
@@ -512,5 +514,16 @@ public class Task {
this.taskType = taskType;
}
+ public boolean isMatchingTask(SystemTransformRecord transformRecord) {
+ if (getTaskType() != PTable.TaskType.TRANSFORM_MONITOR) {
+ return false;
+ }
+ if (StringUtils.equals(transformRecord.getLogicalTableName(), getTableName())
+ && StringUtils.equals(transformRecord.getTenantId(), getTenantId())
+ && StringUtils.equals(transformRecord.getSchemaName(), getSchemaName())) {
+ return true;
+ }
+ return false;
+ }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
index dd678ab..63378bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
@@ -32,7 +32,7 @@ import java.sql.Timestamp;
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"},
- justification = "endTs and startTs are not used for mutation")
+ justification = "lastStateTs and startTs are not used for mutation")
public class SystemTransformRecord {
private final PTable.TransformType transformType;
private final String schemaName;
@@ -44,7 +44,7 @@ public class SystemTransformRecord {
private final String transformJobId;
private final Integer transformRetryCount;
private final Timestamp startTs;
- private final Timestamp endTs;
+ private final Timestamp lastStateTs;
private final String oldMetadata;
private final String newMetadata;
private final String transformFunction;
@@ -52,7 +52,7 @@ public class SystemTransformRecord {
public SystemTransformRecord(PTable.TransformType transformType,
String schemaName, String logicalTableName, String tenantId, String newPhysicalTableName, String logicalParentName,
String transformStatus, String transformJobId, Integer transformRetryCount, Timestamp startTs,
- Timestamp endTs, String oldMetadata, String newMetadata, String transformFunction) {
+ Timestamp lastStateTs, String oldMetadata, String newMetadata, String transformFunction) {
this.transformType = transformType;
this.schemaName = schemaName;
this.tenantId = tenantId;
@@ -63,16 +63,16 @@ public class SystemTransformRecord {
this.transformJobId = transformJobId;
this.transformRetryCount = transformRetryCount;
this.startTs = startTs;
- this.endTs = endTs;
+ this.lastStateTs = lastStateTs;
this.oldMetadata = oldMetadata;
this.newMetadata = newMetadata;
this.transformFunction = transformFunction;
}
public String getString() {
- return String.format("transformType: %s, schameName: %s, logicalTableName: %s, newPhysicalTableName: %s, logicalParentName: %s "
+ return String.format("transformType: %s, schameName: %s, logicalTableName: %s, newPhysicalTableName: %s, logicalParentName: %s, status: %s"
, String.valueOf(transformType), String.valueOf(schemaName), String.valueOf(logicalTableName), String.valueOf(newPhysicalTableName),
- String.valueOf(logicalParentName));
+ String.valueOf(logicalParentName), String.valueOf(transformStatus));
}
public PTable.TransformType getTransformType() {
@@ -115,8 +115,8 @@ public class SystemTransformRecord {
return startTs;
}
- public Timestamp getTransformEndTs() {
- return endTs;
+ public Timestamp getTransformLastStateTs() {
+ return lastStateTs;
}
public String getOldMetadata() {
@@ -129,15 +129,16 @@ public class SystemTransformRecord {
public boolean isActive() {
return (transformStatus.equals(PTable.TransformStatus.STARTED.name())
- || transformStatus.equals(PTable.TransformStatus.CREATED.name()));
+ || transformStatus.equals(PTable.TransformStatus.CREATED.name())
+ || transformStatus.equals(PTable.TransformStatus.PENDING_CUTOVER.name()));
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"},
- justification = "endTs and startTs are not used for mutation")
+ justification = "lastStateTs and startTs are not used for mutation")
public static class SystemTransformBuilder {
- private PTable.TransformType transformType;
+ private PTable.TransformType transformType = PTable.TransformType.METADATA_TRANSFORM;
private String schemaName;
private String tenantId;
private String logicalTableName;
@@ -147,7 +148,7 @@ public class SystemTransformRecord {
private String transformJobId;
private int transformRetryCount =0;
private Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
- private Timestamp endTs;
+ private Timestamp lastStateTs;
private String oldMetadata;
private String newMetadata;
private String transformFunction;
@@ -167,7 +168,7 @@ public class SystemTransformRecord {
this.setTransformJobId(systemTransformRecord.getTransformJobId());
this.setTransformRetryCount(systemTransformRecord.getTransformRetryCount());
this.setStartTs(systemTransformRecord.getTransformStartTs());
- this.setEndTs(systemTransformRecord.getTransformEndTs());
+ this.setLastStateTs(systemTransformRecord.getTransformLastStateTs());
this.setOldMetadata(systemTransformRecord.getOldMetadata());
this.setNewMetadata(systemTransformRecord.getNewMetadata());
this.setTransformFunction(systemTransformRecord.getTransformFunction());
@@ -233,8 +234,8 @@ public class SystemTransformRecord {
return this;
}
- public SystemTransformBuilder setEndTs(Timestamp endTs) {
- this.endTs = endTs;
+ public SystemTransformBuilder setLastStateTs(Timestamp ts) {
+ this.lastStateTs = ts;
return this;
}
@@ -244,12 +245,12 @@ public class SystemTransformRecord {
}
public SystemTransformRecord build() {
- Timestamp end = endTs;
- if (end == null && transformStatus != null && transformStatus.equals(PTable.TaskStatus.COMPLETED.toString())) {
- end = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+ Timestamp lastTs = lastStateTs;
+ if (lastTs == null && transformStatus != null && transformStatus.equals(PTable.TaskStatus.COMPLETED.toString())) {
+ lastTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
}
return new SystemTransformRecord(transformType, schemaName,
- logicalTableName, tenantId, newPhysicalTableName, logicalParentName, transformStatus, transformJobId, transformRetryCount, startTs, end,
+ logicalTableName, tenantId, newPhysicalTableName, logicalParentName, transformStatus, transformJobId, transformRetryCount, startTs, lastTs,
oldMetadata, newMetadata, transformFunction);
}
@@ -266,7 +267,7 @@ public class SystemTransformRecord {
builder.setTransformJobId(resultSet.getString(col++));
builder.setTransformRetryCount(resultSet.getInt(col++));
builder.setStartTs(resultSet.getTimestamp(col++));
- builder.setEndTs(resultSet.getTimestamp(col++));
+ builder.setLastStateTs(resultSet.getTimestamp(col++));
builder.setOldMetadata(resultSet.getString(col++));
builder.setNewMetadata(resultSet.getString(col++));
builder.setTransformFunction(resultSet.getString(col++));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
index bc396e5..8aecf1d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
@@ -21,15 +21,17 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.TableInfo;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
-import org.apache.phoenix.mapreduce.transform.TransformTool;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
@@ -40,12 +42,13 @@ import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.tool.SchemaExtractionProcessor;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.JacksonUtil;
import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TableViewFinderResult;
+import org.apache.phoenix.util.UpgradeUtil;
import org.apache.phoenix.util.ViewUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,16 +59,45 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_TABLE_TIMESTAMP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_STATUS;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
+import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.schema.ColumnMetaDataOps.addColumnMutation;
import static org.apache.phoenix.schema.MetaDataClient.CREATE_LINK;
+import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
import static org.apache.phoenix.schema.PTableType.INDEX;
import static org.apache.phoenix.schema.PTableType.VIEW;
public class Transform {
private static final Logger LOGGER = LoggerFactory.getLogger(Transform.class);
+ private static final String TRANSFORM_SELECT = "SELECT " +
+ PhoenixDatabaseMetaData.TENANT_ID + ", " +
+ PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+ PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
+ PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
+ PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
+ PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
+ TRANSFORM_STATUS + ", " +
+ PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
+ PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
+ PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
+ PhoenixDatabaseMetaData.TRANSFORM_LAST_STATE_TS + ", " +
+ PhoenixDatabaseMetaData.OLD_METADATA + " , " +
+ PhoenixDatabaseMetaData.NEW_METADATA + " , " +
+ PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
+ " FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME;
private static String generateNewTableName(String schema, String logicalTableName, long seqNum) {
// TODO: Support schema versioning as well.
@@ -176,10 +208,13 @@ public class Transform {
TableViewFinderResult childViewsResult = ViewUtil.findChildViews(connection, systemTransformParams.getTenantId()
, systemTransformParams.getSchemaName(), systemTransformParams.getLogicalTableName());
for (TableInfo view : childViewsResult.getLinks()) {
- addTransformTableLink(connection, view.getTenantId()==null? null: Bytes.toString(view.getTenantId()),
+ addTransformTableLink(connection, view.getTenantId()==null? null:Bytes.toString(view.getTenantId()),
(view.getSchemaName()==null? null: Bytes.toString(view.getSchemaName())), Bytes.toString(view.getTableName())
, newTableName, sequenceNum);
}
+ // add a monitoring task
+ TransformMonitorTask.addTransformMonitorTask(connection, connection.getQueryServices().getConfiguration(), systemTransformParams,
+ PTable.TaskStatus.CREATED, new Timestamp(EnvironmentEdgeManager.currentTimeMillis()), null);
return newTable;
}
@@ -197,23 +232,40 @@ public class Transform {
linkStatement.execute();
}
- public static SystemTransformRecord getTransformRecord(Configuration conf, PTableType tableType, PName schemaName,
+ public static PTable getTransformingNewTable(PhoenixConnection connection, PTable oldTable) throws SQLException{
+ SystemTransformRecord transformRecord = Transform.getTransformRecord(connection, oldTable.getType(), oldTable.getSchemaName()
+ , oldTable.getTableName(), oldTable.getType()==INDEX? oldTable.getParentTableName():null, oldTable.getTenantId()
+ , oldTable.getBaseTableLogicalName());
+
+ PTable transformingNewTable = null;
+ if (transformRecord != null && transformRecord.isActive()) {
+ // New table will behave like an index
+ PName newTableNameWithoutSchema = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(transformRecord.getNewPhysicalTableName()));
+ if (!newTableNameWithoutSchema.equals(oldTable.getPhysicalName(true))) {
+ transformingNewTable = PhoenixRuntime.getTableNoCache(connection, transformRecord.getNewPhysicalTableName());
+ }
+ }
+ return transformingNewTable;
+ }
+
+ private static SystemTransformRecord getTransformRecord(PhoenixConnection connection, PTableType tableType, PName schemaName,
PName tableName, PName dataTableName, PName tenantId,
PName parentLogicalName) throws SQLException {
+
if (tableType == PTableType.TABLE) {
- try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)) {
- return Transform.getTransformRecord(schemaName, tableName, null, tenantId, connection);
- }
+ return Transform.getTransformRecord(schemaName, tableName, null, tenantId, connection);
+
} else if (tableType == INDEX) {
- try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)) {
- return Transform.getTransformRecord(schemaName, tableName, dataTableName, tenantId, connection);
- }
+ return Transform.getTransformRecord(schemaName, tableName, dataTableName, tenantId, connection);
} else if (tableType == VIEW) {
- try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)) {
- return Transform.getTransformRecord(SchemaUtil.getSchemaNameFromFullName(parentLogicalName.getString()),
- SchemaUtil.getTableNameFromFullName(parentLogicalName.getString()), null, tenantId==null? null:tenantId.getString(), connection);
+ if (parentLogicalName == null) {
+ LOGGER.warn("View doesn't seem to have a parent");
+ return null;
}
+ return Transform.getTransformRecord(SchemaUtil.getSchemaNameFromFullName(parentLogicalName.getString()),
+ SchemaUtil.getTableNameFromFullName(parentLogicalName.getString()), null, tenantId == null ? null : tenantId.getString(), connection);
}
+
return null;
}
@@ -232,22 +284,11 @@ public class Transform {
public static SystemTransformRecord getTransformRecordFromDB(
String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
- String sql = "SELECT " +
- PhoenixDatabaseMetaData.TENANT_ID + ", " +
- PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
- PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
- PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
- PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
- PhoenixDatabaseMetaData.OLD_METADATA + " , " +
- PhoenixDatabaseMetaData.NEW_METADATA + " , " +
- PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
- " FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE " +
+ if (SYSTEM_TRANSFORM_NAME.equals(SchemaUtil.getTableName(schema, logicalTableName))) {
+ // Cannot query itself
+ return null;
+ }
+ String sql = TRANSFORM_SELECT + " WHERE " +
(Strings.isNullOrEmpty(tenantId) ? "" : (PhoenixDatabaseMetaData.TENANT_ID + " ='" + tenantId + "' AND ")) +
(Strings.isNullOrEmpty(schema) ? "" : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + schema + "' AND ")) +
PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + logicalTableName + "'" +
@@ -257,6 +298,7 @@ public class Transform {
if (resultSet.next()) {
return SystemTransformRecord.SystemTransformBuilder.build(resultSet);
}
+ LOGGER.info("Could not find System.Transform record with " + sql);
return null;
}
}
@@ -312,11 +354,11 @@ public class Transform {
PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
+ TRANSFORM_STATUS + ", " +
PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
- PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
+ PhoenixDatabaseMetaData.TRANSFORM_LAST_STATE_TS + ", " +
PhoenixDatabaseMetaData.OLD_METADATA + " , " +
PhoenixDatabaseMetaData.NEW_METADATA + " , " +
PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
@@ -353,8 +395,8 @@ public class Transform {
stmt.setTimestamp(colNum++, systemTransformParams.getTransformStartTs());
- if (systemTransformParams.getTransformEndTs() != null) {
- stmt.setTimestamp(colNum++, systemTransformParams.getTransformEndTs());
+ if (systemTransformParams.getTransformLastStateTs() != null) {
+ stmt.setTimestamp(colNum++, systemTransformParams.getTransformLastStateTs());
} else {
stmt.setNull(colNum++, Types.TIMESTAMP);
}
@@ -379,10 +421,142 @@ public class Transform {
}
}
+ public static void doCutover(PhoenixConnection connection, SystemTransformRecord systemTransformRecord) throws Exception{
+ String tenantId = systemTransformRecord.getTenantId();
+ String schema = systemTransformRecord.getSchemaName();
+ String tableName = systemTransformRecord.getLogicalTableName();
+ String newTableName = SchemaUtil.getTableNameFromFullName(systemTransformRecord.getNewPhysicalTableName());
+
+ // Calculate changed metadata
+ List<String> columnNames = new ArrayList<>();
+ List<String> columnValues = new ArrayList<>();
+
+ getMetadataDifference(connection, systemTransformRecord, columnNames, columnValues);
+ // TODO In the future, we need to handle rowkey changes and column type changes as well
+
+ String
+ changeViewStmt = "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME %s) VALUES (%s, %s, '%s' %s)";
+
+ String
+ changeTable = String.format(
+ "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, PHYSICAL_TABLE_NAME %s) VALUES (%s, %s, '%s','%s' %s)",
+ (columnNames.size() > 0? "," + String.join(",", columnNames):""),
+ (tenantId==null? null: ("'" + tenantId + "'")),
+ (schema==null ? null : ("'" + schema + "'")), tableName, newTableName,
+ (columnValues.size() > 0? "," + String.join(",", columnValues):""));
+
+ LOGGER.info("About to do cutover via " + changeTable);
+ TableViewFinderResult childViewsResult = ViewUtil.findChildViews(connection, tenantId, schema, tableName);
+ boolean wasCommit = connection.getAutoCommit();
+ connection.setAutoCommit(false);
+ List<TableInfo> viewsToUpdateCache = new ArrayList<>();
+ try {
+ connection.createStatement().execute(changeTable);
+
+ // Update column qualifiers
+ PTable pNewTable = PhoenixRuntime.getTable(connection, systemTransformRecord.getNewPhysicalTableName());
+ PTable pOldTable = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(schema, tableName));
+ if (pOldTable.getImmutableStorageScheme() != pNewTable.getImmutableStorageScheme() ||
+ pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+ MetaDataClient.mutateTransformProperties(connection, tenantId, schema, tableName, newTableName,
+ pNewTable.getImmutableStorageScheme(), pNewTable.getEncodingScheme());
+ // We need to update the columns's qualifiers as well
+ mutateColumns(connection.unwrap(PhoenixConnection.class), pOldTable, pNewTable);
+
+ // Also update view column qualifiers
+ for (TableInfo view : childViewsResult.getLinks()) {
+ PTable pView = PhoenixRuntime.getTable(connection, view.getTenantId()==null? null: Bytes.toString(view.getTenantId())
+ , SchemaUtil.getTableName(view.getSchemaName(), view.getTableName()));
+ mutateViewColumns(connection.unwrap(PhoenixConnection.class), pView, pNewTable);
+ }
+ }
+ connection.commit();
+
+ // We can have millions of views. We need to send it in batches
+ int maxBatchSize = connection.getQueryServices().getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+ int batchSize = 0;
+ for (TableInfo view : childViewsResult.getLinks()) {
+ String changeView = String.format(changeViewStmt,
+ (columnNames.size() > 0? "," + String.join(",", columnNames):""),
+ (view.getTenantId()==null || view.getTenantId().length == 0? null: ("'" + Bytes.toString(view.getTenantId()) + "'")),
+ (view.getSchemaName()==null || view.getSchemaName().length == 0? null : ("'" + Bytes.toString(view.getSchemaName()) + "'")),
+ Bytes.toString(view.getTableName()),
+ (columnValues.size() > 0? "," + String.join(",", columnValues):""));
+ LOGGER.info("Cutover changing view via " + changeView);
+ connection.createStatement().execute(changeView);
+ viewsToUpdateCache.add(view);
+ batchSize++;
+ if (batchSize >= maxBatchSize) {
+ connection.commit();
+ batchSize = 0;
+ }
+ }
+ if (batchSize > 0) {
+ connection.commit();
+ batchSize = 0;
+ }
+
+ connection.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+ UpgradeUtil.clearCacheAndGetNewTable(connection.unwrap(PhoenixConnection.class),
+ connection.getTenantId(),
+ schema, tableName, systemTransformRecord.getLogicalParentName(), MIN_TABLE_TIMESTAMP);
+ for (TableInfo view : viewsToUpdateCache) {
+ UpgradeUtil.clearCache(connection.unwrap(PhoenixConnection.class),
+ PNameFactory.newName(view.getTenantId()),
+ PNameFactory.newName(view.getSchemaName()).getString(), Bytes.toString(view.getTableName()),
+ tableName, MIN_TABLE_TIMESTAMP);
+ }
+
+ // TODO: Cleanup syscat so that we don't have an extra index
+ } catch (Exception e) {
+ LOGGER.error("Error happened during cutover ", e);
+ connection.rollback();
+ throw e;
+ } finally {
+ connection.setAutoCommit(wasCommit);
+ }
+ }
+
+ private static void getMetadataDifference(PhoenixConnection connection, SystemTransformRecord systemTransformRecord, List<String> columnNames, List<String> columnValues) throws SQLException {
+ PTable pOldTable = PhoenixRuntime.getTable(connection, SchemaUtil.getQualifiedTableName(systemTransformRecord.getSchemaName(),systemTransformRecord.getLogicalTableName()));
+ PTable pNewTable = PhoenixRuntime.getTable(connection, SchemaUtil.getQualifiedTableName(SchemaUtil.getSchemaNameFromFullName(systemTransformRecord.getNewPhysicalTableName()),
+ SchemaUtil.getTableNameFromFullName(systemTransformRecord.getNewPhysicalTableName())));
+
+ Map<String, String> map = pOldTable.getPropertyValues();
+ for(Map.Entry<String, String> entry : map.entrySet()) {
+ String oldKey = entry.getKey();
+ String oldValue = entry.getValue();
+ if (pNewTable.getPropertyValues().containsKey(oldKey)) {
+ if (PHYSICAL_TABLE_NAME.equals(oldKey)) {
+ // No need to add here. We will add it.
+ continue;
+ }
+ String newValue = pNewTable.getPropertyValues().get(oldKey);
+ if (!Strings.nullToEmpty(oldValue).equals(Strings.nullToEmpty(newValue))) {
+ columnNames.add(oldKey);
+ // properties value that corresponds to a number will not need single quotes around it
+ // properties value that corresponds to a boolean value will not need single quotes around it
+ if (!Strings.isNullOrEmpty(newValue)) {
+ if(!(StringUtils.isNumeric(newValue)) &&
+ !(newValue.equalsIgnoreCase(Boolean.TRUE.toString()) ||newValue.equalsIgnoreCase(Boolean.FALSE.toString()))) {
+ if (ENCODING_SCHEME.equals(oldKey)) {
+ newValue = String.valueOf(PTable.QualifierEncodingScheme.valueOf(newValue).getSerializedMetadataValue());
+ } else if (IMMUTABLE_STORAGE_SCHEME.equals(oldKey)) {
+ newValue = String.valueOf(PTable.ImmutableStorageScheme.valueOf(newValue).getSerializedMetadataValue());
+ }
+ else {
+ newValue = "'" + newValue + "'";
+ }
+ }
+ }
+ columnValues.add(newValue);
+ }
+ }
+ }
+ }
public static void completeTransform(Connection connection, Configuration configuration) throws Exception{
// Will be called from Reducer
- long timestamp= EnvironmentEdgeManager.currentTimeMillis();
String tenantId = configuration.get(MAPREDUCE_TENANT_ID, null);
String fullOldTableName = PhoenixConfigurationUtil.getInputTableName(configuration);
String schemaName = SchemaUtil.getSchemaNameFromFullName(fullOldTableName);
@@ -390,60 +564,122 @@ public class Transform {
String indexTableName = SchemaUtil.getTableNameFromFullName(PhoenixConfigurationUtil.getIndexToolIndexTableName(configuration));
String logicaTableName = oldTableLogicalName;
String logicalParentName = null;
- if (PhoenixConfigurationUtil.getTransformingTableType(configuration) == IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE)
+ if (PhoenixConfigurationUtil.getTransformingTableType(configuration) == IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE) {
if (!Strings.isNullOrEmpty(indexTableName)) {
logicaTableName = indexTableName;
logicalParentName = SchemaUtil.getTableName(schemaName, oldTableLogicalName);
}
- boolean isPartial = PhoenixConfigurationUtil.getIsPartialTransform(configuration);
+ }
+
SystemTransformRecord transformRecord = getTransformRecord(schemaName, logicaTableName, logicalParentName,
tenantId, connection.unwrap(PhoenixConnection.class));
- if (!isPartial) {
- String newTableName = SchemaUtil.getTableNameFromFullName(transformRecord.getNewPhysicalTableName());
- PTable pNewTable = PhoenixRuntime.getTable(connection, transformRecord.getNewPhysicalTableName());
- PTable pOldTable = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(schemaName,logicaTableName));
- if (pOldTable.getImmutableStorageScheme() != pNewTable.getImmutableStorageScheme() ||
- pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
- MetaDataClient.mutateTransformProperties(connection, tenantId, schemaName, logicaTableName, newTableName,
- pNewTable.getImmutableStorageScheme(), pNewTable.getEncodingScheme());
- // We need to update the columns's qualifiers as well
- if (pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
- Short nextKeySeq = 0;
- for (PColumn newCol : pNewTable.getColumns()) {
- boolean isPk = SchemaUtil.isPKColumn(newCol);
- Short keySeq = isPk ? ++nextKeySeq : null;
- PColumn column = new PColumnImpl(newCol.getName(), newCol.getFamilyName(), newCol.getDataType(),
- newCol.getMaxLength(), newCol.getScale(), newCol.isNullable(), newCol.getPosition(), newCol.getSortOrder()
- , newCol.getArraySize(),
- newCol.getViewConstant(), newCol.isViewReferenced(), newCol.getExpressionStr(), newCol.isRowTimestamp(),
- newCol.isDynamic(), newCol.getColumnQualifierBytes(), EnvironmentEdgeManager.currentTimeMillis());
- addColumnMutation(connection.unwrap(PhoenixConnection.class), schemaName, logicaTableName, column,
- pNewTable.getParentTableName()==null? null:pNewTable.getParentTableName().getString()
- , pNewTable.getPKName()==null? null:pNewTable.getPKName().getString(), keySeq , pNewTable.getBucketNum() != null);
- }
- }
- }
- // Clear cache so that the new table is used for queries
- connection.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
- TransformTool.updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord, PTable.TransformStatus.COMPLETED);
-
- // TODO Kick partial transform from the TransformMonitor
- SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
- builder.setTransformStatus(PTable.TransformStatus.CREATED.name());
- builder.setTransformJobId(null);
- builder.setStartTs(new Timestamp(timestamp));
- builder.setTransformRetryCount(0);
- builder.setTransformType(PTable.TransformType.METADATA_TRANSFORM_PARTIAL);
- SystemTransformRecord partialStr = builder.build();
- Transform.upsertTransform(partialStr, connection.unwrap(PhoenixConnection.class));
+
+ if (!PTable.TransformType.isPartialTransform(transformRecord.getTransformType())) {
+ updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord, PTable.TransformStatus.PENDING_CUTOVER);
connection.commit();
} else {
- TransformTool.updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord, PTable.TransformStatus.COMPLETED);
+ updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord, PTable.TransformStatus.COMPLETED);
connection.commit();
- // TODO: cleanup
}
}
+ public static void updateTransformRecord(PhoenixConnection connection, SystemTransformRecord transformRecord, PTable.TransformStatus newStatus) throws SQLException {
+ SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
+ builder.setTransformStatus(newStatus.name());
+ builder.setLastStateTs(new Timestamp(EnvironmentEdgeManager.currentTimeMillis()));
+ if (newStatus == PTable.TransformStatus.STARTED) {
+ builder.setTransformRetryCount(transformRecord.getTransformRetryCount() + 1);
+ }
+ Transform.upsertTransform(builder.build(), connection);
+ }
+
+ private static void mutateColumns(PhoenixConnection connection, PTable pOldTable, PTable pNewTable) throws SQLException {
+ if (pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+ Short nextKeySeq = 0;
+ for (PColumn column : pNewTable.getColumns()) {
+ boolean isPk = SchemaUtil.isPKColumn(column);
+ Short keySeq = isPk ? ++nextKeySeq : null;
+ PColumn newCol = new PColumnImpl(column.getName(), column.getFamilyName(), column.getDataType(),
+ column.getMaxLength(), column.getScale(), column.isNullable(), column.getPosition(), column.getSortOrder()
+ , column.getArraySize(),
+ column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(),
+ column.isDynamic(), column.getColumnQualifierBytes(), EnvironmentEdgeManager.currentTimeMillis());
+ addColumnMutation(connection, pOldTable.getSchemaName()==null?null:pOldTable.getSchemaName().getString()
+ , pOldTable.getTableName().getString(), newCol,
+ pNewTable.getParentTableName() == null ? null : pNewTable.getParentTableName().getString()
+ , pNewTable.getPKName() == null ? null : pNewTable.getPKName().getString(), keySeq, pNewTable.getBucketNum() != null);
+ }
+ }
+ }
+
+ private static void mutateViewColumns(PhoenixConnection connection, PTable pView, PTable pNewTable) throws SQLException {
+ if (pView.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+ Short nextKeySeq = 0;
+ PTable.EncodedCQCounter cqCounterToUse = pNewTable.getEncodedCQCounter();
+ String defaultColumnFamily = pNewTable.getDefaultFamilyName() != null && !Strings.isNullOrEmpty(pNewTable.getDefaultFamilyName().getString()) ?
+ pNewTable.getDefaultFamilyName().getString() : DEFAULT_COLUMN_FAMILY;
+ for (PColumn column : pView.getColumns()) {
+ boolean isPk = SchemaUtil.isPKColumn(column);
+ Short keySeq = isPk ? ++nextKeySeq : null;
+ if (isPk) {
+ continue;
+ }
+ String familyName = null;
+ if (pNewTable.getImmutableStorageScheme() == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ familyName = column.getFamilyName() != null ? column.getFamilyName().getString() : defaultColumnFamily;
+ } else {
+ familyName = defaultColumnFamily;
+ }
+ int encodedCQ = pView.isAppendOnlySchema() ? Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + keySeq) : cqCounterToUse.getNextQualifier(familyName);
+ if (!pView.isAppendOnlySchema()) {
+ cqCounterToUse.increment(familyName);
+ }
+
+ byte[] colQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(column.getName().getString(),
+ encodedCQ, pNewTable, isPk);
+
+ if (column.isDerived()) {
+ // Don't need to add/change derived columns
+ continue;
+ }
+
+ PColumn newCol = new PColumnImpl(column.getName(), PNameFactory.newName(familyName), column.getDataType(),
+ column.getMaxLength(), column.getScale(), column.isNullable(), column.getPosition(), column.getSortOrder()
+ , column.getArraySize(),
+ column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(),
+ column.isDynamic(), colQualifierBytes, EnvironmentEdgeManager.currentTimeMillis());
+ String tenantId = pView.getTenantId() == null? null:pView.getTenantId().getString();
+ addColumnMutation(connection, tenantId, pView.getSchemaName()==null?null:pView.getSchemaName().getString()
+ , pView.getTableName().getString(), newCol,
+ pView.getParentTableName() == null ? null : pView.getParentTableName().getString()
+ , pView.getPKName() == null ? null : pView.getPKName().getString(), keySeq, pView.getBucketNum() != null);
+ }
+ }
+ }
+
+ public static void doForceCutover(Connection connection, Configuration configuration) throws Exception{
+ PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
+ // Will be called from Reducer
+ String tenantId = configuration.get(MAPREDUCE_TENANT_ID, null);
+ String fullOldTableName = PhoenixConfigurationUtil.getInputTableName(configuration);
+ String schemaName = SchemaUtil.getSchemaNameFromFullName(fullOldTableName);
+ String oldTableLogicalName = SchemaUtil.getTableNameFromFullName(fullOldTableName);
+ String indexTableName = SchemaUtil.getTableNameFromFullName(PhoenixConfigurationUtil.getIndexToolIndexTableName(configuration));
+ String logicaTableName = oldTableLogicalName;
+ String logicalParentName = null;
+ if (PhoenixConfigurationUtil.getTransformingTableType(configuration) == IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE)
+ if (!Strings.isNullOrEmpty(indexTableName)) {
+ logicaTableName = indexTableName;
+ logicalParentName = SchemaUtil.getTableName(schemaName, oldTableLogicalName);
+ }
+
+ SystemTransformRecord transformRecord = getTransformRecord(schemaName, logicaTableName, logicalParentName,
+ tenantId, phoenixConnection);
+ Transform.doCutover(phoenixConnection, transformRecord);
+ updateTransformRecord(phoenixConnection, transformRecord, PTable.TransformStatus.COMPLETED);
+ phoenixConnection.commit();
+ }
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
index 80d32e7..da445f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
@@ -130,6 +131,10 @@ public class TransformMaintainer extends IndexMaintainer {
this.isOldTableSalted = isOldTableSalted;
}
+ public Set<ColumnReference> getAllColumns() {
+ return new HashSet<>();
+ }
+
private TransformMaintainer(final PTable oldTable, final PTable newTable, PhoenixConnection connection) {
this(oldTable.getRowKeySchema(), oldTable.getBucketNum() != null);
this.newTableRowKeyOrderOptimizable = newTable.rowKeyOrderOptimizable();
@@ -140,7 +145,7 @@ public class TransformMaintainer extends IndexMaintainer {
this.oldTableEncodingScheme = oldTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : oldTable.getEncodingScheme();
this.oldTableImmutableStorageScheme = oldTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : oldTable.getImmutableStorageScheme();
- this.newTableName = SchemaUtil.getTableName(newTable.getSchemaName(), newTable.getTableName()).getBytes();
+ this.newTableName = newTable.getPhysicalName().getBytes();
boolean newTableWALDisabled = newTable.isWALDisabled();
int nNewTableColumns = newTable.getColumns().size();
int nNewTablePKColumns = newTable.getPKColumns().size();
@@ -297,14 +302,11 @@ public class TransformMaintainer extends IndexMaintainer {
cRefBuilder.setFamily(ByteStringer.wrap(dataTableColRef.getFamily()));
cRefBuilder.setQualifier(ByteStringer.wrap(dataTableColRef.getQualifier()));
builder.addOldTableColRefForCoveredColumns(cRefBuilder.build());
- if (maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS) {
- // We need to serialize the colRefs of new tables only in case of encoded column names.
- ColumnReference newTableColRef = e.getValue();
- cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
- cRefBuilder.setFamily(ByteStringer.wrap(newTableColRef.getFamily()));
- cRefBuilder.setQualifier(ByteStringer.wrap(newTableColRef.getQualifier()));
- builder.addNewTableColRefForCoveredColumns(cRefBuilder.build());
- }
+ ColumnReference newTableColRef = e.getValue();
+ cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+ cRefBuilder.setFamily(ByteStringer.wrap(newTableColRef.getFamily()));
+ cRefBuilder.setQualifier(ByteStringer.wrap(newTableColRef.getQualifier()));
+ builder.addNewTableColRefForCoveredColumns(cRefBuilder.build());
}
builder.setNewTableColumnCount(maintainer.newTableColumnCount);
@@ -383,19 +385,12 @@ public class TransformMaintainer extends IndexMaintainer {
List<ServerCachingProtos.ColumnReference> oldTableColRefsForCoveredColumnsList = proto.getOldTableColRefForCoveredColumnsList();
List<ServerCachingProtos.ColumnReference> newTableColRefsForCoveredColumnsList = proto.getNewTableColRefForCoveredColumnsList();
maintainer.coveredColumnsMap = Maps.newHashMapWithExpectedSize(oldTableColRefsForCoveredColumnsList.size());
- boolean encodedColumnNames = maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS;
Iterator<ServerCachingProtos.ColumnReference> newTableColRefItr = newTableColRefsForCoveredColumnsList.iterator();
for (ServerCachingProtos.ColumnReference colRefFromProto : oldTableColRefsForCoveredColumnsList) {
ColumnReference oldTableColRef = new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray());
ColumnReference newTableColRef;
- if (encodedColumnNames) {
- ServerCachingProtos.ColumnReference fromProto = newTableColRefItr.next();
- newTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier().toByteArray());
- } else {
- byte[] cq = oldTableColRef.getQualifier();
- byte[] cf = oldTableColRef.getFamily();
- newTableColRef = new ColumnReference(cf, cq);
- }
+ ServerCachingProtos.ColumnReference fromProto = newTableColRefItr.next();
+ newTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier().toByteArray());
maintainer.coveredColumnsMap.put(oldTableColRef, newTableColRef);
}
maintainer.logicalNewTableName = proto.getLogicalNewTableName();
@@ -460,18 +455,30 @@ public class TransformMaintainer extends IndexMaintainer {
oldTableRowKeySchema.iterator(rowKeyPtr, ptr, dataPosOffset);
// Write new table row key
+ int trailingVariableWidthColumnNum = 0;
while (oldTableRowKeySchema.next(ptr, dataPosOffset, maxRowKeyOffset) != null) {
output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
if (!oldTableRowKeySchema.getField(dataPosOffset).getDataType().isFixedWidth()) {
output.writeByte(SchemaUtil.getSeparatorByte(newTableRowKeyOrderOptimizable, ptr.getLength()==0
, oldTableRowKeySchema.getField(dataPosOffset)));
+ trailingVariableWidthColumnNum++;
+ } else {
+ trailingVariableWidthColumnNum = 0;
}
+
dataPosOffset++;
}
byte[] newTableRowKey = stream.getBuffer();
// Remove trailing nulls
int length = stream.size();
+ // The existing code does not eliminate the separator if the data type is not nullable. It not clear why.
+ // The actual bug is in the calculation of maxTrailingNulls with view indexes. So, in order not to impact some other cases, we should keep minLength check here.
+ while (trailingVariableWidthColumnNum > 0 && length > 0 && newTableRowKey[length-1] == QueryConstants.SEPARATOR_BYTE) {
+ length--;
+ trailingVariableWidthColumnNum--;
+ }
+
if (isNewTableSalted) {
// Set salt byte
byte saltByte = SaltingUtil.getSaltingByte(newTableRowKey, SaltingUtil.NUM_SALTING_BYTES, length-SaltingUtil.NUM_SALTING_BYTES, nNewTableSaltBuckets);
@@ -483,30 +490,13 @@ public class TransformMaintainer extends IndexMaintainer {
}
}
- public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable oldRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
+ public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable oldRowKeyPtr,
+ long ts, byte[] regionStartKey, byte[] regionEndKey, boolean verified) throws IOException {
byte[] newRowKey = this.buildRowKey(valueGetter, oldRowKeyPtr, regionStartKey, regionEndKey, ts);
return buildUpdateMutation(kvBuilder, valueGetter, oldRowKeyPtr, ts, regionStartKey, regionEndKey,
newRowKey, this.getEmptyKeyValueFamily(), coveredColumnsMap,
- newTableEmptyKeyValueRef, newTableWALDisabled, oldTableImmutableStorageScheme, newTableImmutableStorageScheme, newTableEncodingScheme, false);
- }
-
- public Delete buildRowDeleteMutation(byte[] rowKey, DeleteType deleteType, long ts) {
- byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary();
- Delete delete = new Delete(rowKey);
-
- for (ColumnReference ref : newTableColumns) {
- if (deleteType == DeleteType.SINGLE_VERSION) {
- delete.addFamilyVersion(ref.getFamily(), ts);
- } else {
- delete.addFamily(ref.getFamily(), ts);
- }
- }
- if (deleteType == DeleteType.SINGLE_VERSION) {
- delete.addFamilyVersion(emptyCF, ts);
- } else {
- delete.addFamily(emptyCF, ts);
- }
- return delete;
+ newTableEmptyKeyValueRef, newTableWALDisabled, oldTableImmutableStorageScheme, newTableImmutableStorageScheme,
+ newTableEncodingScheme, oldTableEncodingScheme, verified);
}
public ImmutableBytesPtr getEmptyKeyValueFamily() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 961aec6..0bb6f9a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -1110,12 +1110,22 @@ public class SchemaUtil {
return TableName.valueOf(schemaName, tableName);
}
+ public static PName getPhysicalHBaseTableName(byte[] fullTableName, boolean isNamespaceMappingEnabled) {
+ String tableName = getTableNameFromFullName(fullTableName);
+ String schemaName = getSchemaNameFromFullName(fullTableName);
+ return getPhysicalHBaseTableName(schemaName, tableName, isNamespaceMappingEnabled);
+ }
+
public static PName getPhysicalHBaseTableName(String schemaName, String tableName, boolean isNamespaceMapped) {
if (!isNamespaceMapped) { return PNameFactory.newName(getTableNameAsBytes(schemaName, tableName)); }
if (schemaName == null || schemaName.isEmpty()) { return PNameFactory.newName(tableName); }
return PNameFactory.newName(schemaName + QueryConstants.NAMESPACE_SEPARATOR + tableName);
}
+ public static String replaceNamespaceSeparator(PName name) {
+ return name.getString().replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR);
+ }
+
public static boolean isSchemaCheckRequired(PTableType tableType, ReadOnlyProps props) {
return PTableType.TABLE.equals(tableType) && isNamespaceMappingEnabled(tableType, props);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 9bc25e1..04156f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -67,6 +67,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Timestamp;
import java.sql.Types;
import java.text.Format;
import java.text.SimpleDateFormat;
@@ -2250,23 +2251,19 @@ public class UpgradeUtil {
readOnlyProps, PhoenixRuntime.getCurrentScn(readOnlyProps), fullTableName,
table.getType(),conn.getTenantId());
// clear the cache and get new table
- conn.removeTable(conn.getTenantId(), fullTableName,
- table.getParentName() != null ? table.getParentName().getString() : null,
- table.getTimeStamp());
- byte[] tenantIdBytes = conn.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY :
- conn.getTenantId().getBytes();
- conn.getQueryServices().clearTableFromCache(
- tenantIdBytes,
- table.getSchemaName().getBytes(), table.getTableName().getBytes(),
- PhoenixRuntime.getCurrentScn(readOnlyProps));
- MetaDataMutationResult result =
- new MetaDataClient(conn).updateCache(conn.getTenantId(), schemaName, tableName,
- true);
+ MetaDataMutationResult result = clearCacheAndGetNewTable(conn,
+ conn.getTenantId(),
+ table.getSchemaName()==null?null:table.getSchemaName().getString(),
+ table.getTableName().getString(),
+ table.getParentName()==null?null:table.getParentName().getString(),
+ table.getTimeStamp());
+
if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
throw new TableNotFoundException(schemaName, fullTableName);
}
table = result.getTable();
-
+ byte[] tenantIdBytes = conn.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY :
+ conn.getTenantId().getBytes();
// check whether table is properly upgraded before upgrading indexes
if (table.isNamespaceMapped()) {
for (PTable index : table.getIndexes()) {
@@ -2319,6 +2316,7 @@ public class UpgradeUtil {
index.getTableName());
conn.commit();
}
+
conn.getQueryServices().clearTableFromCache(
tenantIdBytes,
index.getSchemaName().getBytes(), index.getTableName().getBytes(),
@@ -2368,6 +2366,31 @@ public class UpgradeUtil {
}
}
+ public static MetaDataMutationResult clearCacheAndGetNewTable(PhoenixConnection conn, PName tenantId,
+ String schemaName, String tableName, String parentName, long timestamp)
+ throws SQLException {
+ clearCache(conn, tenantId, schemaName, tableName, parentName, timestamp);
+ MetaDataMutationResult result =
+ new MetaDataClient(conn).updateCache(tenantId, schemaName, tableName,
+ true);
+ return result;
+ }
+
+ public static void clearCache(PhoenixConnection conn, PName tenantId,
+ String schemaName, String tableName, String parentName, long timestamp)
+ throws SQLException {
+ conn.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName),
+ parentName, timestamp);
+ byte[] tenantIdBytes = tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY :
+ tenantId.getBytes();
+ byte[] schemaBytes = schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY :
+ schemaName.getBytes();
+ conn.getQueryServices().clearTableFromCache(
+ tenantIdBytes,
+ schemaBytes, tableName.getBytes(),
+ PhoenixRuntime.getCurrentScn(conn.getQueryServices().getProps()));
+ }
+
private static void updateIndexesSequenceIfPresent(PhoenixConnection connection, PTable dataTable)
throws SQLException {
PName tenantId = connection.getTenantId();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index affef44..a14d9b3 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -789,7 +789,7 @@ public abstract class BaseTest {
expectedColumnEncoding, String tableName)
throws Exception {
PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
- PTable table = phxConn.getTable(new PTableKey(phxConn.getTenantId(), tableName));
+ PTable table = PhoenixRuntime.getTableNoCache(phxConn, tableName);
assertEquals(expectedStorageScheme, table.getImmutableStorageScheme());
assertEquals(expectedColumnEncoding, table.getEncodingScheme());
}