You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2022/01/19 21:06:10 UTC

[GitHub] [phoenix] gokceni commented on a change in pull request #1373: PHOENIX-6622 Transform monitor

gokceni commented on a change in pull request #1373:
URL: https://github.com/apache/phoenix/pull/1373#discussion_r788135575



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java
##########
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.transform;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.end2end.IndexRebuildTaskIT.waitForTaskState;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_CREATE_TENANT_SPECIFIC_TABLE;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getRawRowCount;
+import static org.apache.phoenix.util.TestUtil.getRowCount;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TransformMonitorIT extends ParallelStatsDisabledIT {
+    // Coprocessor environment of TaskRegionObserver; (re)assigned by the constructor and
+    // handed to TaskRegionObserver.SelfHealingTask in testTransformMonitor_checkStates.
+    private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+
+    // Per-instance copy of TEST_PROPERTIES used for every connection opened via testProps.
+    private Properties testProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+    /**
+     * Configures the connection properties for this test instance and looks up the
+     * {@link TaskRegionObserver} coprocessor environment.
+     *
+     * The environment is taken from the first online region of the SYSTEM.TASK HBase table on
+     * the region server returned by getRSForFirstRegionInTable, and is stored in the static
+     * field above (so each test instance overwrites it).
+     */
+    public TransformMonitorIT() throws IOException, InterruptedException {
+        // Start from ONE_CELL_PER_COLUMN / non-encoded columns so that the ALTER statements in
+        // the tests have a storage scheme to transform away from.
+        testProps.put(QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB, "ONE_CELL_PER_COLUMN");
+        testProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, "0");
+        testProps.put(QueryServices.PHOENIX_ACLS_ENABLED, "true");
+
+        TaskRegionEnvironment = (RegionCoprocessorEnvironment) getUtility()
+                .getRSForFirstRegionInTable(
+                        PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                .get(0).getCoprocessorHost()
+                .findCoprocessorEnvironment(TaskRegionObserver.class.getName());
+    }
+
+    /**
+     * Empties the system transform and system task tables before each test so that
+     * assertions on the number of task/transform records only see rows created by the
+     * test that is about to run.
+     *
+     * @throws Exception if the connection cannot be opened or a DELETE fails
+     */
+    @Before
+    public void setupTest() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            // One statement is reused for both deletes and closed deterministically instead of
+            // leaving two anonymous statements open until the connection goes away.
+            try (Statement stmt = conn.createStatement()) {
+                stmt.execute("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME);
+                stmt.execute("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+            }
+        }
+    }
+
+    /**
+     * Shared scenario: create a data table, optionally add an index or a view (with a view
+     * index), ALTER the table's storage scheme / column encoding, wait for the resulting
+     * transform to reach COMPLETED, and then verify metadata and row counts.
+     *
+     * @param createIndex whether to create an index on the data table before the ALTER
+     * @param createView  whether to create a view and a view index before the ALTER; the
+     *                    view-specific checks only run when createIndex is false (else-if below)
+     * @param isImmutable whether the data table is created with IMMUTABLE_ROWS=true
+     */
+    private void testTransformTable(boolean createIndex, boolean createView, boolean isImmutable) throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        // The test expects the transformed physical table to be named <table>_1.
+        String newTableName = dataTableName + "_1";
+        String newTableFullName = SchemaUtil.getTableName(schemaName, newTableName);
+        String indexName = "IDX_" + generateUniqueName();
+        String indexName2 = "IDX_" + generateUniqueName();
+        String viewName = "VW_" + generateUniqueName();
+        String viewName2 = "VW2_" + generateUniqueName();
+        String viewIdxName = "VW_IDX_" + generateUniqueName();
+        String viewIdxName2 = "VW_IDX_" + generateUniqueName();
+        String view2IdxName1 = "VW2_IDX_" + generateUniqueName();
+        String indexFullName = SchemaUtil.getTableName(schemaName, indexName);
+        String createIndexStmt = "CREATE INDEX %s ON " + dataTableFullName + " (NAME) INCLUDE (ZIP) ";
+        String createViewStmt = "CREATE VIEW %s ( VIEW_COL1 INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM " + dataTableFullName;
+        // NOTE(review): this template always targets viewName, so every index created from it
+        // (including view2IdxName1 below) is built on the first view - confirm that is intended.
+        String createViewIdxSql = "CREATE INDEX  %s ON " + viewName + " (VIEW_COL1) include (VIEW_COL2) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 10;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, numOfRows, isImmutable? " IMMUTABLE_ROWS=true" : "");
+            if (createIndex) {
+                conn.createStatement().execute(String.format(createIndexStmt, indexName));
+            }
+
+            if (createView) {
+                conn.createStatement().execute(String.format(createViewStmt, viewName));
+                conn.createStatement().execute(String.format(createViewIdxSql, viewIdxName));
+                conn.createStatement().execute("UPSERT INTO " + viewName + "(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (1, 'uname11', 100, 'viewCol2')");
+            }
+            // Before the ALTER the table must still use the defaults configured in testProps.
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+            // This ALTER is what is expected to create the transform record and the monitor task.
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+
+            // setupTest() cleared the task table, so exactly one TRANSFORM_MONITOR task is expected.
+            List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, null);
+            assertEquals(1, taskRecordList.size());
+            assertEquals(PTable.TaskType.TRANSFORM_MONITOR, taskRecordList.get(0).getTaskType());
+            assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+            assertEquals(dataTableName, taskRecordList.get(0).getTableName());
+
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+            // Test that the PhysicalTableName is updated.
+            PTable oldTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
+            assertEquals(newTableName, oldTable.getPhysicalName(true).getString());
+
+            // Both the new physical table and the logical table must report the new scheme.
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, dataTableFullName);
+
+            // Compare the SQL row count of the new table with the raw row count of the HBase
+            // table that still carries the original name.
+            ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+            long newRowCount = countRows(conn, newTableFullName);
+            assertEquals(getRawRowCount(cqs.getTable(Bytes.toBytes(dataTableFullName))), newRowCount);
+
+            if (createIndex) {
+                assertEquals(newRowCount, countRows(conn, indexFullName));
+                int additionalRows = 2;
+                // Upsert new rows to new table. Note that after transform is complete, we are using the new table
+                TransformToolIT.upsertRows(conn, dataTableFullName, (int)newRowCount+1, additionalRows);
+                assertEquals(newRowCount+additionalRows, countRows(conn, indexFullName));
+                // The old physical table must not have received the additional rows.
+                assertEquals(newRowCount, getRawRowCount(cqs.getTable(Bytes.toBytes(dataTableFullName))));
+
+                // Create another index on the new table and count
+                // The old HBase table is dropped first so the new index cannot be built from it.
+                // NOTE(review): the Admin handle obtained here is never closed.
+                Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+                TableName hTableName = TableName.valueOf(dataTableFullName);
+                admin.disableTable(hTableName);
+                admin.deleteTable(hTableName);
+                conn.createStatement().execute(String.format(createIndexStmt, indexName2));
+                assertEquals(newRowCount+additionalRows, countRows(conn, dataTableFullName));
+                assertEquals(newRowCount+additionalRows, countRows(conn, SchemaUtil.getTableName(schemaName, indexName2)));
+            } else if (createView) {
+                assertEquals(numOfRows, countRows(conn, viewName));
+                assertEquals(numOfRows, countRowsForViewIndex(conn, dataTableFullName));
+                assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, viewName);
+                conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+
+                int additionalRows = 2;
+                // Upsert new rows to new table. Note that after transform is complete, we are using the new table
+                TransformToolIT.upsertRows(conn, viewName, (int)newRowCount+1, additionalRows);
+                assertEquals(newRowCount+additionalRows, getRowCount(conn, viewName));
+                assertEquals(newRowCount+additionalRows, countRowsForViewIndex(conn, dataTableFullName));
+
+                // Drop view index and create another on the new table and count
+                conn.createStatement().execute("DROP INDEX " + viewIdxName + " ON " + viewName);
+                // NOTE(review): the Admin handle obtained here is never closed.
+                Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+                TableName hTableName = TableName.valueOf(dataTableFullName);
+                admin.disableTable(hTableName);
+                admin.deleteTable(hTableName);
+                conn.createStatement().execute(String.format(createViewIdxSql, viewIdxName2));
+                assertEquals(newRowCount+additionalRows, countRowsForViewIndex(conn, dataTableFullName));
+
+                // Create another view and have a new index on top
+                conn.createStatement().execute(String.format(createViewStmt, viewName2));
+                conn.createStatement().execute(String.format(createViewIdxSql, view2IdxName1));
+                // Two view indexes now exist, hence twice the rows in the shared view index table.
+                assertEquals((newRowCount+additionalRows)*2, countRowsForViewIndex(conn, dataTableFullName));
+
+                conn.createStatement().execute("UPSERT INTO " + viewName2 + "(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (100, 'uname100', 1000, 'viewCol100')");
+                ResultSet rs = conn.createStatement().executeQuery("SELECT VIEW_COL2, NAME FROM " + viewName2 + " WHERE VIEW_COL1=1000");
+                assertTrue(rs.next());
+                assertEquals("viewCol100", rs.getString(1));
+                assertEquals("uname100", rs.getString(2));
+                assertFalse(rs.next());
+            }
+
+        }
+    }
+
+    public static int countRows(Connection conn, String tableFullName) throws SQLException {
+        ResultSet count = conn.createStatement().executeQuery("select  /*+ NO_INDEX*/ count(*) from " + tableFullName);
+        count.next();
+        int numRows = count.getInt(1);
+        return numRows;
+    }
+
+    /**
+     * Counts the rows stored in the view index physical table that belongs to a base table.
+     *
+     * The physical name is derived with {@link MetaDataUtil#getViewIndexPhysicalName(String)}
+     * and the rows are counted directly on the HBase table via the test utility.
+     *
+     * @param conn      Phoenix connection used to reach the query services; not closed here
+     * @param baseTable full name of the base table the views are defined on
+     * @return number of rows reported by the test utility for the view index table
+     * @throws IOException  if the HBase table cannot be opened, scanned or closed
+     * @throws SQLException if the connection cannot be unwrapped or the table lookup fails
+     */
+    protected int countRowsForViewIndex(Connection conn, String baseTable) throws IOException, SQLException {
+        String viewIndexTableName = MetaDataUtil.getViewIndexPhysicalName(baseTable);
+        ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
+
+        // The Table handle was previously leaked; release it as soon as the count is done.
+        try (Table indexHTable = queryServices.getTable(Bytes.toBytes(viewIndexTableName))) {
+            return getUtility().countRows(indexHTable);
+        }
+    }
+
+    /** Runs the shared transform scenario on a mutable table with no index and no view. */
+    @Test
+    public void testTransformMonitor_mutableTableWithoutIndex() throws Exception {
+        final boolean withIndex = false;
+        final boolean withView = false;
+        final boolean immutableRows = false;
+        testTransformTable(withIndex, withView, immutableRows);
+    }
+
+    /** Runs the shared transform scenario on an immutable table with no index and no view. */
+    @Test
+    public void testTransformMonitor_immutableTableWithoutIndex() throws Exception {
+        final boolean withIndex = false;
+        final boolean withView = false;
+        final boolean immutableRows = true;
+        testTransformTable(withIndex, withView, immutableRows);
+    }
+
+    /** Runs the shared transform scenario on an immutable table that has an index. */
+    @Test
+    public void testTransformMonitor_immutableTableWithIndex() throws Exception {
+        final boolean withIndex = true;
+        final boolean withView = false;
+        final boolean immutableRows = true;
+        testTransformTable(withIndex, withView, immutableRows);
+    }
+
+    @Test
+    public void testTransformMonitor_pausedTransform() throws Exception {
+        testTransformMonitor_checkStates(PTable.TransformStatus.PAUSED, PTable.TaskStatus.COMPLETED);
+    }
+
+    @Test
+    public void testTransformMonitor_completedTransform() throws Exception {
+        testTransformMonitor_checkStates(PTable.TransformStatus.COMPLETED, PTable.TaskStatus.COMPLETED);
+    }
+
+    @Test
+    public void testTransformMonitor_failedTransform() throws Exception {
+        testTransformMonitor_checkStates(PTable.TransformStatus.FAILED, PTable.TaskStatus.FAILED);
+    }
+
+    /**
+     * Writes a transform record in the given state, queues a TRANSFORM_MONITOR task for the
+     * same logical table, runs the self-healing task once and waits until the monitor task
+     * reaches the expected status.
+     *
+     * @param transformStatus status stored in the transform record before the task runs
+     * @param taskStatus      status the TRANSFORM_MONITOR task is expected to end up in
+     */
+    private void testTransformMonitor_checkStates(PTable.TransformStatus transformStatus, PTable.TaskStatus taskStatus) throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+
+            // Seed the transform record the monitor task will look at.
+            SystemTransformRecord.SystemTransformBuilder recordBuilder = new SystemTransformRecord.SystemTransformBuilder();
+            String logicalName = generateUniqueName();
+            recordBuilder.setLogicalTableName(logicalName);
+            recordBuilder.setTransformStatus(transformStatus.name());
+            recordBuilder.setNewPhysicalTableName(logicalName + "_1");
+            Transform.upsertTransform(recordBuilder.build(), phoenixConn);
+
+            TaskRegionObserver.SelfHealingTask selfHealingTask =
+                    new TaskRegionObserver.SelfHealingTask(
+                            TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+
+            // Queue the monitor task in CREATED state, then process it synchronously.
+            Timestamp createdAt = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+            Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                    .setConn(phoenixConn)
+                    .setTaskType(PTable.TaskType.TRANSFORM_MONITOR)
+                    .setTenantId(null)
+                    .setSchemaName(null)
+                    .setTableName(logicalName)
+                    .setTaskStatus(PTable.TaskStatus.CREATED.toString())
+                    .setData(null)
+                    .setPriority(null)
+                    .setStartTs(createdAt)
+                    .setEndTs(null)
+                    .build());
+            selfHealingTask.run();
+
+            waitForTaskState(conn, PTable.TaskType.TRANSFORM_MONITOR, logicalName, taskStatus);
+        }
+    }
+
+    /**
+     * Pauses a table transform, resumes it through the transform tool and verifies that a
+     * single TRANSFORM_MONITOR task exists for the table and eventually reaches COMPLETED.
+     */
+    @Test
+    public void testTransformMonitor_pauseAndResumeTransform() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            TransformToolIT.pauseTableTransform(schemaName, dataTableName, conn, "");
+
+            List<String> args = TransformToolIT.getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, true, false, false);
+
+            // This run resumes transform and TransformMonitor task runs and completes it
+            TransformToolIT.runTransformTool(args.toArray(new String[0]), 0);
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            // The record used to be fetched and then ignored; check it like the sibling tests do
+            // so a missing transform record is reported here rather than as a task timeout.
+            assertNotNull(record);
+            List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, null);
+            assertEquals(1, taskRecordList.size());
+            assertEquals(PTable.TaskType.TRANSFORM_MONITOR, taskRecordList.get(0).getTaskType());
+            assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+            assertEquals(dataTableName, taskRecordList.get(0).getTableName());
+
+            waitForTaskState(conn, PTable.TaskType.TRANSFORM_MONITOR, dataTableName, PTable.TaskStatus.COMPLETED);
+        }
+    }
+
+    /** Runs the shared transform scenario on a mutable table that has an index. */
+    @Test
+    public void testTransformMonitor_mutableTableWithIndex() throws Exception {
+        final boolean withIndex = true;
+        final boolean withView = false;
+        final boolean immutableRows = false;
+        testTransformTable(withIndex, withView, immutableRows);
+    }
+
+    /** Runs the shared transform scenario on a mutable table that has a view and a view index. */
+    @Test
+    public void testTransformMonitor_tableWithViews() throws Exception {
+        final boolean withIndex = false;
+        final boolean withView = true;
+        final boolean immutableRows = false;
+        testTransformTable(withIndex, withView, immutableRows);
+    }
+
+    /**
+     * Alters the storage scheme of an index (rather than a table), waits for the resulting
+     * transform to complete and verifies the index's new physical name, metadata and row count.
+     */
+    @Test
+    public void testTransformMonitor_index() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexFullName = SchemaUtil.getTableName(schemaName, indexName);
+        // The transformed index is expected to live in <index>_1.
+        String newTableFullName = indexFullName + "_1";
+        String createIndexStmt = "CREATE INDEX " + indexName + " ON " + dataTableFullName + " (ZIP) INCLUDE (NAME) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+            int numOfRows = 10;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, numOfRows, "");
+            conn.createStatement().execute(createIndexStmt);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexFullName);
+
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + dataTableFullName +
+                    " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexName, dataTableFullName, null, phoenixConn);
+            assertNotNull(record);
+
+            // Exactly one monitor task, registered under the index name.
+            List<Task.TaskRecord> tasks = Task.queryTaskTable(conn, null);
+            assertEquals(1, tasks.size());
+            Task.TaskRecord monitorTask = tasks.get(0);
+            assertEquals(PTable.TaskType.TRANSFORM_MONITOR, monitorTask.getTaskType());
+            assertEquals(schemaName, monitorTask.getSchemaName());
+            assertEquals(indexName, monitorTask.getTableName());
+
+            waitForTransformToGetToState(phoenixConn, record, PTable.TransformStatus.COMPLETED);
+            // Test that the PhysicalTableName is updated.
+            PTable indexAfterTransform = PhoenixRuntime.getTableNoCache(conn, indexFullName);
+            assertEquals(indexName+"_1", indexAfterTransform.getPhysicalName(true).getString());
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, newTableFullName);
+            ConnectionQueryServices cqs = phoenixConn.getQueryServices();
+            long newRowCount = countRows(conn, newTableFullName);
+            assertEquals(getRawRowCount(cqs.getTable(Bytes.toBytes(indexFullName))), newRowCount);
+        }
+    }
+
+    @Test
+    public void testTransformTableWithTenantViews() throws Exception {
+        String tenantId = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String viewTenantName = "TENANTVW_" + generateUniqueName();
+        String createTblStr = "CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL,ID INTEGER NOT NULL"
+                + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true";
+        String createViewStr = "CREATE VIEW %s  (VIEW_COL1 VARCHAR) AS SELECT * FROM %s";
+
+        String upsertQueryStr = "UPSERT INTO %s (TENANT_ID, ID, NAME, VIEW_COL1) VALUES('%s' , %d, '%s', '%s')";
+
+        Properties props = PropertiesUtil.deepCopy(testProps);
+        Connection connGlobal = null;
+        Connection connTenant = null;
+        try {
+            connGlobal = DriverManager.getConnection(getUrl(), props);
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            connTenant = DriverManager.getConnection(getUrl(), props);
+            connTenant.setAutoCommit(true);
+            String tableStmtGlobal = String.format(createTblStr, dataTableName);
+            connGlobal.createStatement().execute(tableStmtGlobal);
+            assertMetadata(connGlobal, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableName);
+
+            String viewStmtTenant = String.format(createViewStr, viewTenantName, dataTableName);
+            connTenant.createStatement().execute(viewStmtTenant);
+
+            // TODO: Fix this as part of implementing TransformTool so that the tenant view rows could be read from the tool
+//            connTenant.createStatement()
+//                    .execute(String.format(upsertQueryStr, viewTenantName, tenantId, 1, "x", "xx"));
+            try {
+                connTenant.createStatement().execute("ALTER TABLE " + dataTableName
+                        + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+                fail("Tenant connection cannot do alter");
+            } catch (SQLException e) {
+                assertEquals(CANNOT_CREATE_TENANT_SPECIFIC_TABLE.getErrorCode(), e.getErrorCode());
+            }
+            connGlobal.createStatement().execute("ALTER TABLE " + dataTableName
+                    + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord tableRecord = Transform.getTransformRecord(null, dataTableName, null, null, connGlobal.unwrap(PhoenixConnection.class));
+            assertNotNull(tableRecord);
+
+            waitForTransformToGetToState(connGlobal.unwrap(PhoenixConnection.class), tableRecord, PTable.TransformStatus.COMPLETED);
+
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, viewTenantName, tenantId, 2, "y", "yy"));
+
+            ResultSet rs = connTenant.createStatement().executeQuery("SELECT /*+ NO_INDEX */ VIEW_COL1 FROM " + viewTenantName);
+            assertTrue(rs.next());
+//            assertEquals("xx", rs.getString(1));
+//            assertTrue(rs.next());
+            assertEquals("yy", rs.getString(1));
+            assertFalse(rs.next());
+        } finally {
+            if (connGlobal != null) {
+                connGlobal.close();
+            }
+            if (connTenant != null) {
+                connTenant.close();
+            }
+        }
+    }
+
+    /**
+     * Transforms an index to SINGLE_CELL_ARRAY_WITH_OFFSETS and then back to
+     * ONE_CELL_PER_COLUMN, upserting a row after each transform, and checks that the index
+     * ends up with all three rows in the expected order.
+     */
+    @Test
+    public void testTransformAlreadyTransformedIndex() throws Exception {
+        String dataTableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String createIndexStmt = "CREATE INDEX %s ON " + dataTableName + " (NAME) INCLUDE (ZIP) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+            int numOfRows = 1;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableName, numOfRows, "");
+            conn.createStatement().execute(String.format(createIndexStmt, indexName));
+            assertEquals(numOfRows, countRows(conn, indexName));
+
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexName);
+
+            // First transform: to the single-cell storage scheme.
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + dataTableName +
+                    " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(null, indexName, dataTableName, null, phoenixConn);
+            assertNotNull(record);
+            waitForTransformToGetToState(phoenixConn, record, PTable.TransformStatus.COMPLETED);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+            TransformToolIT.upsertRows(conn, dataTableName, 2, 1);
+
+            // Second transform: back to one cell per column.
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + dataTableName +
+                    " ACTIVE SET IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=0");
+            record = Transform.getTransformRecord(null, indexName, dataTableName, null, phoenixConn);
+            assertNotNull(record);
+            waitForTransformToGetToState(phoenixConn, record, PTable.TransformStatus.COMPLETED);
+            TransformToolIT.upsertRows(conn, dataTableName, 3, 1);
+            assertEquals(numOfRows + 2, countRows(conn, indexName));
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, record.getNewPhysicalTableName());
+
+            // Rows 1..3 are expected in order, with ZIP values 95051..95053.
+            ResultSet rs = conn.createStatement().executeQuery("SELECT \":ID\", \"0:ZIP\" FROM " + indexName);
+            for (int id = 1; id <= 3; id++) {
+                assertTrue(rs.next());
+                assertEquals(String.valueOf(id), rs.getString(1));
+                assertEquals(95050 + id, rs.getInt(2));
+            }
+            assertFalse(rs.next());
+        }
+    }
+
+    /**
+     * Transforms a table to SINGLE_CELL_ARRAY_WITH_OFFSETS and then back to
+     * ONE_CELL_PER_COLUMN, dropping the original HBase table in between, and checks that all
+     * rows written before, between and after the transforms are readable at the end.
+     */
+    @Test
+    public void testTransformAlreadyTransformedTable() throws Exception {
+        String dataTableName = "TBL_" + generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+            int numOfRows = 1;
+            String createTableDdl =
+                    "CREATE TABLE IF NOT EXISTS " + dataTableName
+                            + " (ID INTEGER NOT NULL, CITY_PK VARCHAR NOT NULL, NAME_PK VARCHAR NOT NULL,NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID, CITY_PK, NAME_PK)) ";
+            conn.createStatement().execute(createTableDdl);
+
+            String upsertQuery = "UPSERT INTO %s VALUES(%d, '%s', '%s', '%s', %d)";
+
+            // insert rows
+            conn.createStatement().execute(String.format(upsertQuery, dataTableName, 1, "city1", "name1", "uname1", 95051));
+
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableName);
+
+            // First transform: to the single-cell storage scheme.
+            conn.createStatement().execute("ALTER TABLE " + dataTableName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(null, dataTableName, null, null, phoenixConn);
+            assertNotNull(record);
+            waitForTransformToGetToState(phoenixConn, record, PTable.TransformStatus.COMPLETED);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+            conn.createStatement().execute(String.format(upsertQuery, dataTableName, 2, "city2", "name2", "uname2", 95052));
+
+            assertEquals(numOfRows+1, countRows(conn, dataTableName));
+
+            // Make sure that we are not accessing the original table. We are supposed to read from the new table above
+            Admin admin = phoenixConn.getQueryServices().getAdmin();
+            TableName originalHTable = TableName.valueOf(dataTableName);
+            admin.disableTable(originalHTable);
+            admin.deleteTable(originalHTable);
+
+            // Second transform: back to one cell per column.
+            conn.createStatement().execute("ALTER TABLE " + dataTableName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=0");
+            record = Transform.getTransformRecord(null, dataTableName, null, null, phoenixConn);
+            assertNotNull(record);
+
+            waitForTransformToGetToState(phoenixConn, record, PTable.TransformStatus.COMPLETED);
+
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, record.getNewPhysicalTableName());
+            conn.createStatement().execute(String.format(upsertQuery, dataTableName, 3, "city3", "name3", "uname3", 95053));
+            assertEquals(numOfRows+2, countRows(conn, dataTableName));
+
+            // Rows 1..3 are expected in order; every column value carries the row id as suffix.
+            ResultSet rs = conn.createStatement().executeQuery("SELECT ID, ZIP, NAME, NAME_PK, CITY_PK FROM " + dataTableName);
+            for (int id = 1; id <= 3; id++) {
+                assertTrue(rs.next());
+                assertEquals(String.valueOf(id), rs.getString(1));
+                assertEquals(95050 + id, rs.getInt(2));
+                assertEquals("uname" + id, rs.getString(3));
+                assertEquals("name" + id, rs.getString(4));
+                assertEquals("city" + id, rs.getString(5));
+            }
+            assertFalse(rs.next());
+        }
+    }
+
+    /**
+     * Transforms a table on one connection and then writes to and reads from it through a
+     * second connection opened with a different URL, checking that both the pre-transform row
+     * and the newly upserted row are returned.
+     *
+     * @param isImmutable whether the table is created with IMMUTABLE_ROWS=true
+     */
+    public void testDifferentClientAccessTransformedTable(boolean isImmutable) throws Exception {
+        String dataTableName = "TBL_" + generateUniqueName();
+        try (Connection conn1 = DriverManager.getConnection(getUrl(), testProps)) {
+            conn1.setAutoCommit(true);
+            PhoenixConnection phoenixConn1 = conn1.unwrap(PhoenixConnection.class);
+            int numOfRows = 1;
+            String tableOptions = isImmutable ? " IMMUTABLE_ROWS=true" : "";
+            TransformToolIT.createTableAndUpsertRows(conn1, dataTableName, numOfRows, tableOptions);
+
+            conn1.createStatement().execute("ALTER TABLE " + dataTableName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(null, dataTableName, null, null, phoenixConn1);
+            assertNotNull(record);
+            waitForTransformToGetToState(phoenixConn1, record, PTable.TransformStatus.COMPLETED);
+
+            // A connection does transform and another connection doesn't try to upsert into old table
+            String url2 = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries";
+            try (Connection conn2 = DriverManager.getConnection(url2, PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+                conn2.setAutoCommit(true);
+                TransformToolIT.upsertRows(conn2, dataTableName, 2, 1);
+
+                // Rows 1 and 2 are expected in order, each value carrying the row id as suffix.
+                ResultSet rs = conn2.createStatement().executeQuery("SELECT ID, NAME, ZIP FROM " + dataTableName);
+                for (int id = 1; id <= 2; id++) {
+                    assertTrue(rs.next());
+                    assertEquals(String.valueOf(id), rs.getString(1));
+                    assertEquals("uname" + id, rs.getString(2));
+                    assertEquals(95050 + id, rs.getInt(3));
+                }
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    /**
+     * Runs the "different client accesses the transformed table" scenario without the
+     * immutable-rows table option: one connection does the transform and another connection
+     * should not try to upsert into the old table.
+     */
+    @Test
+    public void testDifferentClientAccessTransformedTable_mutable() throws Exception {
+        final boolean immutableRows = false;
+        testDifferentClientAccessTransformedTable(immutableRows);
+    }
+
+    /**
+     * Runs the "different client accesses the transformed table" scenario with the
+     * immutable-rows table option: one connection does the transform and another connection
+     * should not try to upsert into the old table.
+     */
+    @Test
+    public void testDifferentClientAccessTransformedTable_immutable() throws Exception {
+        final boolean immutableRows = true;
+        testDifferentClientAccessTransformedTable(immutableRows);
+    }
+
+    /**
+     * Switches the transform monitor task off, starts a table transform through ALTER TABLE and
+     * waits until the TRANSFORM_MONITOR task for the table is reported as FAILED, i.e. the
+     * cutover does not happen automatically. The monitor task switch is reset in the finally
+     * block regardless of the outcome.
+     */
+    @Test
+    public void testTransformTable_cutoverNotAuto() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            TransformMonitorTask.disableTransformMonitorTask(true);
+            conn.setAutoCommit(true);
+            int rowCount = 1;
+            String extraTableOptions = "";
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, rowCount, extraTableOptions);
+            // Before the ALTER the table still has the non-encoded, one-cell-per-column layout.
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+            String alterDdl = "ALTER TABLE " + dataTableFullName
+                    + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2";
+            conn.createStatement().execute(alterDdl);
+
+            // The ALTER must have produced a transform record for this table.
+            PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+            SystemTransformRecord transformRecord =
+                    Transform.getTransformRecord(schemaName, dataTableName, null, null, phoenixConn);
+            assertNotNull(transformRecord);
+
+            // Wait for the monitor task to end up in the FAILED state.
+            waitForTaskState(conn, PTable.TaskType.TRANSFORM_MONITOR, dataTableName, PTable.TaskStatus.FAILED);
+        } finally {
+            TransformMonitorTask.disableTransformMonitorTask(false);
+        }
+    }
+
+    /**
+     * Creates one view before and one view after a table transform, each filtering on a
+     * different NAME value, and checks that each view returns exactly its one expected row
+     * (verified through the ID and PK1 columns).
+     */
+    @Test
+    public void testTransformMonitor_tableWithViews_OnOldAndNew() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String view1 = "VW1_" + generateUniqueName();
+        String view2 = "VW2_" + generateUniqueName();
+        String createTblStr = "CREATE TABLE %s (ID INTEGER NOT NULL, PK1 VARCHAR NOT NULL"
+                + ", NAME VARCHAR CONSTRAINT PK_1 PRIMARY KEY (ID, PK1)) ";
+        String createViewStr = "CREATE VIEW %s  (VIEW_COL1 VARCHAR) AS SELECT * FROM %s WHERE NAME='%s'";
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)){
+            conn.setAutoCommit(true);
+            conn.createStatement().execute(String.format(createTblStr, fullDataTableName));
+
+            int numOfRows=2;
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullDataTableName);
+            // Close the prepared statement as soon as the seed rows have been written.
+            try (PreparedStatement stmt1 = conn.prepareStatement(upsertQuery)) {
+                for (int i=1; i <= numOfRows; i++) {
+                    stmt1.setInt(1, i);
+                    stmt1.setString(2, "pk" + i);
+                    stmt1.setString(3, "name"+ i);
+                    stmt1.execute();
+                }
+            }
+            // First view is created while the table is still in its original layout.
+            conn.createStatement().execute(String.format(createViewStr, view1, fullDataTableName, "name1"));
+
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, fullDataTableName);
+
+            conn.createStatement().execute("ALTER TABLE " + fullDataTableName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+            // Second view is created only after the transform has reached COMPLETED.
+            conn.createStatement().execute(String.format(createViewStr, view2, fullDataTableName, "name2"));
+
+            // Each ResultSet is scoped so it is closed even when an assertion fails.
+            try (ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + view2)) {
+                assertTrue(rs.next());
+                assertEquals(2, rs.getInt(1));
+                assertEquals("pk2", rs.getString(2));
+                assertFalse(rs.next());
+            }
+            try (ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + view1)) {
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("pk1", rs.getString(2));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    public static void waitForTransformToGetToState(PhoenixConnection conn, SystemTransformRecord record, PTable.TransformStatus status) throws InterruptedException, SQLException {
+        int maxTries = 200, nTries = 0;
+        String lastStatus = "";
+        do {
+            if (status.name().equals(record.getTransformStatus())) {
+                return;
+            }
+            Thread.sleep(500); // sleep 1 sec
+            record = Transform.getTransformRecord(record.getSchemaName(), record.getLogicalTableName(), record.getLogicalParentName(), record.getTenantId(), conn);
+            lastStatus = record.getTransformStatus();
+        } while (++nTries < maxTries);
+        try {
+            SingleCellIndexIT.dumpTable("SYSTEM.TASK");

Review comment:
       This is very useful for debugging, though. This path is reached when the task does not get to the expected state, and at that point it is great to have this log to see what happened.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org