You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by go...@apache.org on 2022/02/02 22:23:47 UTC

[phoenix] branch master updated: PHOENIX-6622 Transform monitor

This is an automated email from the ASF dual-hosted git repository.

gokcen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new e67582d  PHOENIX-6622 Transform monitor
e67582d is described below

commit e67582dc2f4ab4e50ee9d26d5c6b3662d29c3e83
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Thu Nov 18 10:51:14 2021 -0800

    PHOENIX-6622 Transform monitor
    
    * Implement TransformMonitor to monitor transform tool result and do orchestration
    
    Signed-off-by: Gokcen Iskender <go...@gmail.com>
---
 .../org/apache/phoenix/end2end/AggregateIT.java    |   5 +-
 .../apache/phoenix/end2end/BaseAggregateIT.java    |   6 +-
 .../end2end/LogicalTableNameExtendedIT.java        |  16 +
 .../phoenix/end2end/ParallelStatsDisabledIT.java   |   8 +-
 .../apache/phoenix/end2end/QueryWithOffsetIT.java  |   6 +-
 .../phoenix/end2end/index/SingleCellIndexIT.java   |  44 ++
 .../end2end/{index => transform}/TransformIT.java  | 181 +++++-
 .../transform/TransformMonitorExtendedIT.java      | 171 ++++++
 .../end2end/transform/TransformMonitorIT.java      | 675 +++++++++++++++++++++
 .../end2end/{ => transform}/TransformToolIT.java   | 240 +++++---
 .../org/apache/phoenix/compile/FromCompiler.java   |   3 +-
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |   2 -
 .../phoenix/coprocessor/TaskRegionObserver.java    |  12 +-
 .../coprocessor/tasks/TransformMonitorTask.java    | 216 +++++++
 .../apache/phoenix/exception/SQLExceptionCode.java |   9 +-
 .../phoenix/expression/RowKeyColumnExpression.java |   4 +
 .../org/apache/phoenix/index/IndexMaintainer.java  |  92 ++-
 .../org/apache/phoenix/jdbc/PhoenixConnection.java |   7 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |   2 +-
 .../index/IndexVerificationOutputRepository.java   |  11 +-
 .../index/IndexVerificationResultRepository.java   |  11 +-
 .../index/PhoenixIndexImportDirectReducer.java     |   5 +-
 .../transform/PhoenixTransformReducer.java         |   8 +-
 .../phoenix/mapreduce/transform/TransformTool.java | 194 ++++--
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  24 +-
 .../phoenix/query/ConnectionQueryServicesImpl.java |   4 +-
 .../org/apache/phoenix/query/QueryConstants.java   |  10 +-
 .../apache/phoenix/schema/ColumnMetaDataOps.java   |  11 +-
 .../phoenix/schema/IndexNotFoundException.java     |   2 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  |  45 +-
 .../org/apache/phoenix/schema/PMetaDataImpl.java   |   5 +-
 .../java/org/apache/phoenix/schema/PTable.java     |  25 +-
 .../java/org/apache/phoenix/schema/PTableImpl.java |   1 -
 .../phoenix/schema/TableNotFoundException.java     |  12 +-
 .../org/apache/phoenix/schema/TableProperty.java   |  14 +
 .../java/org/apache/phoenix/schema/task/Task.java  |  15 +-
 .../schema/transform/SystemTransformRecord.java    |  41 +-
 .../apache/phoenix/schema/transform/Transform.java | 388 +++++++++---
 .../schema/transform/TransformMaintainer.java      |  68 +--
 .../java/org/apache/phoenix/util/SchemaUtil.java   |  10 +
 .../java/org/apache/phoenix/util/UpgradeUtil.java  |  49 +-
 .../java/org/apache/phoenix/query/BaseTest.java    |   2 +-
 42 files changed, 2326 insertions(+), 328 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java
index fbb16d0..5d1a64f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java
@@ -172,7 +172,8 @@ public class AggregateIT extends BaseAggregateIT {
         assertTrue(rs.next());
         assertEquals(0,rs.getInt(1));
         initAvgGroupTable(conn, tableName, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + "=20 ");
-        testAvgGroupByOrderPreserving(conn, tableName, 13);
+        // When ParallelStats are disabled it is 4. When enabled it is 13
+        testAvgGroupByOrderPreserving(conn, tableName, 4);
         rs = executeQuery(conn, queryBuilder);
         assertTrue(rs.next());
         assertEquals(13,rs.getInt(1));
@@ -185,7 +186,7 @@ public class AggregateIT extends BaseAggregateIT {
         rs = executeQuery(conn, queryBuilder);
         assertTrue(rs.next());
         assertEquals(13,rs.getInt(1));
-        testAvgGroupByOrderPreserving(conn, tableName, 13);
+        testAvgGroupByOrderPreserving(conn, tableName, 4);
         conn.createStatement().execute("ALTER TABLE " + tableName + " SET " + PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + "=100");
         testAvgGroupByOrderPreserving(conn, tableName, 6);
         conn.createStatement().execute("ALTER TABLE " + tableName + " SET " + PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + "=null");
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseAggregateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseAggregateIT.java
index 5b83e39..01e92c6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseAggregateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseAggregateIT.java
@@ -17,6 +17,9 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.apache.phoenix.end2end.ParallelStatsDisabledIT.executeQuery;
+import static org.apache.phoenix.end2end.ParallelStatsDisabledIT.executeQueryThrowsException;
+import static org.apache.phoenix.end2end.ParallelStatsDisabledIT.validateQueryPlan;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -569,7 +572,8 @@ public abstract class BaseAggregateIT extends ParallelStatsDisabledIT {
             explainPlanAttributes.getServerAggregate());
         TestUtil.analyzeTable(conn, tableName);
         List<KeyRange> splits = TestUtil.getAllSplits(conn, tableName);
-        assertEquals(nGuidePosts, splits.size());
+        // Stats-based parallelization is off for this suite, so 4 splits are expected; nGuidePosts (the stats-enabled count) is no longer checked here
+        assertEquals(4, splits.size());
     }
     
     @Test
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
index 73eddfb..496d2bd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
@@ -275,12 +276,16 @@ public class LogicalTableNameExtendedIT extends LogicalTableNameBaseIT {
         String schemaName = "S_" + generateUniqueName();
         String tableName = "TBL_" + generateUniqueName();
         String viewName = "VW1_" + generateUniqueName();
+        String viewName2 = "VW2_" + generateUniqueName();
         String viewIndexName1 = "VWIDX1_" + generateUniqueName();
         String viewIndexName2 = "VWIDX2_" + generateUniqueName();
+        String view2IndexName1 = "VW2IDX1_" + generateUniqueName();
         String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
         String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+        String fullViewName2 = SchemaUtil.getTableName(schemaName, viewName2);
         String fullViewIndex1Name = SchemaUtil.getTableName(schemaName, viewIndexName1);
         String fullViewIndex2Name = SchemaUtil.getTableName(schemaName, viewIndexName2);
+        String fullView2Index1Name = SchemaUtil.getTableName(schemaName, view2IndexName1);
         String fullTableHName = schemaName + ":" + tableName;
         try (Connection conn = getConnection(propsNamespace)) {
             conn.setAutoCommit(true);
@@ -307,6 +312,17 @@ public class LogicalTableNameExtendedIT extends LogicalTableNameBaseIT {
             assertEquals("PK10", rs.getString(2));
             assertEquals("VIEW_COL2_10", rs.getString(3));
             assertEquals(false, rs.next());
+
+            conn.createStatement().execute("CREATE VIEW " + fullViewName2 + "  (VIEW_COL1 VARCHAR, VIEW_COL2 VARCHAR) AS SELECT * FROM "
+            + fullTableName);
+            conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + view2IndexName1 + " ON " + fullViewName2 + " (VIEW_COL1) include (VIEW_COL2)");
+            populateView(conn, fullViewName2, 20, 1);
+            rs = conn.createStatement().executeQuery("SELECT * FROM " + fullView2Index1Name + " WHERE \"0:VIEW_COL1\"='VIEW_COL1_10'");
+            assertEquals(true, rs.next());
+            assertEquals("VIEW_COL1_10", rs.getString(1));
+            assertEquals("PK10", rs.getString(2));
+            assertEquals("VIEW_COL2_10", rs.getString(3));
+            assertEquals(false, rs.next());
         }
     }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
index cb7aad8..2ce522d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
@@ -22,6 +22,7 @@ import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserv
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.QueryBuilder;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -58,6 +59,7 @@ public abstract class ParallelStatsDisabledIT extends BaseTest {
     public static synchronized void doSetup() throws Exception {
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
         props.put(CompatBaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60*60)); // An hour
+        props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
@@ -66,13 +68,13 @@ public abstract class ParallelStatsDisabledIT extends BaseTest {
         BaseTest.freeResourcesIfBeyondThreshold();
     }
 
-    protected ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
+    public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException {
         PreparedStatement statement = conn.prepareStatement(queryBuilder.build());
         ResultSet rs = statement.executeQuery();
         return rs;
     }
 
-    protected ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
+    public static ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
             String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) {
         ResultSet rs = null;
         try {
@@ -85,7 +87,7 @@ public abstract class ParallelStatsDisabledIT extends BaseTest {
         return rs;
     }
 
-    protected void validateQueryPlan(Connection conn, QueryBuilder queryBuilder, String expectedPhoenixPlan, String expectedSparkPlan) throws SQLException {
+    public static void validateQueryPlan(Connection conn, QueryBuilder queryBuilder, String expectedPhoenixPlan, String expectedSparkPlan) throws SQLException {
         if (StringUtils.isNotBlank(expectedPhoenixPlan)) {
             ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + queryBuilder.build());
             assertEquals(expectedPhoenixPlan, QueryUtil.getExplainPlan(rs));
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
index 14045fa..c7a28f0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
@@ -174,7 +174,11 @@ public class QueryWithOffsetIT extends ParallelStatsDisabledIT {
         assertEquals(offset, explainPlanAttributes.getClientOffset()
             .intValue());
         if (!isSalted) {
-            assertEquals("PARALLEL 5-WAY",
+            // With stats-based parallelization disabled the plan is PARALLEL 4-WAY, e.g.:
+            //  CLIENT PARALLEL 4-WAY FULL SCAN OVER T_N000001
+            //  SERVER SORTED BY [C2.V1] CLIENT MERGE SORT CLIENT OFFSET 10
+            // With it enabled the plan is 5-WAY.
+            assertEquals("PARALLEL 4-WAY",
                 explainPlanAttributes.getIteratorTypeAndScanSize());
         } else {
             assertEquals("PARALLEL 10-WAY",
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
index 9e9dd76..d21d00a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
@@ -417,6 +417,50 @@ public class SingleCellIndexIT extends ParallelStatsDisabledIT {
         }
     }
 
+    @Test
+    public void testPartialUpdateSingleCellTable() throws Exception { // Upserts only V4 for one row, then reads that row back via the data table and via the index
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            String tableName = "TBL_" + generateUniqueName();
+            String idxName = "IND_" + generateUniqueName();
+
+            createTableAndIndex(conn, tableName, idxName, this.tableDDLOptions, false,3); // presumably also loads the initial rows whose V1/V2 values are asserted below -- TODO confirm
+            assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName);
+            assertMetadata(conn, SINGLE_CELL_ARRAY_WITH_OFFSETS, TWO_BYTE_QUALIFIERS, idxName);
+
+            // Partial update 1st row
+            String upsert = "UPSERT INTO " + tableName + " (PK1, INT_PK, V4) VALUES ('PK1',1,'UpdatedV4')";
+            conn.createStatement().execute(upsert);
+            conn.commit(); // NOTE(review): auto-commit was enabled above, so this explicit commit looks redundant
+            String selectFromData = "SELECT /*+ NO_INDEX */ PK1, INT_PK, V1, V2, V4 FROM " + tableName + " where INT_PK = 1 and V4 LIKE 'UpdatedV4'";
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectFromData);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertTrue(actualExplainPlan.contains(tableName)); // the hinted query must be planned against the data table
+
+            rs = conn.createStatement().executeQuery(selectFromData);
+            assertTrue(rs.next());
+            assertEquals("PK1", rs.getString(1));
+            assertEquals(1, rs.getInt(2));
+            assertEquals("V11", rs.getString(3)); // V1 and V2 were not part of the upsert and must keep their earlier values
+            assertEquals(2, rs.getInt(4));
+            assertFalse(rs.next());
+
+            String selectFromIndex = "SELECT PK1, INT_PK, V1, V2, V4 FROM " + tableName + " where V2 >= 2 and V4 = 'UpdatedV4'";
+            rs = conn.createStatement().executeQuery("EXPLAIN " + selectFromIndex);
+            actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertTrue(actualExplainPlan.contains(idxName)); // the unhinted query must be planned against the index
+
+            rs = conn.createStatement().executeQuery(selectFromIndex);
+            assertTrue(rs.next());
+            assertEquals("PK1", rs.getString(1));
+            assertEquals(1, rs.getInt(2));
+            assertEquals("V11", rs.getString(3)); // same row, same values as the data-table read above
+            assertEquals(2, rs.getInt(4));
+            assertFalse(rs.next());
+        }
+    }
+
     private Connection getTenantConnection(String tenantId) throws Exception {
         Properties tenantProps = PropertiesUtil.deepCopy(testProps);
         tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformIT.java
similarity index 69%
rename from phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
rename to phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformIT.java
index e7aa5aa..bfdc570 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformIT.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.phoenix.end2end.index;
+package org.apache.phoenix.end2end.transform;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.transform.SystemTransformRecord;
@@ -37,6 +39,9 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Properties;
 
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_LOCAL_OR_VIEW_INDEX;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TABLE_WITH_LOCAL_INDEX;
+import static org.apache.phoenix.exception.SQLExceptionCode.VIEW_WITH_PROPERTIES;
 import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
 import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -44,6 +49,7 @@ import static org.apache.phoenix.util.TestUtil.getRowCount;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -53,6 +59,7 @@ public class TransformIT extends ParallelStatsDisabledIT {
     public TransformIT() {
         testProps.put(QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB, "ONE_CELL_PER_COLUMN");
         testProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, "0");
+        testProps.put(PhoenixConfigurationUtil.TRANSFORM_MONITOR_ENABLED, Boolean.toString(false));
     }
 
     @Before
@@ -195,13 +202,53 @@ public class TransformIT extends ParallelStatsDisabledIT {
                         + " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
                 fail();
             } catch (SQLException e) {
-                assertEquals(SQLExceptionCode.CANNOT_TRANSFORM_VIEW_INDEX.getErrorCode(), e.getErrorCode());
+                assertEquals(CANNOT_TRANSFORM_LOCAL_OR_VIEW_INDEX.getErrorCode(), e.getErrorCode());
             }
         }
     }
 
     @Test
-    public void testTransformForLiveMutations_mutatingTable() throws Exception {
+    public void testTransform_tableWithLocalIndex() throws Exception { // Both a local index and the table that owns it must reject a storage-scheme transform
+        String dataTableName = "TBL_" + generateUniqueName();
+        String indexName = "LCLIDX_" + generateUniqueName();
+        String createIndexStmt = "CREATE LOCAL INDEX %s ON " + dataTableName + " (NAME) INCLUDE (ZIP) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 1;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableName, numOfRows, "");
+            conn.createStatement().execute(String.format(createIndexStmt, indexName));
+
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableName);
+            try { // case 1: ALTER INDEX on the local index itself
+                conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + dataTableName +
+                        " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+                fail("Cannot transform local index");
+            } catch (SQLException e) {
+                assertEquals(CANNOT_TRANSFORM_LOCAL_OR_VIEW_INDEX.getErrorCode(), e.getErrorCode());
+            }
+
+            try { // case 2: ALTER TABLE on the data table that has the local index
+                conn.createStatement().execute("ALTER TABLE " + dataTableName +
+                        " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+                fail("Cannot transform table with local index");
+            } catch (SQLException e) {
+                assertEquals(CANNOT_TRANSFORM_TABLE_WITH_LOCAL_INDEX.getErrorCode(), e.getErrorCode());
+            }
+
+        }
+    }
+
+    @Test
+    public void testTransformForLiveMutations_mutatingImmutableTable() throws Exception { // Shared table-mutation scenario with IMMUTABLE_ROWS=TRUE appended to the CREATE TABLE
+        testTransformForLiveMutations_mutatingTable(" IMMUTABLE_ROWS=TRUE");
+    }
+
+    @Test
+    public void testTransformForLiveMutations_mutatingMutableTable() throws Exception { // Shared table-mutation scenario with no extra CREATE TABLE options
+        testTransformForLiveMutations_mutatingTable("");
+    }
+
+    private void testTransformForLiveMutations_mutatingTable(String tableDDL) throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
             conn.setAutoCommit(true);
             String schema = "S_" + generateUniqueName();
@@ -211,7 +258,7 @@ public class TransformIT extends ParallelStatsDisabledIT {
             String fullIdxName = SchemaUtil.getTableName(schema, idxName);
 
             String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
-                    "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+                    "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) " + tableDDL;
             conn.createStatement().execute(createTableSql);
 
             String upsertStmt = "UPSERT INTO " + fullTableName + " (PK1, INT_PK, V1, V2) VALUES ('%s', %d, '%s', %d)";
@@ -286,7 +333,16 @@ public class TransformIT extends ParallelStatsDisabledIT {
     }
 
     @Test
-    public void testTransformForLiveMutations_mutatingIndex() throws Exception {
+    public void testTransformForLiveMutations_mutatingMutableIndex() throws Exception { // Shared index-mutation scenario with no extra CREATE TABLE options
+        testTransformForLiveMutations_mutatingIndex("");
+    }
+
+    @Test
+    public void testTransformForLiveMutations_mutatingImmutableIndex() throws Exception { // Shared index-mutation scenario with IMMUTABLE_ROWS=true appended to the CREATE TABLE
+        testTransformForLiveMutations_mutatingIndex(" IMMUTABLE_ROWS=true");
+    }
+
+    private void testTransformForLiveMutations_mutatingIndex(String tableDDL) throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
             conn.setAutoCommit(true);
             String schema = "S_" + generateUniqueName();
@@ -296,7 +352,7 @@ public class TransformIT extends ParallelStatsDisabledIT {
             String fullIdxName = SchemaUtil.getTableName(schema, idxName);
 
             String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
-                    "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+                    "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) " + tableDDL;
             conn.createStatement().execute(createTableSql);
 
             // Note that index will not be built, since we create it with ASYNC
@@ -327,7 +383,16 @@ public class TransformIT extends ParallelStatsDisabledIT {
     }
 
     @Test
-    public void testTransformForLiveMutations_mutatingBaseTableForView() throws Exception {
+    public void testTransformForLiveMutations_mutatingMutableBaseTableForView() throws Exception { // Shared base-table-for-view scenario with no extra CREATE TABLE options
+        testTransformForLiveMutations_mutatingBaseTableForView("");
+    }
+
+    @Test
+    public void testTransformForLiveMutations_mutatingImmutableBaseTableForView() throws Exception { // Shared base-table-for-view scenario with IMMUTABLE_ROWS=true appended to the CREATE TABLE
+        testTransformForLiveMutations_mutatingBaseTableForView(" IMMUTABLE_ROWS=true");
+    }
+
+    private void testTransformForLiveMutations_mutatingBaseTableForView(String tableDDL) throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
             conn.setAutoCommit(true);
             String schema = "S_" + generateUniqueName();
@@ -338,7 +403,7 @@ public class TransformIT extends ParallelStatsDisabledIT {
             String viewIdxName = "VWIDX_" + generateUniqueName();
 
             String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
-                    "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+                    "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) " + tableDDL;
             conn.createStatement().execute(createTableSql);
             assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullTableName);
 
@@ -374,6 +439,106 @@ public class TransformIT extends ParallelStatsDisabledIT {
         }
     }
 
+    @Test
+    public void testTransformForView() throws Exception { // A storage-scheme ALTER issued against a view must fail with VIEW_WITH_PROPERTIES
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            String schema = "S_" + generateUniqueName();
+            String tableName = "TBL_" + generateUniqueName();
+            String fullTableName = SchemaUtil.getTableName(schema, tableName);
+            String viewName = "VW_" + generateUniqueName();
+            String fullViewName = SchemaUtil.getTableName(schema, viewName); // NOTE(review): never used -- the view below is created and altered through the unqualified viewName; confirm intended
+
+            String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+                    "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+            conn.createStatement().execute(createTableSql);
+
+            String createParentViewSql = "CREATE VIEW " + viewName + " ( VIEW_COL1 VARCHAR ) AS SELECT * FROM " + fullTableName;
+            conn.createStatement().execute(createParentViewSql);
+
+            try {
+                conn.createStatement().execute("ALTER TABLE " + viewName + " SET "
+                        + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+                fail("View transform should fail");
+            } catch (SQLException e) {
+                assertEquals(VIEW_WITH_PROPERTIES.getErrorCode(), e.getErrorCode()); // expected first, actual second, so a failure message reads correctly
+            }
+        }
+    }
+
+    @Test
+    public void testAlterNotNeedsToTransformDueToSameProps() throws Exception { // An ALTER that requests the storage properties already in place must not create a transform record
+        String schema = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schema, tableName);
+        String fullIdxName = SchemaUtil.getTableName(schema, tableName); // NOTE(review): built from tableName, not indexName, so the assertMetadata on fullIdxName below inspects the table -- looks like a copy-paste slip; confirm
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+
+            String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+                    "V1 VARCHAR, V2 INTEGER, V3 INTEGER, V4 VARCHAR, V5 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) " +
+                    "IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2";
+            conn.createStatement().execute(createTableSql);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, fullTableName);
+
+            // Now do an unnecessary transform
+            conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET "
+                    + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schema, tableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNull(record); // table case: same properties requested, so no record is expected
+
+            String createIndexSql = "CREATE INDEX " + indexName + " ON " + fullTableName + " (PK1, INT_PK) include (V1) ASYNC";
+            conn.createStatement().execute(createIndexSql);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, fullIdxName);
+
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + fullTableName +
+                    " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            record = Transform.getTransformRecord(schema, indexName, tableName, null, conn.unwrap(PhoenixConnection.class));
+            assertNull(record); // index case: likewise no record is expected
+        }
+    }
+
+    @Test
+    public void testDropAfterTransform() throws Exception { // Starts a transform on an index and on its table, then drops the index, the view and the table
+        String schema = "S_" + generateUniqueName();
+        String tableName = "TBL_" + generateUniqueName();
+        String viewName = "VW_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schema, tableName);
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+
+            String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+                    "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+            conn.createStatement().execute(createTableSql);
+            assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullTableName);
+
+            String upsertStmt = "UPSERT INTO " + fullTableName + " (PK1, INT_PK, V1, V2) VALUES ('%s', %d, '%s', %d)";
+            conn.createStatement().execute(String.format(upsertStmt, "a", 1, "val1", 1));
+
+            String createParentViewSql = "CREATE VIEW " + viewName + " ( VIEW_COL1 VARCHAR ) AS SELECT * FROM " + fullTableName; // note: the view name is not schema-qualified here
+            conn.createStatement().execute(createParentViewSql);
+
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (V2) include (V1) ");
+
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + fullTableName +
+                    " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            assertSystemTransform(conn.unwrap(PhoenixConnection.class), 1, schema, indexName, null);
+            conn.createStatement().execute("DROP INDEX " + indexName + " ON " + fullTableName); // dropping the index after its transform was requested must not throw
+
+            conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET "
+                    + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schema, tableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record); // the table ALTER changes the properties, so a transform record must exist
+            conn.createStatement().execute("DROP VIEW " + viewName); // the view and then the table are dropped while that record exists
+            conn.createStatement().execute("DROP TABLE " + fullTableName);
+        }
+    }
+
+
     private void assertSystemTransform(Connection conn, int rowCount, String schema, String logicalTableName, String tenantId) throws SQLException {
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM "+
                 PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
new file mode 100644
index 0000000..9f1b6d4
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.transform;
+
+import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.end2end.transform.TransformMonitorIT.waitForTransformToGetToState;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for index and table transforms on a mini cluster that has namespace
+ * mapping enabled in both the server and the client configuration.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class TransformMonitorExtendedIT extends BaseTest {
+    // Coprocessor environment of the TaskRegionObserver found on the first region of the task table.
+    private static RegionCoprocessorEnvironment taskRegionEnvironment;
+    // Options string handed to TransformToolIT.createTableAndUpsertRows for every test table.
+    protected String dataTableDdl = "";
+    private Properties propsNamespace = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+    /**
+     * Starts the test driver with namespace mapping enabled on the server and on the client.
+     */
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        // Use the shaded Guava Maps imported by this file, not the unshaded com.google one.
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
+        serverProps.put(CompatBaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+                Integer.toString(60*60)); // An hour
+        serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
+
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED,  Boolean.TRUE.toString());
+
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+                new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+
+    /**
+     * Prepares the per-test table options and connection properties, and looks up the
+     * TaskRegionObserver coprocessor environment on the first region of the task table.
+     */
+    public TransformMonitorExtendedIT() throws IOException, InterruptedException {
+        this.dataTableDdl = " COLUMN_ENCODED_BYTES=0,IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN";
+        propsNamespace.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
+
+        TableName taskTable = TableName.valueOf(SYSTEM_CATALOG_SCHEMA + QueryConstants.NAMESPACE_SEPARATOR + SYSTEM_TASK_TABLE);
+        taskRegionEnvironment = (RegionCoprocessorEnvironment) getUtility()
+                .getRSForFirstRegionInTable(taskTable)
+                .getRegions(taskTable)
+                .get(0).getCoprocessorHost()
+                .findCoprocessorEnvironment(TaskRegionObserver.class.getName());
+    }
+
+    /**
+     * Clears the transform and task system tables so each test starts from an empty state.
+     */
+    @Before
+    public void setupTest() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), propsNamespace)) {
+            conn.setAutoCommit(true);
+            conn.createStatement().execute("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME);
+            conn.createStatement().execute("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+        }
+    }
+
+    /**
+     * Alters the storage scheme of an index inside a schema, waits for the transform to reach
+     * COMPLETED, then upserts into the data table and reads the rows back through the index.
+     */
+    @Test
+    public void testTransformIndexWithNamespaceEnabled() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String fullDataTableName = SchemaUtil.getTableName(schemaName , dataTableName);
+        String indexName = "IDX_" + generateUniqueName();
+        String fullIndexName = SchemaUtil.getTableName(schemaName , indexName);
+        String createIndexStmt = "CREATE INDEX %s ON " + fullDataTableName + " (NAME) INCLUDE (ZIP) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), propsNamespace)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 1;
+            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+            TransformToolIT.createTableAndUpsertRows(conn, fullDataTableName, numOfRows, dataTableDdl);
+            conn.createStatement().execute(String.format(createIndexStmt, indexName));
+            assertEquals(numOfRows, TransformMonitorIT.countRows(conn, fullIndexName));
+
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, fullIndexName);
+
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + fullDataTableName +
+                    " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexName, fullDataTableName, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+            TransformToolIT.upsertRows(conn, fullDataTableName, 2, 1);
+
+            // Close the result set once the expected rows have been verified.
+            try (ResultSet rs = conn.createStatement().executeQuery("SELECT \":ID\", \"0:ZIP\" FROM " + fullIndexName)) {
+                assertTrue(rs.next());
+                assertEquals("1", rs.getString(1));
+                assertEquals( 95051, rs.getInt(2));
+                assertTrue(rs.next());
+                assertEquals("2", rs.getString(1));
+                assertEquals( 95052, rs.getInt(2));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    /**
+     * Alters the storage scheme of a table inside a schema, waits for the transform to reach
+     * COMPLETED, then upserts into the table and reads the rows back.
+     */
+    @Test
+    public void testTransformTableWithNamespaceEnabled() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        try (Connection conn = DriverManager.getConnection(getUrl(), propsNamespace)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 1;
+            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+            TransformToolIT.createTableAndUpsertRows(conn, fullDataTableName, numOfRows, dataTableDdl);
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, fullDataTableName);
+
+            conn.createStatement().execute("ALTER TABLE " + fullDataTableName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+            TransformToolIT.upsertRows(conn, fullDataTableName, 2, 1);
+            assertEquals(numOfRows+1, TransformMonitorIT.countRows(conn, fullDataTableName));
+
+            // Close the result set once the expected rows have been verified.
+            try (ResultSet rs = conn.createStatement().executeQuery("SELECT ID, ZIP FROM " + fullDataTableName)) {
+                assertTrue(rs.next());
+                assertEquals("1", rs.getString(1));
+                assertEquals( 95051, rs.getInt(2));
+                assertTrue(rs.next());
+                assertEquals("2", rs.getString(1));
+                assertEquals( 95052, rs.getInt(2));
+                assertFalse(rs.next());
+            }
+        }
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java
new file mode 100644
index 0000000..11baaf5
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.transform;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.end2end.IndexRebuildTaskIT.waitForTaskState;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_CREATE_TENANT_SPECIFIC_TABLE;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getRawRowCount;
+import static org.apache.phoenix.util.TestUtil.getRowCount;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TransformMonitorIT extends ParallelStatsDisabledIT {
+    private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+
+    private Properties testProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+    /**
+     * Looks up the TaskRegionObserver coprocessor environment on the first region of the
+     * system task HBase table and sets the connection properties shared by the tests.
+     */
+    public TransformMonitorIT() throws IOException, InterruptedException {
+        final TableName taskTable = PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME;
+        TaskRegionEnvironment = (RegionCoprocessorEnvironment) getUtility()
+                .getRSForFirstRegionInTable(taskTable)
+                .getRegions(taskTable)
+                .get(0)
+                .getCoprocessorHost()
+                .findCoprocessorEnvironment(TaskRegionObserver.class.getName());
+
+        // Defaults applied to every connection opened with testProps.
+        testProps.put(QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB, "ONE_CELL_PER_COLUMN");
+        testProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, "0");
+        testProps.put(QueryServices.PHOENIX_ACLS_ENABLED, "true");
+    }
+
+    /**
+     * Empties the transform and task system tables before each test.
+     */
+    @Before
+    public void setupTest() throws Exception {
+        final String[] tablesToClear = {
+                PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME,
+                PhoenixDatabaseMetaData.SYSTEM_TASK_NAME
+        };
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            for (String table : tablesToClear) {
+                conn.createStatement().execute("DELETE FROM " + table);
+            }
+        }
+    }
+
+    /**
+     * Shared scenario for the table transform tests: creates a table (optionally with an index
+     * or with a view plus view index), alters its storage scheme, waits for the transform to
+     * reach COMPLETED and then verifies metadata and row counts on the new physical table.
+     *
+     * @param createIndex whether to create an index on the data table before the transform
+     * @param createView whether to create a view, a view index and one view row before the transform
+     * @param isImmutable whether the data table is created with IMMUTABLE_ROWS=true
+     */
+    private void testTransformTable(boolean createIndex, boolean createView, boolean isImmutable) throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String newTableName = dataTableName + "_1";
+        String newTableFullName = SchemaUtil.getTableName(schemaName, newTableName);
+        String indexName = "IDX_" + generateUniqueName();
+        String indexName2 = "IDX_" + generateUniqueName();
+        String viewName = "VW_" + generateUniqueName();
+        String viewName2 = "VW2_" + generateUniqueName();
+        String viewIdxName = "VW_IDX_" + generateUniqueName();
+        String viewIdxName2 = "VW_IDX_" + generateUniqueName();
+        String view2IdxName1 = "VW2_IDX_" + generateUniqueName();
+        String indexFullName = SchemaUtil.getTableName(schemaName, indexName);
+        String createIndexStmt = "CREATE INDEX %s ON " + dataTableFullName + " (NAME) INCLUDE (ZIP) ";
+        String createViewStmt = "CREATE VIEW %s ( VIEW_COL1 INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM " + dataTableFullName;
+        String createViewIdxSql = "CREATE INDEX  %s ON " + viewName + " (VIEW_COL1) include (VIEW_COL2) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 10;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, numOfRows, isImmutable? " IMMUTABLE_ROWS=true" : "");
+            if (createIndex) {
+                conn.createStatement().execute(String.format(createIndexStmt, indexName));
+            }
+
+            if (createView) {
+                conn.createStatement().execute(String.format(createViewStmt, viewName));
+                conn.createStatement().execute(String.format(createViewIdxSql, viewIdxName));
+                conn.createStatement().execute("UPSERT INTO " + viewName + "(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (1, 'uname11', 100, 'viewCol2')");
+            }
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+
+            // The ALTER is expected to have queued exactly one monitor task for this table.
+            List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, null);
+            assertEquals(1, taskRecordList.size());
+            assertEquals(PTable.TaskType.TRANSFORM_MONITOR, taskRecordList.get(0).getTaskType());
+            assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+            assertEquals(dataTableName, taskRecordList.get(0).getTableName());
+
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+            // Test that the PhysicalTableName is updated.
+            PTable oldTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
+            assertEquals(newTableName, oldTable.getPhysicalName(true).getString());
+
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, dataTableFullName);
+
+            ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+            long newRowCount = countRows(conn, newTableFullName);
+            assertEquals(getRawRowCount(cqs.getTable(Bytes.toBytes(dataTableFullName))), newRowCount);
+
+            if (createIndex) {
+                assertEquals(newRowCount, countRows(conn, indexFullName));
+                int additionalRows = 2;
+                // Upsert new rows to new table. Note that after transform is complete, we are using the new table
+                TransformToolIT.upsertRows(conn, dataTableFullName, (int)newRowCount+1, additionalRows);
+                assertEquals(newRowCount+additionalRows, countRows(conn, indexFullName));
+                assertEquals(newRowCount, getRawRowCount(cqs.getTable(Bytes.toBytes(dataTableFullName))));
+
+                // Create another index on the new table and count.
+                // The old HBase table is removed first; the Admin handle is closed afterwards.
+                try (Admin admin = cqs.getAdmin()) {
+                    TableName hTableName = TableName.valueOf(dataTableFullName);
+                    admin.disableTable(hTableName);
+                    admin.deleteTable(hTableName);
+                }
+                conn.createStatement().execute(String.format(createIndexStmt, indexName2));
+                assertEquals(newRowCount+additionalRows, countRows(conn, dataTableFullName));
+                assertEquals(newRowCount+additionalRows, countRows(conn, SchemaUtil.getTableName(schemaName, indexName2)));
+            } else if (createView) {
+                assertEquals(numOfRows, countRows(conn, viewName));
+                assertEquals(numOfRows, countRowsForViewIndex(conn, dataTableFullName));
+                assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, viewName);
+                conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+
+                int additionalRows = 2;
+                // Upsert new rows to new table. Note that after transform is complete, we are using the new table
+                TransformToolIT.upsertRows(conn, viewName, (int)newRowCount+1, additionalRows);
+                assertEquals(newRowCount+additionalRows, getRowCount(conn, viewName));
+                assertEquals(newRowCount+additionalRows, countRowsForViewIndex(conn, dataTableFullName));
+
+                // Drop view index and create another on the new table and count
+                conn.createStatement().execute("DROP INDEX " + viewIdxName + " ON " + viewName);
+                // The old HBase table is removed; the Admin handle is closed afterwards.
+                try (Admin admin = cqs.getAdmin()) {
+                    TableName hTableName = TableName.valueOf(dataTableFullName);
+                    admin.disableTable(hTableName);
+                    admin.deleteTable(hTableName);
+                }
+                conn.createStatement().execute(String.format(createViewIdxSql, viewIdxName2));
+                assertEquals(newRowCount+additionalRows, countRowsForViewIndex(conn, dataTableFullName));
+
+                // Create another view and have a new index on top
+                conn.createStatement().execute(String.format(createViewStmt, viewName2));
+                conn.createStatement().execute(String.format(createViewIdxSql, view2IdxName1));
+                assertEquals((newRowCount+additionalRows)*2, countRowsForViewIndex(conn, dataTableFullName));
+
+                conn.createStatement().execute("UPSERT INTO " + viewName2 + "(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (100, 'uname100', 1000, 'viewCol100')");
+                try (ResultSet rs = conn.createStatement().executeQuery("SELECT VIEW_COL2, NAME FROM " + viewName2 + " WHERE VIEW_COL1=1000")) {
+                    assertTrue(rs.next());
+                    assertEquals("viewCol100", rs.getString(1));
+                    assertEquals("uname100", rs.getString(2));
+                    assertFalse(rs.next());
+                }
+            }
+
+        }
+    }
+
+    /**
+     * Counts the rows of the given table or view with a NO_INDEX-hinted count(*) query.
+     *
+     * @param conn connection used to run the query
+     * @param tableFullName name to count rows of; concatenated into the query text as-is
+     * @return the value returned by count(*)
+     * @throws SQLException if the query fails or produces no row
+     */
+    public static int countRows(Connection conn, String tableFullName) throws SQLException {
+        String countSql = "select  /*+ NO_INDEX*/ count(*) from " + tableFullName;
+        // Close both the statement and the result set instead of leaking them per call.
+        try (PreparedStatement stmt = conn.prepareStatement(countSql);
+                ResultSet count = stmt.executeQuery()) {
+            if (!count.next()) {
+                throw new SQLException("No result returned by: " + countSql);
+            }
+            return count.getInt(1);
+        }
+    }
+
+    /**
+     * Counts the rows stored in the view index physical table that belongs to the given base table.
+     *
+     * @param conn connection whose query services are used to open the HBase table
+     * @param baseTable name of the base table the view index physical name is derived from
+     * @return the number of rows counted in the view index physical table
+     */
+    protected int countRowsForViewIndex(Connection conn, String baseTable) throws IOException, SQLException {
+        String viewIndexTableName = MetaDataUtil.getViewIndexPhysicalName(baseTable);
+        ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
+
+        // Release the HBase table handle once the count has been taken.
+        try (Table indexHTable = queryServices.getTable(Bytes.toBytes(viewIndexTableName))) {
+            // If there are multiple indexes on this view, this will return rows for others as well. For 1 view index, it is fine.
+            return getUtility().countRows(indexHTable);
+        }
+    }
+
+    /** Runs the shared table transform scenario on a mutable table with no index and no view. */
+    @Test
+    public void testTransformMonitor_mutableTableWithoutIndex() throws Exception {
+        testTransformTable(false, false, false);
+    }
+
+    /** Runs the shared table transform scenario on an immutable table with no index and no view. */
+    @Test
+    public void testTransformMonitor_immutableTableWithoutIndex() throws Exception {
+        testTransformTable(false, false, true);
+    }
+
+    /** Runs the shared table transform scenario on an immutable table that has an index. */
+    @Test
+    public void testTransformMonitor_immutableTableWithIndex() throws Exception {
+        testTransformTable(true, false, true);
+    }
+
+    /** A transform record in PAUSED status is expected to leave the monitor task COMPLETED. */
+    @Test
+    public void testTransformMonitor_pausedTransform() throws Exception {
+        testTransformMonitor_checkStates(PTable.TransformStatus.PAUSED, PTable.TaskStatus.COMPLETED);
+    }
+
+    /** A transform record in COMPLETED status is expected to leave the monitor task COMPLETED. */
+    @Test
+    public void testTransformMonitor_completedTransform() throws Exception {
+        testTransformMonitor_checkStates(PTable.TransformStatus.COMPLETED, PTable.TaskStatus.COMPLETED);
+    }
+
+    /** A transform record in FAILED status is expected to leave the monitor task FAILED. */
+    @Test
+    public void testTransformMonitor_failedTransform() throws Exception {
+        testTransformMonitor_checkStates(PTable.TransformStatus.FAILED, PTable.TaskStatus.FAILED);
+    }
+
+    /**
+     * Upserts a transform record in the given status, queues a TRANSFORM_MONITOR task for the
+     * same logical table, runs a SelfHealingTask once and waits for the task to reach the
+     * expected status.
+     *
+     * @param transformStatus status written into the transform record
+     * @param taskStatus status the monitor task is expected to end up in
+     */
+    private void testTransformMonitor_checkStates(PTable.TransformStatus transformStatus, PTable.TaskStatus taskStatus) throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+            String logicalTableName = generateUniqueName();
+
+            // Seed the transform record the monitor task will look at.
+            SystemTransformRecord.SystemTransformBuilder recordBuilder = new SystemTransformRecord.SystemTransformBuilder();
+            recordBuilder.setLogicalTableName(logicalTableName);
+            recordBuilder.setTransformStatus(transformStatus.name());
+            recordBuilder.setNewPhysicalTableName(logicalTableName + "_1");
+            Transform.upsertTransform(recordBuilder.build(), phoenixConn);
+
+            TaskRegionObserver.SelfHealingTask selfHealingTask =
+                    new TaskRegionObserver.SelfHealingTask(
+                            TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+
+            // Queue the monitor task in CREATED state, stamped with the current time.
+            Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+            Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                    .setConn(phoenixConn)
+                    .setTaskType(PTable.TaskType.TRANSFORM_MONITOR)
+                    .setTenantId(null)
+                    .setSchemaName(null)
+                    .setTableName(logicalTableName)
+                    .setTaskStatus(PTable.TaskStatus.CREATED.toString())
+                    .setData(null)
+                    .setPriority(null)
+                    .setStartTs(startTs)
+                    .setEndTs(null)
+                    .build());
+            selfHealingTask.run();
+
+            waitForTaskState(conn, PTable.TaskType.TRANSFORM_MONITOR, logicalTableName, taskStatus);
+        }
+    }
+
+    /**
+     * Pauses a table transform, resumes it through the transform tool and verifies that a
+     * single TRANSFORM_MONITOR task exists for the table and reaches COMPLETED.
+     */
+    @Test
+    public void testTransformMonitor_pauseAndResumeTransform() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            TransformToolIT.pauseTableTransform(schemaName, dataTableName, conn, "");
+
+            List<String> args = TransformToolIT.getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, true, false, false);
+
+            // This run resumes transform and TransformMonitor task runs and completes it
+            TransformToolIT.runTransformTool(args.toArray(new String[0]), 0);
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            // Verify the fetched record, as the sibling transform tests do.
+            assertNotNull(record);
+            List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(conn, null);
+            assertEquals(1, taskRecordList.size());
+            assertEquals(PTable.TaskType.TRANSFORM_MONITOR, taskRecordList.get(0).getTaskType());
+            assertEquals(schemaName, taskRecordList.get(0).getSchemaName());
+            assertEquals(dataTableName, taskRecordList.get(0).getTableName());
+
+            waitForTaskState(conn, PTable.TaskType.TRANSFORM_MONITOR, dataTableName, PTable.TaskStatus.COMPLETED);
+        }
+    }
+
+    /** Runs the shared table transform scenario on a mutable table that has an index. */
+    @Test
+    public void testTransformMonitor_mutableTableWithIndex() throws Exception {
+        testTransformTable(true, false, false);
+    }
+
+    /** Runs the shared table transform scenario on a mutable table that has a view and a view index. */
+    @Test
+    public void testTransformMonitor_tableWithViews() throws Exception {
+        testTransformTable(false, true, false);
+    }
+
+    /**
+     * Alters the storage scheme of an index, checks that one TRANSFORM_MONITOR task was queued
+     * for it, waits for the transform to reach COMPLETED and verifies the physical name,
+     * metadata and row count of the new index table.
+     */
+    @Test
+    public void testTransformMonitor_index() throws Exception {
+        final String schemaName = generateUniqueName();
+        final String dataTableName = "TBL_" + generateUniqueName();
+        final String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        final String indexName = "IDX_" + generateUniqueName();
+        final String indexFullName = SchemaUtil.getTableName(schemaName, indexName);
+        final String newTableFullName = indexFullName + "_1";
+        final String createIndexStmt = "CREATE INDEX " + indexName + " ON " + dataTableFullName + " (ZIP) INCLUDE (NAME) ";
+        final String alterIndexStmt = "ALTER INDEX " + indexName + " ON " + dataTableFullName +
+                " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2";
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+            int numOfRows = 10;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, numOfRows, "");
+            conn.createStatement().execute(createIndexStmt);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexFullName);
+
+            conn.createStatement().execute(alterIndexStmt);
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexName, dataTableFullName, null, phoenixConn);
+            assertNotNull(record);
+
+            // Exactly one monitor task, keyed by the index name, should have been queued.
+            List<Task.TaskRecord> queuedTasks = Task.queryTaskTable(conn, null);
+            assertEquals(1, queuedTasks.size());
+            Task.TaskRecord monitorTask = queuedTasks.get(0);
+            assertEquals(PTable.TaskType.TRANSFORM_MONITOR, monitorTask.getTaskType());
+            assertEquals(schemaName, monitorTask.getSchemaName());
+            assertEquals(indexName, monitorTask.getTableName());
+
+            waitForTransformToGetToState(phoenixConn, record, PTable.TransformStatus.COMPLETED);
+            // Test that the PhysicalTableName is updated.
+            PTable oldTable = PhoenixRuntime.getTableNoCache(conn, indexFullName);
+            assertEquals(indexName+"_1", oldTable.getPhysicalName(true).getString());
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, newTableFullName);
+            ConnectionQueryServices cqs = phoenixConn.getQueryServices();
+            long newRowCount = countRows(conn, newTableFullName);
+            assertEquals(getRawRowCount(cqs.getTable(Bytes.toBytes(indexFullName))), newRowCount);
+        }
+    }
+
+    /**
+     * Transforms a multi-tenant table that has a tenant view: the ALTER is rejected on the
+     * tenant connection, succeeds on the global connection, and after the transform reaches
+     * COMPLETED a row upserted through the tenant view can be read back.
+     */
+    @Test
+    public void testTransformTableWithTenantViews() throws Exception {
+        String tenantId = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String viewTenantName = "TENANTVW_" + generateUniqueName();
+        String createTblStr = "CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL,ID INTEGER NOT NULL"
+                + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true";
+        String createViewStr = "CREATE VIEW %s  (VIEW_COL1 VARCHAR) AS SELECT * FROM %s";
+
+        String upsertQueryStr = "UPSERT INTO %s (TENANT_ID, ID, NAME, VIEW_COL1) VALUES('%s' , %d, '%s', '%s')";
+
+        // Separate property sets so the tenant id never leaks into the global connection.
+        Properties globalProps = PropertiesUtil.deepCopy(testProps);
+        Properties tenantProps = PropertiesUtil.deepCopy(testProps);
+        tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        // try-with-resources closes both connections even if closing one of them throws.
+        try (Connection connGlobal = DriverManager.getConnection(getUrl(), globalProps);
+                Connection connTenant = DriverManager.getConnection(getUrl(), tenantProps)) {
+            connTenant.setAutoCommit(true);
+            String tableStmtGlobal = String.format(createTblStr, dataTableName);
+            connGlobal.createStatement().execute(tableStmtGlobal);
+            assertMetadata(connGlobal, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableName);
+
+            String viewStmtTenant = String.format(createViewStr, viewTenantName, dataTableName);
+            connTenant.createStatement().execute(viewStmtTenant);
+
+            // TODO: Fix this as part of implementing TransformTool so that the tenant view rows could be read from the tool
+//            connTenant.createStatement()
+//                    .execute(String.format(upsertQueryStr, viewTenantName, tenantId, 1, "x", "xx"));
+            try {
+                connTenant.createStatement().execute("ALTER TABLE " + dataTableName
+                        + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+                fail("Tenant connection cannot do alter");
+            } catch (SQLException e) {
+                assertEquals(CANNOT_CREATE_TENANT_SPECIFIC_TABLE.getErrorCode(), e.getErrorCode());
+            }
+            connGlobal.createStatement().execute("ALTER TABLE " + dataTableName
+                    + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord tableRecord = Transform.getTransformRecord(null, dataTableName, null, null, connGlobal.unwrap(PhoenixConnection.class));
+            assertNotNull(tableRecord);
+
+            waitForTransformToGetToState(connGlobal.unwrap(PhoenixConnection.class), tableRecord, PTable.TransformStatus.COMPLETED);
+
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, viewTenantName, tenantId, 2, "y", "yy"));
+
+            try (ResultSet rs = connTenant.createStatement().executeQuery("SELECT /*+ NO_INDEX */ VIEW_COL1 FROM " + viewTenantName)) {
+                assertTrue(rs.next());
+//                assertEquals("xx", rs.getString(1));
+//                assertTrue(rs.next());
+                assertEquals("yy", rs.getString(1));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
+    public void testTransformAlreadyTransformedIndex() throws Exception {
+        String dataTableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String createIndexStmt = "CREATE INDEX %s ON " + dataTableName + " (NAME) INCLUDE (ZIP) ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 1;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableName, numOfRows, "");
+            conn.createStatement().execute(String.format(createIndexStmt, indexName));
+            assertEquals(numOfRows, countRows(conn, indexName));
+
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexName);
+            // First transform: move the index to SINGLE_CELL_ARRAY_WITH_OFFSETS with 2-byte column qualifiers.
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + dataTableName +
+                    " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(null, indexName, dataTableName, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+            // Add a second data row (ID 2) after the first transform has completed.
+            TransformToolIT.upsertRows(conn, dataTableName, 2, 1);
+
+            // Remove the first transform record so that the lookup below can only return the record of the second transform.
+            Transform.removeTransformRecord(record, conn.unwrap(PhoenixConnection.class));
+            conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + dataTableName +
+                    " ACTIVE SET IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=0");
+            record = Transform.getTransformRecord(null, indexName, dataTableName, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+            TransformToolIT.upsertRows(conn, dataTableName, 3, 1);
+            assertEquals(numOfRows + 2, countRows(conn, indexName));
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, record.getNewPhysicalTableName());
+            ResultSet rs = conn.createStatement().executeQuery("SELECT \":ID\", \"0:ZIP\" FROM " + indexName);
+            assertTrue(rs.next());
+            assertEquals("1", rs.getString(1));
+            assertEquals( 95051, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("2", rs.getString(1));
+            assertEquals( 95052, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("3", rs.getString(1));
+            assertEquals( 95053, rs.getInt(2));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testTransformAlreadyTransformedTable() throws Exception {
+        String dataTableName = "TBL_" + generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 1;
+            String stmString1 =
+                    "CREATE TABLE IF NOT EXISTS " + dataTableName
+                            + " (ID INTEGER NOT NULL, CITY_PK VARCHAR NOT NULL, NAME_PK VARCHAR NOT NULL,NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID, CITY_PK, NAME_PK)) ";
+            conn.createStatement().execute(stmString1);
+            // Scenario: transform the table to the encoded storage format and back again, checking that rows survive both hops.
+            String upsertQuery = "UPSERT INTO %s VALUES(%d, '%s', '%s', '%s', %d)";
+
+            // insert rows
+            conn.createStatement().execute(String.format(upsertQuery, dataTableName, 1, "city1", "name1", "uname1", 95051));
+
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableName);
+            // First transform: ONE_CELL_PER_COLUMN -> SINGLE_CELL_ARRAY_WITH_OFFSETS with 2-byte column qualifiers.
+            conn.createStatement().execute("ALTER TABLE " + dataTableName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(null, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+            conn.createStatement().execute(String.format(upsertQuery, dataTableName, 2, "city2", "name2", "uname2", 95052));
+
+            assertEquals(numOfRows+1, countRows(conn, dataTableName));
+
+            // Drop the original HBase table so the rest of the test cannot be served from it; the Admin is closed once done.
+            try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+                TableName hTableName = TableName.valueOf(dataTableName);
+                admin.disableTable(hTableName);
+                admin.deleteTable(hTableName);
+            }
+            // Removing this so that we are sure that we are not picking up the old transform record.
+            Transform.removeTransformRecord(record, conn.unwrap(PhoenixConnection.class));
+            conn.createStatement().execute("ALTER TABLE " + dataTableName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=0");
+            record = Transform.getTransformRecord(null, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, record.getNewPhysicalTableName());
+            conn.createStatement().execute(String.format(upsertQuery, dataTableName, 3, "city3", "name3", "uname3", 95053));
+            assertEquals(numOfRows+2, countRows(conn, dataTableName));
+            // All three rows (written before, between and after the two transforms) must be readable.
+            ResultSet rs = conn.createStatement().executeQuery("SELECT ID, ZIP, NAME, NAME_PK, CITY_PK FROM " + dataTableName);
+            assertTrue(rs.next());
+            assertEquals("1", rs.getString(1));
+            assertEquals( 95051, rs.getInt(2));
+            assertEquals( "uname1", rs.getString(3));
+            assertEquals( "name1", rs.getString(4));
+            assertEquals( "city1", rs.getString(5));
+            assertTrue(rs.next());
+            assertEquals("2", rs.getString(1));
+            assertEquals( 95052, rs.getInt(2));
+            assertEquals( "uname2", rs.getString(3));
+            assertEquals( "name2", rs.getString(4));
+            assertEquals( "city2", rs.getString(5));
+            assertTrue(rs.next());
+            assertEquals("3", rs.getString(1));
+            assertEquals( 95053, rs.getInt(2));
+            assertEquals( "uname3", rs.getString(3));
+            assertEquals( "name3", rs.getString(4));
+            assertEquals( "city3", rs.getString(5));
+            assertFalse(rs.next());
+        }
+    }
+
+    public void testDifferentClientAccessTransformedTable(boolean isImmutable) throws Exception {
+        String dataTableName = "TBL_" + generateUniqueName();
+        try (Connection conn1 = DriverManager.getConnection(getUrl(), testProps)) {
+            conn1.setAutoCommit(true);
+            int numOfRows = 1;
+            TransformToolIT.createTableAndUpsertRows(conn1, dataTableName, numOfRows, isImmutable? " IMMUTABLE_ROWS=true" : "");
+
+            conn1.createStatement().execute("ALTER TABLE " + dataTableName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(null, dataTableName, null, null, conn1.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            waitForTransformToGetToState(conn1.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+
+            // conn1 performed the transform; a second connection opened on a different URL (presumably a separate client cache - TODO confirm) must upsert and read correctly
+            String url2 = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries";
+            try (Connection conn2 = DriverManager.getConnection(url2, PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+                conn2.setAutoCommit(true);
+                TransformToolIT.upsertRows(conn2, dataTableName, 2, 1);
+
+                ResultSet rs = conn2.createStatement().executeQuery("SELECT ID, NAME, ZIP FROM " + dataTableName);
+                assertTrue(rs.next());
+                assertEquals("1", rs.getString(1));
+                assertEquals("uname1", rs.getString(2));
+                assertEquals( 95051, rs.getInt(3));
+                assertTrue(rs.next());
+                assertEquals("2", rs.getString(1));
+                assertEquals("uname2", rs.getString(2));
+                assertEquals( 95052, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
+    public void testDifferentClientAccessTransformedTable_mutable() throws Exception {
+        // Mutable-table variant: one connection does the transform, a second connection then upserts and reads.
+        testDifferentClientAccessTransformedTable(false);
+    }
+
+    @Test
+    public void testDifferentClientAccessTransformedTable_immutable() throws Exception {
+        // Immutable-table (IMMUTABLE_ROWS=true) variant: one connection does the transform, a second connection then upserts and reads.
+        testDifferentClientAccessTransformedTable(true);
+    }
+
+    @Test
+    public void testTransformTable_cutoverNotAuto() throws Exception {
+        // Transform a table (not an index) while the transform monitor task is disabled; the monitor task is then expected to fail
+        String schemaName = generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            TransformMonitorTask.disableTransformMonitorTask(true);
+            conn.setAutoCommit(true);
+            int numOfRows = 1;
+            TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName, numOfRows, "");
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+
+            // The transform record exists, but with the monitor disabled the TRANSFORM_MONITOR task must end up FAILED
+            waitForTaskState(conn, PTable.TaskType.TRANSFORM_MONITOR, dataTableName, PTable.TaskStatus.FAILED);
+        } finally {
+            TransformMonitorTask.disableTransformMonitorTask(false);
+        }
+    }
+
+    @Test
+    public void testTransformMonitor_tableWithViews_OnOldAndNew() throws Exception {
+        // Create one view before and one after the transform, each with a different WHERE clause, and check that both return the expected row
+        String schemaName = "S_" + generateUniqueName();
+        String dataTableName = "TBL_" + generateUniqueName();
+        String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String view1 = "VW1_" + generateUniqueName();
+        String view2 = "VW2_" + generateUniqueName();
+        String createTblStr = "CREATE TABLE %s (ID INTEGER NOT NULL, PK1 VARCHAR NOT NULL"
+                + ", NAME VARCHAR CONSTRAINT PK_1 PRIMARY KEY (ID, PK1)) ";
+        String createViewStr = "CREATE VIEW %s  (VIEW_COL1 VARCHAR) AS SELECT * FROM %s WHERE NAME='%s'";
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)){
+            conn.setAutoCommit(true);
+            conn.createStatement().execute(String.format(createTblStr, fullDataTableName));
+
+            int numOfRows=2;
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullDataTableName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            for (int i=1; i <= numOfRows; i++) {
+                stmt1.setInt(1, i);
+                stmt1.setString(2, "pk" + i);
+                stmt1.setString(3, "name"+ i);
+                stmt1.execute();
+            }
+            conn.createStatement().execute(String.format(createViewStr, view1, fullDataTableName, "name1"));
+
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, fullDataTableName);
+
+            conn.createStatement().execute("ALTER TABLE " + fullDataTableName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+            // view2 is created only after the transform has completed; view1 was created before it started.
+            conn.createStatement().execute(String.format(createViewStr, view2, fullDataTableName, "name2"));
+
+            ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + view2);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals("pk2", rs.getString(2));
+            assertFalse(rs.next());
+            rs = conn.createStatement().executeQuery("SELECT * FROM " + view1);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertEquals("pk1", rs.getString(2));
+            assertFalse(rs.next());
+        }
+    }
+
+    public static void waitForTransformToGetToState(PhoenixConnection conn, SystemTransformRecord record, PTable.TransformStatus status) throws InterruptedException, SQLException {
+        // Poll every 500 ms, for at most 200 polls, until the transform record reports the requested status; fail the test otherwise.
+        String lastStatus = record.getTransformStatus();
+        for (int nTries = 0; nTries < 200 && !status.name().equals(lastStatus); nTries++) {
+            Thread.sleep(500);
+            SystemTransformRecord latest = Transform.getTransformRecord(record.getSchemaName(), record.getLogicalTableName(), record.getLogicalParentName(), record.getTenantId(), conn);
+            assertNotNull("Transform record disappeared while waiting for status " + status, latest);
+            lastStatus = latest.getTransformStatus();
+        }
+        if (!status.name().equals(lastStatus)) {
+            try {
+                SingleCellIndexIT.dumpTable("SYSTEM.TASK");
+            } catch (Exception e) {
+                // Best-effort diagnostics only: a failed dump must not mask the timeout reported below.
+            }
+            fail("Ran out of time waiting for transform state to become " + status + " but it was " + lastStatus);
+        }
+    }
+
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
similarity index 82%
rename from phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
rename to phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
index 9d87569..b8b39c7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
@@ -15,20 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.phoenix.end2end;
+package org.apache.phoenix.end2end.transform;
 
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
+import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.end2end.index.SingleCellIndexIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.mapreduce.transform.TransformTool;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
@@ -38,8 +43,11 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +58,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -76,13 +85,23 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class TransformToolIT extends ParallelStatsDisabledIT{
+@RunWith(Parameterized.class)
+public class TransformToolIT extends ParallelStatsDisabledIT {
     private static final Logger LOGGER = LoggerFactory.getLogger(TransformToolIT.class);
     private final String tableDDLOptions;
-    // TODO test with immutable
-    private boolean mutable = true;
 
-    public TransformToolIT() {
+    // Supplies the constructor argument for each parameterized run: first mutable=true, then mutable=false.
+    @Parameterized.Parameters(name = "mutable={0}")
+    public static synchronized Collection<Object[]> data() {
+        List<Object[]> params = Lists.newArrayListWithExpectedSize(2);
+        boolean[] mutableValues = new boolean[]{true, false};
+        for (boolean mutable : mutableValues) {
+            params.add(new Object[]{mutable});
+        }
+        return params;
+    }
+
+    public TransformToolIT(boolean mutable) {
         StringBuilder optionBuilder = new StringBuilder();
         optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=NONE ");
         if (!mutable) {
@@ -93,6 +112,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
 
     @BeforeClass
     public static synchronized void setup() throws Exception {
+        TransformMonitorTask.disableTransformMonitorTask(true);
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
         serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
         serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5));
@@ -100,29 +120,40 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
                 QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
         serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
         serverProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+        serverProps.put(PhoenixConfigurationUtil.TRANSFORM_MONITOR_ENABLED, Boolean.FALSE.toString());
         Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        clientProps.put(PhoenixConfigurationUtil.TRANSFORM_MONITOR_ENABLED, Boolean.FALSE.toString());
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
                 new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
 
+    @AfterClass
+    public static synchronized void cleanup() {
+        TransformMonitorTask.disableTransformMonitorTask(false);
+    }
+
     private void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows) throws SQLException {
         createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
     }
 
-    private void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows, String tableOptions) throws SQLException {
+    public static void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows, String tableOptions) throws SQLException {
         String stmString1 =
                 "CREATE TABLE IF NOT EXISTS " + dataTableFullName
                         + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
                         + tableOptions;
         conn.createStatement().execute(stmString1);
+        upsertRows(conn, dataTableFullName, 1, numOfRows);
+        conn.commit();
+    }
+
+    public static void upsertRows(Connection conn, String dataTableFullName, int startIdx, int numOfRows) throws SQLException {
         String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
         PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
 
         // insert rows
-        for (int i = 1; i <= numOfRows; i++) {
+        for (int i = startIdx; i < startIdx+numOfRows; i++) {
             IndexToolIT.upsertRow(stmt1, i);
         }
-        conn.commit();
     }
     @Test
     public void testTransformTable() throws Exception {
@@ -134,7 +165,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(true);
             int numOfRows = 2;
-            createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
 
             conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
                     " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
@@ -146,13 +177,9 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
                     null, null, null, false, false, false, false,false);
             runTransformTool(args.toArray(new String[0]), 0);
             record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
-            assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
+            assertTransformStatusOrPartial(PTable.TransformStatus.PENDING_CUTOVER, record);
             assertEquals(getRowCount(conn, dataTableFullName), getRowCount(conn,newTableFullName));
 
-            // Test that the PhysicalTableName is updated.
-            PTable oldTable = PhoenixRuntime.getTable(conn, dataTableFullName);
-            assertEquals(dataTableName+"_1", oldTable.getPhysicalName(true).getString());
-
             String sql = "SELECT ID, NAME, ZIP FROM %s ";
             ResultSet rs1 = conn.createStatement().executeQuery(String.format(sql, dataTableFullName));
             ResultSet rs2 = conn.createStatement().executeQuery(String.format(sql, newTableFullName));
@@ -176,7 +203,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(true);
-            createTableAndUpsertRows(conn, dataTableFullName, 2);
+            createTableAndUpsertRows(conn, dataTableFullName, 2, tableDDLOptions);
 
             conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
                     " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
@@ -193,10 +220,10 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         }
     }
 
-    private void pauseTableTransform(String schemaName, String dataTableName, Connection conn) throws Exception {
+    public static void pauseTableTransform(String schemaName, String dataTableName, Connection conn, String tableDDLOptions) throws Exception {
         String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
 
-        createTableAndUpsertRows(conn, dataTableFullName, 2);
+        createTableAndUpsertRows(conn, dataTableFullName, 2, tableDDLOptions);
 
         conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
                 " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
@@ -219,7 +246,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(true);
-            pauseTableTransform(schemaName, dataTableName, conn);
+            pauseTableTransform(schemaName, dataTableName, conn, tableDDLOptions);
         }
     }
 
@@ -230,13 +257,13 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(true);
-            pauseTableTransform(schemaName, dataTableName, conn);
+            pauseTableTransform(schemaName, dataTableName, conn, tableDDLOptions);
             List<String> args = getArgList(schemaName, dataTableName, null,
                     null, null, null, false, false, true, false, false);
 
             runTransformTool(args.toArray(new String[0]), 0);
             SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
-            assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
+            assertTransformStatusOrPartial(PTable.TransformStatus.PENDING_CUTOVER, record);
         }
     }
 
@@ -328,11 +355,11 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
             conn.setAutoCommit(true);
             String dataDDL =
                     "CREATE TABLE " + dataTableFullName + "(\n"
-                            + "ID VARCHAR(5) NOT NULL PRIMARY KEY,\n"
+                            + "ID CHAR(5) NOT NULL PRIMARY KEY,\n"
                             + "\"info\".CAR_NUM VARCHAR(18) NULL,\n"
                             + "\"test\".CAR_NUM VARCHAR(18) NULL,\n"
                             + "\"info\".CAP_DATE VARCHAR NULL,\n" + "\"info\".ORG_ID BIGINT NULL,\n"
-                            + "\"test\".ORG_NAME VARCHAR(255) NULL\n" + ") IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES = 0";
+                            + "\"test\".ORG_NAME VARCHAR(255) NULL\n" + ") IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=NONE ";
             conn.createStatement().execute(dataDDL);
 
             // insert data
@@ -390,7 +417,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(true);
             int numOfRows = 2;
-            createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
             conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON " + dataTableFullName + " (NAME) INCLUDE (ZIP)");
             SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexTableFullName);
             conn.createStatement().execute("ALTER INDEX " + indexTableName + " ON " + dataTableFullName +
@@ -404,13 +431,9 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
 
             runTransformTool(args.toArray(new String[0]), 0);
             record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
-            assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
+            assertTransformStatusOrPartial(PTable.TransformStatus.PENDING_CUTOVER, record);
             assertEquals(getRowCount(conn, indexTableFullName), getRowCount(conn, indexTableFullName + "_1"));
 
-            // Test that the PhysicalTableName is updated.
-            PTable oldTable = PhoenixRuntime.getTable(conn, indexTableFullName);
-            assertEquals(indexTableName+"_1", oldTable.getPhysicalName(true).getString());
-
             String sql = "SELECT \":ID\", \"0:NAME\", \"0:ZIP\" FROM %s ORDER BY \":ID\"";
             ResultSet rs1 = conn.createStatement().executeQuery(String.format(sql, indexTableFullName));
             ResultSet rs2 = conn.createStatement().executeQuery(String.format(sql, indexTableFullName + "_1"));
@@ -439,7 +462,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(true);
             int numOfRows = 0;
-            createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
             String createParentViewSql = "CREATE VIEW " + parentViewName + " ( PARENT_VIEW_COL1 VARCHAR ) AS SELECT * FROM " + dataTableFullName;
             conn.createStatement().execute(createParentViewSql);
 
@@ -508,14 +531,12 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
     }
 
     @Test
-    public void testTransformVerifiedForTransactionalTable() throws Exception {
-        // TODO: Column encoding is not supported for OMID. Have omid test for other type of transforms.
-        // For now, we don't support transforms other than storage and column encoding type change.
-        //testVerifiedForTransactionalTable("OMID");
-        testVerifiedForTransactionalTable("TEPHRA");
+    public void testTransformFailedForTransactionalTable() throws Exception {
+        testTransactionalTableCannotTransform("OMID");
+        testTransactionalTableCannotTransform("TEPHRA");
     }
 
-    private void testVerifiedForTransactionalTable(String provider) throws Exception{
+    private void testTransactionalTableCannotTransform(String provider) throws Exception{
         String tableOptions = tableDDLOptions + " ,TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + provider + "'";
 
         String schemaName = generateUniqueName();
@@ -529,41 +550,17 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
             createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableOptions);
             SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
 
-            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
-                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
-            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
-            assertNotNull(record);
-            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
-
+            try {
+                conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                        " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            } catch (SQLException ex) {
+                assertEquals(SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE.getErrorCode(), ex.getErrorCode());
+            }
+            // The ALTER above is rejected for a transactional table, so running TransformTool is expected to fail
             List<String> args = getArgList(schemaName, dataTableName, null,
                     null, null, null, false, false, false, false, false);
-            runTransformTool(args.toArray(new String[0]), 0);
-            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
-
-
-            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
-            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-
-            // Run again to check that VERIFIED row still remains verified
-            runTransformTool(args.toArray(new String[0]), 0);
-            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
-            assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
-
-            // We will have two rows with empty col = 'x' since there is no IndexRegionObserver for transactional table
-            IndexToolIT.upsertRow(stmt1, ++numOfRows);
-            IndexToolIT.upsertRow(stmt1, ++numOfRows);
-
-            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
-            assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
-            assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), EMPTY_COLUMN_VALUE_BYTES));
 
-            // Even when we run TransformTool again, verified bit is not cleared but the empty column stays as is
-            args = getArgList(schemaName, dataTableName, null,
-                    null, null, null, false, false, false, false, false);
-            runTransformTool(args.toArray(new String[0]), 0);
-            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
-            assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
-            assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), EMPTY_COLUMN_VALUE_BYTES));
+            runTransformTool(args.toArray(new String[0]), -1);
         }
     }
 
@@ -577,7 +574,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(true);
             int numOfRows = 2;
-            createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
             SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
 
             conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
@@ -647,7 +644,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
             int numOfRows2= 2;
             String dataTableName2 = generateUniqueName();
             String dataTableFullName2 = SchemaUtil.getTableName(schemaName, dataTableName2);
-            createTableAndUpsertRows(conn, dataTableFullName2, numOfRows2);
+            createTableAndUpsertRows(conn, dataTableFullName2, numOfRows2, tableDDLOptions);
             conn.createStatement().execute("ALTER TABLE " + dataTableFullName2 +
                     " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
             record = Transform.getTransformRecord(schemaName, dataTableName2, null, null, conn.unwrap(PhoenixConnection.class));
@@ -687,7 +684,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
             conn.setAutoCommit(true);
             int numOfRows = 1;
             int numOfRowsInNewTbl = 0;
-            createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
             SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
 
             conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
@@ -746,6 +743,109 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         }
     }
 
+    @Test
+    public void testTransformVerify_VerifyOnlyShouldNotChangeTransformState() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = "IDX_" + generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 1;
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON " + dataTableFullName + " (NAME) INCLUDE (ZIP)");
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexTableFullName);
+            conn.createStatement().execute("ALTER INDEX " + indexTableName + " ON " + dataTableFullName +
+                    " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+
+            List<String> args = getArgList(schemaName, dataTableName, indexTableName,
+                    null, null, null, false, false, false, false, false);
+            args.add("-v");
+            args.add(IndexTool.IndexVerifyType.ONLY.getValue());
+            TransformTool transformTool = runTransformTool(args.toArray(new String[0]), 0);
+            // No change
+            assertEquals(PTable.TransformStatus.CREATED.toString(), record.getTransformStatus());
+
+            // Now test data table
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            args = getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, false, false, false);
+            args.add("-v");
+            args.add(IndexTool.IndexVerifyType.ONLY.getValue());
+            runTransformTool(args.toArray(new String[0]), 0);
+            assertEquals(PTable.TransformStatus.CREATED.toString(), record.getTransformStatus());
+            assertEquals(0, getRowCount(conn, record.getNewPhysicalTableName()));
+        }
+    }
+
+    @Test
+    public void testTransformVerify_ForceCutover() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = "IDX_" + generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 1;
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON " + dataTableFullName + " (NAME) INCLUDE (ZIP)");
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexTableFullName);
+            conn.createStatement().execute("ALTER INDEX " + indexTableName + " ON " + dataTableFullName +
+                    " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            List<String> args = getArgList(schemaName, dataTableName, indexTableName,
+                    null, null, null, false, false, false, false, false);
+            args.add("-fco");
+            runTransformTool(args.toArray(new String[0]), 0);
+
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            assertTransformStatusOrPartial(PTable.TransformStatus.COMPLETED, record);
+            PTable pOldIndexTable = PhoenixRuntime.getTableNoCache(conn, indexTableFullName);
+            assertEquals(SchemaUtil.getTableNameFromFullName(record.getNewPhysicalTableName()),
+                    pOldIndexTable.getPhysicalName(true).getString());
+
+            // Now test data table
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            args = getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, false, false, false);
+            args.add("-fco");
+            runTransformTool(args.toArray(new String[0]), 0);
+
+            record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            assertTransformStatusOrPartial(PTable.TransformStatus.COMPLETED, record);
+            PTable pOldTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
+            assertEquals(SchemaUtil.getTableNameFromFullName(record.getNewPhysicalTableName()),
+                    pOldTable.getPhysicalName(true).getString());
+        }
+    }
+
+    public static void assertTransformStatusOrPartial(PTable.TransformStatus expectedStatus, SystemTransformRecord systemTransformRecord) {
+        if (systemTransformRecord.getTransformStatus().equals(expectedStatus.name())) {
+            return;
+        }
+
+        assertEquals(true, systemTransformRecord.getTransformType().toString().contains("PARTIAL"));
+    }
+
     public static List<String> getArgList(String schemaName, String dataTable, String indxTable, String tenantId,
                                           Long startTime, Long endTime,
                                           boolean shouldAbort, boolean shouldPause, boolean shouldResume, boolean isPartial,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 35d69aa..c5899e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -787,6 +787,7 @@ public class FromCompiler {
                             tableNode.getName().equals(mutatingTableName)) {
                         alwaysHitServer = true;
                     }
+
                     try {
                         MetaDataMutationResult result = client.updateCache(tenantId, schemaName, tableName, alwaysHitServer);
                         timeStamp = TransactionUtil.getResolvedTimestamp(connection, result);
@@ -816,8 +817,8 @@ public class FromCompiler {
                             MetaDataMutationResult result = client.updateCache(schemaName, tableName);
                             if (result.wasUpdated()) {
                                 timeStamp = TransactionUtil.getResolvedTimestamp(connection, result);
-                                theTable = result.getTable();
                             }
+                            theTable = result.getTable();
                         }
                         if (theTable == null) {
                             throw new TableNotFoundException(schemaName, tableName, timeStamp);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index a5cd7b4..59961af 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -235,8 +235,6 @@ import org.apache.phoenix.schema.metrics.MetricsMetadataSource;
 import org.apache.phoenix.schema.metrics.MetricsMetadataSourceFactory;
 import org.apache.phoenix.schema.task.SystemTaskParams;
 import org.apache.phoenix.schema.task.Task;
-import org.apache.phoenix.schema.transform.SystemTransformRecord;
-import org.apache.phoenix.schema.transform.Transform;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index 4687fe1..f5ae114 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -72,6 +72,7 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
     private static Map<TaskType, String> classMap = ImmutableMap.<TaskType, String>builder()
             .put(TaskType.DROP_CHILD_VIEWS, "org.apache.phoenix.coprocessor.tasks.DropChildViewsTask")
             .put(TaskType.INDEX_REBUILD, "org.apache.phoenix.coprocessor.tasks.IndexRebuildTask")
+            .put(TaskType.TRANSFORM_MONITOR, "org.apache.phoenix.coprocessor.tasks.TransformMonitorTask")
             .build();
 
     public enum TaskResultCode {
@@ -186,17 +187,20 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
                         // if current status is already Started, check if we need to re-run.
                         // Task can be async and already Started before.
                         TaskResult result = null;
-                        if (taskRecord.getStatus() != null && taskRecord.getStatus().equals(PTable.TaskStatus.STARTED.toString())) {
+                        if (taskRecord.getStatus() != null
+                                && taskRecord.getStatus().equals(PTable.TaskStatus.STARTED.toString())) {
                             result = (TaskResult) checkCurretResult.invoke(obj, taskRecord);
                         }
 
-                        if (result == null) {
+                        if (result == null || taskRecord.getStatus().equals(PTable.TaskStatus.RETRY.toString())) {
                             // reread task record. There might be async setting of task status
                             taskRecord =
                                     Task.queryTaskTable(connForTask, taskRecord.getTimeStamp(),
                                             taskRecord.getSchemaName(), taskRecord.getTableName(),
                                             taskType, taskRecord.getTenantId(), null).get(0);
-                            if (taskRecord.getStatus() != null && !taskRecord.getStatus().equals(PTable.TaskStatus.CREATED.toString())) {
+                            if (taskRecord.getStatus() != null
+                                    && !taskRecord.getStatus().equals(PTable.TaskStatus.CREATED.toString())
+                                && !taskRecord.getStatus().equals(PTable.TaskStatus.RETRY.toString())) {
                                 continue;
                             }
 
@@ -277,7 +281,7 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
                 .setPriority(taskRecord.getPriority())
                 .setStartTs(taskRecord.getTimeStamp())
                 .setEndTs(endTs)
-                .setAccessCheckEnabled(true)
+                .setAccessCheckEnabled(false)
                 .build());
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/TransformMonitorTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/TransformMonitorTask.java
new file mode 100644
index 0000000..c71380d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/TransformMonitorTask.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.transform.TransformTool;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_MONITOR_ENABLED;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_RETRY_COUNT;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_MONITOR_ENABLED;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_RETRY_COUNT_VALUE;
+
+/**
+ * Task runs periodically to monitor and orchestrate ongoing transforms in System.Transform table.
+ *
+ */
+public class TransformMonitorTask extends BaseTask  {
+    public static final String DEFAULT = "IndexName";
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(TransformMonitorTask.class);
+
+    private static boolean isDisabled = false;
+
+    // Called from testing
+    @VisibleForTesting
+    public static void disableTransformMonitorTask(boolean disabled) {
+        isDisabled = disabled;
+    }
+
+    @Override
+    public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+        Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+        Configuration configuration = HBaseConfiguration.addHbaseResources(conf);
+        boolean transformMonitorEnabled = configuration.getBoolean(TRANSFORM_MONITOR_ENABLED, DEFAULT_TRANSFORM_MONITOR_ENABLED);
+        if (!transformMonitorEnabled || isDisabled) {
+            return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "TransformMonitor is disabled");
+        }
+
+        try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)){
+            SystemTransformRecord systemTransformRecord = Transform.getTransformRecord(taskRecord.getSchemaName(),
+                    taskRecord.getTableName(), null, taskRecord.getTenantId(), conn);
+            if (systemTransformRecord == null) {
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
+                        "No transform record is found");
+            }
+            String tableName = SchemaUtil.getTableName(systemTransformRecord.getSchemaName(),
+                    systemTransformRecord.getLogicalTableName());
+
+            if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.CREATED.name())) {
+                LOGGER.info("Transform is created, starting the TransformTool ", tableName);
+                // Kick off a TransformTool run; the tool itself updates the transform record status and job id
+                TransformTool transformTool = TransformTool.runTransformTool(systemTransformRecord, conf, false, null, null, false, false);
+                if (transformTool == null) {
+                    // This is not a map/reduce error. There must be some unexpected issue. So, retrying will not solve the underlying issue.
+                    return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "TransformTool run failed. Check the parameters.");
+                }
+            } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name())) {
+                LOGGER.info("Transform is completed, TransformMonitor is done ", tableName);
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, "");
+            } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PENDING_CUTOVER.name())
+                    && !PTable.TransformType.isPartialTransform(systemTransformRecord.getTransformType())) {
+                LOGGER.info("Transform is pending cutover ", tableName);
+                Transform.doCutover(conn, systemTransformRecord);
+
+                PTable.TransformType partialTransform = PTable.TransformType.getPartialTransform(systemTransformRecord.getTransformType());
+                if (partialTransform != null) {
+                    // Update transform to be partial
+                    SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(systemTransformRecord);
+                    builder.setTransformType(partialTransform);
+                    // Decrement retry count since TransformTool will increment it. Should we set it to 0?
+                    builder.setTransformRetryCount(systemTransformRecord.getTransformRetryCount()-1);
+                    Transform.upsertTransform(builder.build(), conn);
+
+                    // Fix unverified rows. Running partial transform will make the transform status go back to started
+                    long startFromTs = 0;
+                    if (systemTransformRecord.getTransformLastStateTs() != null) {
+                        startFromTs = systemTransformRecord.getTransformLastStateTs().getTime()-1;
+                    }
+                    TransformTool.runTransformTool(systemTransformRecord, conf, true, startFromTs, null, true, false);
+
+                    // In the future, if we are changing the PK structure, we need to run indextools as well
+                } else {
+                    // No partial transform needed so, we update state of the transform
+                    LOGGER.warn("No partial type of the transform is found. Completing the transform ", tableName);
+                    Transform.updateTransformRecord(conn, systemTransformRecord, PTable.TransformStatus.COMPLETED);
+                }
+            } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.STARTED.name()) ||
+                    (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PENDING_CUTOVER.name())
+                            && PTable.TransformType.isPartialTransform(systemTransformRecord.getTransformType()))) {
+                LOGGER.info(systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.STARTED.name()) ?
+                        "Transform is started, we will monitor ": "Partial transform is going on, we will monitor" , tableName);
+                // Monitor the job of transform tool and decide to retry
+                String jobId = systemTransformRecord.getTransformJobId();
+                if (jobId != null) {
+                    Cluster cluster = new Cluster(configuration);
+
+                    Job job = cluster.getJob(org.apache.hadoop.mapreduce.JobID.forName(jobId));
+                    if (job == null) {
+                        LOGGER.warn(String.format("Transform job with Id=%s is not found", jobId));
+                        return new  TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SKIPPED, "The job cannot be found");
+                    }
+                    if (job != null && job.isComplete()) {
+                        if (job.isSuccessful()) {
+                            LOGGER.warn("TransformTool job is successful. Transform should have been in a COMPLETED state "
+                                    + taskRecord.getTableName());
+                        } else {
+                            // Retry TransformTool run
+                            int maxRetryCount = configuration.getInt(TRANSFORM_RETRY_COUNT_VALUE, DEFAULT_TRANSFORM_RETRY_COUNT);
+                            if (systemTransformRecord.getTransformRetryCount() < maxRetryCount) {
+                                // Retry count will be incremented in TransformTool
+                                TransformTool.runTransformTool(systemTransformRecord, conf, false, null, null, false, true);
+                            }
+                        }
+                    }
+                }
+            } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.FAILED.name())) {
+                String str = "Transform is marked as failed because either TransformTool is run on the foreground and failed " +
+                        "or it is run as async but there is something wrong with the TransformTool parameters";
+                LOGGER.error(str);
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str);
+            } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PAUSED.name())) {
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS,
+                        "Transform is paused. No need to monitor");
+            } else {
+                String str = "Transform status is not known " + systemTransformRecord.getString();
+                LOGGER.error(str);
+                return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str);
+            }
+
+            // Update task status to RETRY so that it is retried
+            Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                    .setConn(conn)
+                    .setTaskType(taskRecord.getTaskType())
+                    .setTenantId(taskRecord.getTenantId())
+                    .setSchemaName(taskRecord.getSchemaName())
+                    .setTableName(taskRecord.getTableName())
+                    .setTaskStatus(PTable.TaskStatus.RETRY.toString())
+                    .setData(taskRecord.getData())
+                    .setPriority(taskRecord.getPriority())
+                    .setStartTs(taskRecord.getTimeStamp())
+                    .setEndTs(null)
+                    .setAccessCheckEnabled(true)
+                    .build());
+            return null;
+        }
+        catch (Throwable t) {
+            LOGGER.warn("Exception while running transform monitor task. " +
+                    "It will be retried in the next system task table scan : " +
+                    taskRecord.getSchemaName() + "." + taskRecord.getTableName() +
+                    " with tenant id " + (taskRecord.getTenantId() == null ? " IS NULL" : taskRecord.getTenantId()) +
+                    " and data " + taskRecord.getData(), t);
+            return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, t.toString());
+        }
+    }
+
+    public static void addTransformMonitorTask(PhoenixConnection connection, Configuration configuration, SystemTransformRecord systemTransformRecord,
+                                               PTable.TaskStatus taskStatus, Timestamp startTimestamp, Timestamp endTimestamp) throws IOException {
+        boolean transformMonitorEnabled = configuration.getBoolean(TRANSFORM_MONITOR_ENABLED, DEFAULT_TRANSFORM_MONITOR_ENABLED);
+        if (!transformMonitorEnabled) {
+            LOGGER.warn("TransformMonitor is not enabled. Monitoring/retrying TransformTool and doing cutover will not be done automatically");
+            return;
+        }
+
+        Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                .setConn(connection)
+                .setTaskType(PTable.TaskType.TRANSFORM_MONITOR)
+                .setTenantId(systemTransformRecord.getTenantId())
+                .setSchemaName(systemTransformRecord.getSchemaName())
+                .setTableName(systemTransformRecord.getLogicalTableName())
+                .setTaskStatus(taskStatus.toString())
+                .setStartTs(startTimestamp)
+                .setEndTs(endTimestamp)
+                .setAccessCheckEnabled(true)
+                .build());
+    }
+
+    @Override
+    public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord)
+            throws Exception {
+        // We don't need to check MR job result here since the job itself changes task state.
+        return null;
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 72303a3..f65db43 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -566,7 +566,14 @@ public enum SQLExceptionCode {
 
     CANNOT_TRANSFORM_ALREADY_TRANSFORMING_TABLE(910, "43M21",
                                         "Cannot transform an index or a table who is already going through a transform."),
-    CANNOT_TRANSFORM_VIEW_INDEX(911, "43M22", "Cannot transform a view index. Consider creating a new view index.");
+
+    CANNOT_TRANSFORM_LOCAL_OR_VIEW_INDEX(911, "43M22", "Cannot transform a view index or a local index. For view index, consider creating a new view index."),
+
+    CANNOT_TRANSFORM_TABLE_WITH_LOCAL_INDEX(912, "43M23", "Cannot transform a table with a local index."),
+
+    CANNOT_TRANSFORM_TABLE_WITH_APPEND_ONLY_SCHEMA(913, "43M24", "Cannot transform a table with append-only schema."),
+
+    CANNOT_TRANSFORM_TRANSACTIONAL_TABLE(914, "43M25", "Cannot transform a transactional table.");
 
     private final int errorCode;
     private final String sqlState;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java
index cfa027f..b6c41b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java
@@ -25,9 +25,13 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE;
 
 
 /**
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index fbb5182..a465d98 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -1058,15 +1058,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey, ts);
         return buildUpdateMutation(kvBuilder, valueGetter, dataRowKeyPtr, ts, regionStartKey, regionEndKey,
                 indexRowKey, this.getEmptyKeyValueFamily(), coveredColumnsMap,
-                indexEmptyKeyValueRef, indexWALDisabled, dataImmutableStorageScheme, immutableStorageScheme, encodingScheme, verified);
+                indexEmptyKeyValueRef, indexWALDisabled, dataImmutableStorageScheme, immutableStorageScheme, encodingScheme, dataEncodingScheme, verified);
     }
 
     public static Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts,
                                    byte[] regionStartKey, byte[] regionEndKey, byte[] destRowKey, ImmutableBytesPtr emptyKeyValueCFPtr,
                                           Map<ColumnReference, ColumnReference> coveredColumnsMap,
                                           ColumnReference destEmptyKeyValueRef, boolean destWALDisabled,
-                                          ImmutableStorageScheme srcImmutableStroageScheme, ImmutableStorageScheme destImmutableStorageScheme,
-                                          QualifierEncodingScheme destEncodingScheme, boolean verified) throws IOException {
+                                          ImmutableStorageScheme srcImmutableStorageScheme, ImmutableStorageScheme destImmutableStorageScheme,
+                                          QualifierEncodingScheme destEncodingScheme, QualifierEncodingScheme srcEncodingScheme, boolean verified) throws IOException {
         Set<ColumnReference> coveredColumns = coveredColumnsMap.keySet();
         Put put = null;
         // New row being inserted: add the empty key value
@@ -1115,7 +1115,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                     ColumnReference indexColRef = colRefPair.getFirst();
                     ColumnReference dataColRef = colRefPair.getSecond();
                     byte[] value = null;
-                    if (srcImmutableStroageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                    if (srcImmutableStorageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
                         Expression expression = new SingleCellColumnExpression(new PDatum() {
                             @Override public boolean isNullable() {
                                 return false;
@@ -1168,17 +1168,81 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr));
             }
         } else {
-            for (ColumnReference ref : coveredColumns) {
-                ColumnReference indexColRef = coveredColumnsMap.get(ref);
-                ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
-                ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
-                ImmutableBytesWritable value = valueGetter.getLatestValue(ref, ts);
-                if (value != null && value != ValueGetter.HIDDEN_BY_DELETE) {
-                    if (put == null) {
-                        put = new Put(destRowKey);
-                        put.setDurability(!destWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+            if (srcImmutableStorageScheme == destImmutableStorageScheme) { //both ONE_CELL
+                for (ColumnReference ref : coveredColumns) {
+                    ColumnReference indexColRef = coveredColumnsMap.get(ref);
+                    ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
+                    ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
+                    ImmutableBytesWritable value = valueGetter.getLatestValue(ref, ts);
+                    if (value != null && value != ValueGetter.HIDDEN_BY_DELETE) {
+                        if (put == null) {
+                            put = new Put(destRowKey);
+                            put.setDurability(!destWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+                        }
+                        put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value));
+                    }
+                }
+            } else {
+                // Src is SINGLE_CELL, destination is ONE_CELL
+                Map<ImmutableBytesPtr, List<Pair<ColumnReference, ColumnReference>>> familyToColListMap = Maps.newHashMap();
+                for (ColumnReference ref : coveredColumns) {
+                    ColumnReference indexColRef = coveredColumnsMap.get(ref);
+                    ImmutableBytesPtr cf = new ImmutableBytesPtr(indexColRef.getFamily());
+                    if (!familyToColListMap.containsKey(cf)) {
+                        familyToColListMap.put(cf, Lists.<Pair<ColumnReference, ColumnReference>>newArrayList());
+                    }
+                    familyToColListMap.get(cf).add(Pair.newPair(indexColRef, ref));
+                }
+                // iterate over each column family and write every covered column as its own cell
+                for (Entry<ImmutableBytesPtr, List<Pair<ColumnReference, ColumnReference>>> entry : familyToColListMap.entrySet()) {
+                    byte[] columnFamily = entry.getKey().copyBytesIfNecessary();
+                    List<Pair<ColumnReference, ColumnReference>> colRefPairs = entry.getValue();
+                    int maxEncodedColumnQualifier = Integer.MIN_VALUE;
+                    // find the max col qualifier (NOTE(review): the result is never read in this branch)
+                    for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) {
+                        maxEncodedColumnQualifier = Math.max(maxEncodedColumnQualifier, srcEncodingScheme.decode(colRefPair.getSecond().getQualifier()));
+                    }
+                    // set the values of the columns
+                    for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) {
+                        ColumnReference indexColRef = colRefPair.getFirst();
+                        ColumnReference dataColRef = colRefPair.getSecond();
+                        byte[] valueBytes = null;
+                            Expression expression = new SingleCellColumnExpression(new PDatum() {
+                                @Override public boolean isNullable() {
+                                    return false;
+                                }
+
+                                @Override public SortOrder getSortOrder() {
+                                    return null;
+                                }
+
+                                @Override public Integer getScale() {
+                                    return null;
+                                }
+
+                                @Override public Integer getMaxLength() {
+                                    return null;
+                                }
+
+                                @Override public PDataType getDataType() {
+                                    return null;
+                                }
+                            }, dataColRef.getFamily(), dataColRef.getQualifier(), srcEncodingScheme,
+                                    srcImmutableStorageScheme);
+                            ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+                            expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr);
+                            valueBytes = ptr.copyBytesIfNecessary();
+
+                        if (valueBytes != null) {
+                            ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
+                            ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
+                            if (put == null) {
+                                put = new Put(destRowKey);
+                                put.setDurability(!destWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+                            }
+                            put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, new ImmutableBytesWritable(valueBytes)));
+                        }
                     }
-                    put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value));
                 }
             }
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index d74fe6d..5c8317d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -635,7 +635,12 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
     }
 
     public PTable getTable(PTableKey key) throws TableNotFoundException {
-        return metaData.getTableRef(key).getTable();
+        PTable table =  metaData.getTableRef(key).getTable();
+        // Force TableNotFoundException for the table that is going through transform
+        if (table.getTransformingNewTable() != null) {
+            throw new TableNotFoundException("Re-read the transforming table", true);
+        }
+        return table;
     }
 
     public PTableRef getTableRef(PTableKey key) throws TableNotFoundException {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 0e4bc77..28e819b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -221,7 +221,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final String TRANSFORM_JOB_ID = "JOB_ID";
     public static final String TRANSFORM_RETRY_COUNT = "RETRY_COUNT";
     public static final String TRANSFORM_START_TS = "START_TS";
-    public static final String TRANSFORM_END_TS = "END_TS";
+    public static final String TRANSFORM_LAST_STATE_TS = "END_TS";
     public static final String OLD_METADATA = "OLD_METADATA";
     public static final String NEW_METADATA = "NEW_METADATA";
     public static final String TRANSFORM_FUNCTION = "TRANSFORM_FUNCTION";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
index a0cd037..e5403c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.mapreduce.index;
 
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -37,6 +39,8 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.util.ByteUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -47,6 +51,7 @@ import java.util.List;
 
 public class IndexVerificationOutputRepository implements AutoCloseable {
     public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|");
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexVerificationOutputRepository.class);
 
     private Table indexTable;
     private byte[] indexName;
@@ -184,7 +189,11 @@ public class IndexVerificationOutputRepository implements AutoCloseable {
                 TableDescriptor tableDescriptor = TableDescriptorBuilder
                     .newBuilder(TableName.valueOf(OUTPUT_TABLE_NAME))
                     .setColumnFamily(columnDescriptor).build();
-                admin.createTable(tableDescriptor);
+                try {
+                    admin.createTable(tableDescriptor);
+                } catch (TableExistsException e) {
+                    LOGGER.warn("Table exists, ignoring", e);
+                }
                 outputTable = admin.getConnection().getTable(outputTableName);
             }
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
index 9ff5ed0..7c5c92b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.mapreduce.index;
 
 import org.apache.hadoop.hbase.Cell;
+
+import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -40,12 +42,15 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.util.ByteUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
 
 public class IndexVerificationResultRepository implements AutoCloseable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexVerificationResultRepository.class);
 
     public static final String RUN_STATUS_SKIPPED = "Skipped";
     public static final String RUN_STATUS_EXECUTED = "Executed";
@@ -170,7 +175,11 @@ public class IndexVerificationResultRepository implements AutoCloseable {
                 TableDescriptor tableDescriptor =
                     TableDescriptorBuilder.newBuilder(resultTableName)
                         .setColumnFamily(columnDescriptor).build();
-                admin.createTable(tableDescriptor);
+                try {
+                    admin.createTable(tableDescriptor);
+                } catch (TableExistsException e) {
+                    LOGGER.warn("Table exists, ignoring", e);
+                }
                 resultTable = admin.getConnection().getTable(resultTableName);
             }
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 3f5462c..ed01173 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.mapreduce.transform.TransformTool;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.schema.PIndexState;
@@ -167,6 +166,10 @@ public class PhoenixIndexImportDirectReducer extends
                 try {
                     Transform.completeTransform(ConnectionUtil
                             .getInputConnection(context.getConfiguration()), context.getConfiguration());
+                    if (PhoenixConfigurationUtil.getForceCutover(context.getConfiguration())) {
+                        Transform.doForceCutover(ConnectionUtil
+                                .getInputConnection(context.getConfiguration()), context.getConfiguration());
+                    }
                 } catch (Exception e) {
                     LOGGER.error(" Failed to complete transform", e);
                     throw new RuntimeException(e.getMessage());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
index 6b5b217..bbabe88 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
@@ -19,21 +19,16 @@ package org.apache.phoenix.mapreduce.transform;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.mapreduce.index.IndexToolUtil;
 import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectReducer;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.transform.Transform;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
-import java.sql.SQLException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
@@ -65,6 +60,9 @@ public class PhoenixTransformReducer extends
                          connection = ConnectionUtil.getInputConnection(context.getConfiguration())) {
                 // Complete full Transform and add a partial transform
                 Transform.completeTransform(connection, context.getConfiguration());
+                if (PhoenixConfigurationUtil.getForceCutover(context.getConfiguration())) {
+                    Transform.doForceCutover(connection, context.getConfiguration());
+                }
             } catch (Exception e) {
                 LOGGER.error(" Failed to complete transform", e);
                 throw new RuntimeException(e.getMessage());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
index b8829b6..1159e73 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -65,6 +66,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.task.Task;
 import org.apache.phoenix.schema.transform.SystemTransformRecord;
 import org.apache.phoenix.schema.transform.Transform;
 import org.apache.phoenix.schema.transform.TransformMaintainer;
@@ -92,6 +94,7 @@ import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
 
 import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
 import static org.apache.phoenix.mapreduce.index.IndexTool.createIndexToolTables;
@@ -121,6 +124,9 @@ public class TransformTool extends Configured implements Tool {
     private static final Option FIX_UNVERIFIED_TRANSFORM_OPTION = new Option("fu", "fix-unverified", false,
             "To fix unverified transform records");
 
+    private static final Option FORCE_CUTOVER_OPTION = new Option("fco", "force-cutover", false,
+            "Updated to old table to point to new table. New table will be active and reads will start serving from the new table");
+
     private static final Option USE_NEW_TABLE_AS_SOURCE_OPTION =
             new Option("fn", "from-new", false,
                     "To verify every row in the new table has a corresponding row in the old table. ");
@@ -170,10 +176,12 @@ public class TransformTool extends Configured implements Tool {
     public static final String PARTIAL_TRANSFORM_NOT_APPLICABLE = "Partial transform accepts "
             + "non-zero ts set in the past as start-time(st) option and that ts must be present in SYSTEM.TRANSFORM table";
 
-    public static final String TRANSFORM_NOT_APPLICABLE = "Transform is not applicable for local indexes or views";
+    public static final String TRANSFORM_NOT_APPLICABLE = "Transform is not applicable for local indexes or views or transactional tables";
 
     public static final String PARTIAL_TRANSFORM_NOT_COMPATIBLE = "Can't abort/pause/resume/split during partial transform";
 
+    public static final String FORCE_CUTOVER_NOT_COMPATIBLE = "Force cutover is not applicable with the other parameters";
+
     private static final Option VERIFY_OPTION = new Option("v", "verify", true,
             "To verify every data row in the old table has a corresponding row in the new table. " +
                     "The accepted values are NONE, ONLY, BEFORE,  AFTER, and BOTH. " +
@@ -210,6 +218,7 @@ public class TransformTool extends Configured implements Tool {
     private boolean isPartialTransform;
     private boolean shouldFixUnverified;
     private boolean shouldUseNewTableAsSource;
+    private boolean shouldForceCutover;
     private Job job;
 
     public Long getStartTime() {
@@ -257,6 +266,7 @@ public class TransformTool extends Configured implements Tool {
         options.addOption(START_TIME_OPTION);
         options.addOption(END_TIME_OPTION);
         options.addOption(FIX_UNVERIFIED_TRANSFORM_OPTION);
+        options.addOption(FORCE_CUTOVER_OPTION);
         options.addOption(USE_NEW_TABLE_AS_SOURCE_OPTION);
         options.addOption(AUTO_SPLIT_OPTION);
         options.addOption(ABORT_TRANSFORM_OPTION);
@@ -302,8 +312,13 @@ public class TransformTool extends Configured implements Tool {
         boolean useEndTime = cmdLine.hasOption(END_TIME_OPTION.getOpt());
         shouldFixUnverified = cmdLine.hasOption(FIX_UNVERIFIED_TRANSFORM_OPTION.getOpt());
         shouldUseNewTableAsSource = cmdLine.hasOption(USE_NEW_TABLE_AS_SOURCE_OPTION.getOpt());
+        shouldForceCutover = cmdLine.hasOption(FORCE_CUTOVER_OPTION.getOpt());
         basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
         isPartialTransform = cmdLine.hasOption(PARTIAL_TRANSFORM_OPTION.getOpt());
+        if (shouldForceCutover) {
+            LOGGER.info("TransformTool will fix the unverified rows before cutover");
+            shouldFixUnverified = true;
+        }
         if (useStartTime) {
             startTime = new Long(cmdLine.getOptionValue(START_TIME_OPTION.getOpt()));
         }
@@ -325,6 +340,22 @@ public class TransformTool extends Configured implements Tool {
                         || cmdLine.hasOption(RESUME_TRANSFORM_OPTION.getOpt()))) {
             throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
         }
+        if (shouldForceCutover && (isPartialTransform || useStartTime || useEndTime || shouldUseNewTableAsSource
+                || cmdLine.hasOption(AUTO_SPLIT_OPTION.getOpt()))) {
+            throw new IllegalArgumentException(FORCE_CUTOVER_NOT_COMPATIBLE);
+        }
+
+        schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+        dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+        indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+        qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+        isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+        logicalTableName = dataTable;
+        logicalParentName = null;
+        if (!Strings.isNullOrEmpty(indexTable)) {
+            logicalTableName = indexTable;
+            logicalParentName = SchemaUtil.getTableName(schemaName, dataTable);
+        }
 
         if (isPartialTransform) {
             if (!cmdLine.hasOption(START_TIME_OPTION.getOpt())) {
@@ -336,24 +367,12 @@ public class TransformTool extends Configured implements Tool {
                 throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_APPLICABLE);
             }
             if (lastTransformTime == null) {
-                lastTransformTime = transformRecord.getTransformEndTs().getTime();
+                lastTransformTime = transformRecord.getTransformLastStateTs().getTime();
             } else {
                 validateLastTransformTime();
             }
         }
 
-        schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
-        dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
-        indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
-        qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
-        isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
-        logicalTableName = dataTable;
-        logicalParentName = null;
-        if (!Strings.isNullOrEmpty(indexTable)) {
-            logicalTableName = indexTable;
-            logicalParentName = SchemaUtil.getTableName(schemaName, dataTable);
-        }
-
         pDataTable = PhoenixRuntime.getTable(
                 connection, SchemaUtil.getQualifiedTableName(schemaName, dataTable));
         if (indexTable != null) {
@@ -392,6 +411,10 @@ public class TransformTool extends Configured implements Tool {
             throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
         }
 
+        if (argPDataTable.isTransactional()) {
+            throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
+        }
+
         if (transformRecord == null){
             throw new IllegalStateException("ALTER statement has not been run and the transform has not been created for this table");
         }
@@ -463,24 +486,27 @@ public class TransformTool extends Configured implements Tool {
     }
 
     public Job configureJob() throws Exception {
-        String jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable, (shouldFixUnverified ? "Unverified" : "Full"));
-        if (shouldUseNewTableAsSource) {
-            jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable, "NewTableSource_" + pNewTable.getName());
-        }
         if (pNewTable.isTransactional()) {
             configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
-                    Long.toString(TransactionUtil.convertToNanoseconds(pOldTable.getTimeStamp()+1)));
-            configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pDataTable.getTransactionProvider().name());
-        }
-        if (lastTransformTime != null) {
-            PhoenixConfigurationUtil.setCurrentScnValue(configuration, lastTransformTime);
+                    Long.toString(TransactionUtil.convertToNanoseconds(pOldTable.getTimeStamp() + 1)));
+            configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pNewTable.getTransactionProvider().name());
         } else {
-            if (endTime != null) {
-                PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+            if (lastTransformTime != null) {
+                PhoenixConfigurationUtil.setCurrentScnValue(configuration, lastTransformTime);
             } else {
-                setCurrentScnValue(configuration, EnvironmentEdgeManager.currentTimeMillis());
+                if (endTime != null) {
+                    PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+                } else {
+                    setCurrentScnValue(configuration, EnvironmentEdgeManager.currentTimeMillis());
+                }
             }
         }
+        String jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable==null?null:pNewTable.getName(),
+                (shouldFixUnverified?"Unverified":"Full"));
+        if (shouldUseNewTableAsSource) {
+            jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable==null?null:pNewTable.getName(),
+                    "NewTableSource_"+pNewTable.getName());
+        }
 
         final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
         final PostIndexDDLCompiler ddlCompiler =
@@ -553,6 +579,7 @@ public class TransformTool extends Configured implements Tool {
         job.setPriority(this.jobPriority);
         PhoenixMapReduceUtil.setInput(job, PhoenixServerBuildIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class,
                 oldTableWithSchema, "");
+
         if (outputPath != null) {
             FileOutputFormat.setOutputPath(job, outputPath);
         }
@@ -623,7 +650,7 @@ public class TransformTool extends Configured implements Tool {
                 return 0;
             }
         } catch (Exception e) {
-            LOGGER.error("Caught exception " + e + " trying to run TransformTool.");
+            LOGGER.error("Caught exception " + e + " trying to run TransformTool.", e);
             return 1;
         }
     }
@@ -696,23 +723,20 @@ public class TransformTool extends Configured implements Tool {
     }
 
     public void updateTransformRecord(PhoenixConnection connection, PTable.TransformStatus newStatus) throws Exception {
-        SystemTransformRecord transformRecord = getTransformRecord(connection);
-        updateTransformRecord(connection, transformRecord, newStatus);
-    }
-
-    public static void updateTransformRecord(PhoenixConnection connection, SystemTransformRecord transformRecord, PTable.TransformStatus newStatus) throws Exception {
-        SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
-        builder.setTransformStatus(newStatus.name());
-        if (newStatus == PTable.TransformStatus.COMPLETED || newStatus == PTable.TransformStatus.FAILED) {
-            builder.setEndTs(new Timestamp(EnvironmentEdgeManager.currentTimeMillis()));
+        if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+            return;
         }
-        Transform.upsertTransform(builder.build(), connection.unwrap(PhoenixConnection.class));
+        SystemTransformRecord transformRecord = getTransformRecord(connection);
+        Transform.updateTransformRecord(connection, transformRecord, newStatus);
     }
 
     protected void updateTransformRecord(Job job) throws Exception {
         if (job == null) {
             return;
         }
+        if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+            return;
+        }
         SystemTransformRecord transformRecord = getTransformRecord(connection.unwrap(PhoenixConnection.class));
         SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
         builder.setTransformJobId(job.getJobID().toString());
@@ -775,22 +799,48 @@ public class TransformTool extends Configured implements Tool {
         }
 
         runTransform(args, cmdLine);
+
+        // Check if we already have a TransformMonitor task. If we do, remove those and start a new monitor
+        List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(connection, null);
+        for (Task.TaskRecord taskRecord : taskRecordList) {
+            if (taskRecord.isMatchingTask(transformRecord)) {
+                Task.deleteTask(connection.unwrap(PhoenixConnection.class), PTable.TaskType.TRANSFORM_MONITOR, taskRecord.getTimeStamp(), taskRecord.getTenantId(),
+                        taskRecord.getSchemaName(), taskRecord.getTableName(), configuration.getBoolean(QueryServices.PHOENIX_ACLS_ENABLED,
+                                QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED));
+            }
+        }
+
+        // start TransformMonitor
+        TransformMonitorTask.addTransformMonitorTask(connection.unwrap(PhoenixConnection.class), configuration, transformRecord,
+                PTable.TaskStatus.CREATED, new Timestamp(EnvironmentEdgeManager.currentTimeMillis()), null);
+
     }
 
     public int runTransform(String[] args, CommandLine cmdLine) throws Exception {
         int status = 0;
+
         updateTransformRecord(connection.unwrap(PhoenixConnection.class), PTable.TransformStatus.STARTED);
         PhoenixConfigurationUtil.setIsPartialTransform(configuration, isPartialTransform);
         PhoenixConfigurationUtil.setIsTransforming(configuration, true);
+        PhoenixConfigurationUtil.setForceCutover(configuration, shouldForceCutover);
 
         if (!Strings.isNullOrEmpty(indexTable)) {
             PhoenixConfigurationUtil.setTransformingTableType(configuration, IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE);
             // Index table transform. Build the index
             IndexTool indexTool = new IndexTool();
             indexTool.setConf(configuration);
+            if (shouldForceCutover) {
+                List<String> argsList = new ArrayList<String>(Arrays.asList(args));
+                // Remove from cmdLine so that indexTool will not throw error
+                argsList.remove("-"+FORCE_CUTOVER_OPTION.getOpt());
+                argsList.remove("--"+FORCE_CUTOVER_OPTION.getLongOpt());
+                args = argsList.toArray(new String[0]);
+            }
             status = indexTool.run(args);
             Job job = indexTool.getJob();
-            updateTransformRecord(job);
+            if (status == 0) {
+                updateTransformRecord(job);
+            }
         } else {
             PhoenixConfigurationUtil.setTransformingTableType(configuration, IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
             if (!isPartialTransform) {
@@ -798,7 +848,9 @@ public class TransformTool extends Configured implements Tool {
             }
             configureJob();
             status = runJob();
-            updateTransformRecord(this.job);
+            if (status == 0) {
+                updateTransformRecord(this.job);
+            }
         }
 
         // Record status
@@ -855,4 +907,68 @@ public class TransformTool extends Configured implements Tool {
         int result = ToolRunner.run(new TransformTool(), args);
         System.exit(result);
     }
+
+    /** Builds CLI args from the transform record, runs a new TransformTool on a copy of the configuration, and returns it (null on non-zero status). */
+    public static TransformTool runTransformTool(SystemTransformRecord systemTransformRecord, Configuration configuration,
+                                                 boolean isPartial, Long startTime, Long endTime, boolean shouldFixUnverified, boolean doValidation) throws Exception {
+        List<String> args = Lists.newArrayList();
+        if (!Strings.isNullOrEmpty(systemTransformRecord.getSchemaName())) {
+            args.add("--schema=" + systemTransformRecord.getSchemaName());
+        }
+        String oldTableName = systemTransformRecord.getLogicalTableName();
+        boolean isIndex = false;
+        if (!Strings.isNullOrEmpty(systemTransformRecord.getLogicalParentName())) {
+            isIndex = true;
+            args.add("--index-table=" + oldTableName);
+            args.add("--data-table=" + SchemaUtil.getTableNameFromFullName(systemTransformRecord.getLogicalParentName()));
+        } else {
+            args.add("--data-table=" + oldTableName);
+        }
+        // Use a unique per-run path under /tmp as the "-op" value
+        args.add("-op");
+        args.add("/tmp/" + UUID.randomUUID().toString());
+
+        if (!Strings.isNullOrEmpty(systemTransformRecord.getTenantId())) {
+            args.add("-tenant");
+            args.add(systemTransformRecord.getTenantId());
+        }
+        if (startTime != null) {
+            args.add("-st");
+            args.add(String.valueOf(startTime));
+        }
+        if (endTime != null) {
+            args.add("-et");
+            args.add(String.valueOf(endTime));
+        }
+        if (isPartial) {
+            if (isIndex) {
+                // NOTE(review): no partial flag ("-pr") is passed for index transforms - confirm this is intended
+            } else {
+                args.add("-pt");
+            }
+        }
+        if (shouldFixUnverified) {
+            if (!isIndex) {
+                args.add("-fu");
+            }
+        }
+        // Validation passes "-v" with the ONLY verify type
+        if (doValidation) {
+            args.add("-v");
+            args.add(IndexTool.IndexVerifyType.ONLY.getValue());
+        }
+        String[] cmdArgs = args.toArray(new String[0]);
+        TransformTool tt = new TransformTool();
+        Configuration conf = new Configuration(configuration);
+        tt.setConf(conf);
+
+        LOGGER.info("Running TransformTool with {}", Arrays.toString(cmdArgs), new Exception("Stack Trace"));
+        int status = tt.run(cmdArgs);
+        LOGGER.info("TransformTool with {} status is {}", Arrays.toString(cmdArgs), status);
+        if (status != 0) {
+            return null;
+        }
+        return tt;
+    }
+
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 85183cf..f3a3fd8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -97,7 +97,15 @@ public final class PhoenixConfigurationUtil {
     
     /** For local indexes which are stored in a single separate physical table*/
     public static final String PHYSICAL_TABLE_NAME = "phoenix.output.table.name" ;
-    
+
+    public static final String TRANSFORM_RETRY_COUNT_VALUE = "phoenix.transform.retry.count";
+
+    public static final int DEFAULT_TRANSFORM_RETRY_COUNT = 50;
+
+    public static final String TRANSFORM_MONITOR_ENABLED = "phoenix.transform.monitor.enabled";
+
+    public static final boolean DEFAULT_TRANSFORM_MONITOR_ENABLED = true;
+
     public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
     
     public static final String INPUT_CLASS = "phoenix.input.class";
@@ -209,6 +217,9 @@ public final class PhoenixConfigurationUtil {
     // Is the mapreduce used for table/index transform
     public static final String IS_TRANSFORMING_VALUE = "phoenix.mr.istransforming";
 
+    // Whether the transform cutover should be forced
+    public static final String FORCE_CUTOVER_VALUE = "phoenix.mr.force.cutover";
+
     // Is the mapreduce used for table/index transform
     public static final String TRANSFORMING_TABLE_TYPE = "phoenix.mr.transform.tabletype";
 
@@ -633,6 +644,17 @@ public final class PhoenixConfigurationUtil {
         return Boolean.valueOf(configuration.get(IS_TRANSFORMING_VALUE, "false"));
     }
 
+    public static void setForceCutover(Configuration configuration, Boolean forceCutover) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(forceCutover);
+        configuration.set(FORCE_CUTOVER_VALUE, Boolean.toString(forceCutover));
+    }
+
+    public static Boolean getForceCutover(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return Boolean.valueOf(configuration.get(FORCE_CUTOVER_VALUE, "false"));
+    }
+
     public static void setTransformingTableType(Configuration configuration,
                                                 SourceTable sourceTable) {
         Preconditions.checkNotNull(configuration);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f340deb..ea25cff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4546,8 +4546,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             // No tables exist matching "SYSTEM\..*", they are all already in "SYSTEM:.*"
             if (tableNames.size() == 0) { return; }
             // Try to move any remaining tables matching "SYSTEM\..*" into "SYSTEM:"
-            if (tableNames.size() > 8) {
-                LOGGER.warn("Expected 8 system tables but found " + tableNames.size() + ":" + tableNames);
+            if (tableNames.size() > 9) {
+                LOGGER.warn("Expected 9 system tables but found " + tableNames.size() + ":" + tableNames);
             }
 
             byte[] mappedSystemTable = SchemaUtil
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 8f11d41..cf203da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -157,7 +157,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TASK_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_END_TS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_LAST_STATE_TS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_FUNCTION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_JOB_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT;
@@ -562,20 +562,20 @@ public interface QueryConstants {
             TENANT_ID + " VARCHAR NULL,\n" +
             TABLE_SCHEM + " VARCHAR NULL," +
             LOGICAL_TABLE_NAME + " VARCHAR NOT NULL,\n" +
-            NEW_PHYS_TABLE_NAME + " VARCHAR NOT NULL,\n" +
-            TRANSFORM_TYPE + " INTEGER NOT NULL," +
             // Non-PK columns
+            NEW_PHYS_TABLE_NAME + " VARCHAR,\n" +
+            TRANSFORM_TYPE + " INTEGER," +
             LOGICAL_PARENT_NAME + " VARCHAR NULL,\n" + // If this is an index, Logical_Parent_Name is the data table name. Index name is not unique.
             TRANSFORM_STATUS + " VARCHAR NULL," +
             TRANSFORM_JOB_ID + " VARCHAR NULL," +
             TRANSFORM_RETRY_COUNT + " INTEGER NULL," +
             TRANSFORM_START_TS + " TIMESTAMP NULL," +
-            TRANSFORM_END_TS + " TIMESTAMP NULL," +
+            TRANSFORM_LAST_STATE_TS + " TIMESTAMP NULL," +
             OLD_METADATA + " VARCHAR NULL,\n" +
             NEW_METADATA + " VARCHAR NULL,\n" +
             TRANSFORM_FUNCTION + " VARCHAR NULL\n" +
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
-            TENANT_ID + "," + TABLE_SCHEM + "," + LOGICAL_TABLE_NAME + "," + NEW_PHYS_TABLE_NAME + "," + TRANSFORM_TYPE + "))\n" +
+            TENANT_ID + "," + TABLE_SCHEM + "," + LOGICAL_TABLE_NAME + "))\n" +
             HConstants.VERSIONS + "=%s,\n" +
             ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
             ColumnFamilyDescriptorBuilder.TTL + "=" + TRANSFORM_TABLE_TTL + ",\n" +     // 10 days
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java
index 90451af..2a21a29 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java
@@ -81,9 +81,18 @@ public class ColumnMetaDataOps {
                     IS_ROW_TIMESTAMP +
                     ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
+    public static void addColumnMutation(PhoenixConnection connection, String tenantId, String schemaName, String tableName, PColumn column, String parentTableName, String pkName, Short keySeq, boolean isSalted) throws SQLException {
+        addColumnMutationInternal(connection, tenantId, schemaName, tableName, column, parentTableName, pkName, keySeq, isSalted);
+    }
+
     public static void addColumnMutation(PhoenixConnection connection, String schemaName, String tableName, PColumn column, String parentTableName, String pkName, Short keySeq, boolean isSalted) throws SQLException {
+        addColumnMutationInternal(connection, connection.getTenantId() == null ? null : connection.getTenantId().getString()
+                , schemaName, tableName, column, parentTableName, pkName, keySeq, isSalted);
+    }
+
+    private static void addColumnMutationInternal(PhoenixConnection connection, String tenantId, String schemaName, String tableName, PColumn column, String parentTableName, String pkName, Short keySeq, boolean isSalted) throws SQLException {
         try (PreparedStatement colUpsert = connection.prepareStatement(UPSERT_COLUMN)) {
-            colUpsert.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
+            colUpsert.setString(1, tenantId);
             colUpsert.setString(2, schemaName);
             colUpsert.setString(3, tableName);
             colUpsert.setString(4, column.getName().getString());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/IndexNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/IndexNotFoundException.java
index f7f0727..953933e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/IndexNotFoundException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/IndexNotFoundException.java
@@ -38,6 +38,6 @@ public class IndexNotFoundException extends TableNotFoundException {
     }
 
     public IndexNotFoundException(String schemaName, String tableName, long timestamp) {
-        super(schemaName, tableName, timestamp, code);
+        super(schemaName, tableName, timestamp, code, false);
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index cd9d052..c696bb4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -25,6 +25,9 @@ import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLi
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB;
 import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.INDEX_NAME;
 import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.REBUILD_ALL;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_LOCAL_OR_VIEW_INDEX;
+import static org.apache.phoenix.exception.SQLExceptionCode.INSUFFICIENT_MULTI_TENANT_COLUMNS;
+import static org.apache.phoenix.exception.SQLExceptionCode.PARENT_TABLE_NOT_FOUND;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
@@ -2398,11 +2401,15 @@ public class MetaDataClient {
                         linkStatement.setString(4, physicalName.getString());
                         linkStatement.setByte(5, LinkType.PHYSICAL_TABLE.getSerializedValue());
                         if (tableType == PTableType.VIEW) {
-                            PTable logicalTable = connection.getTable(new PTableKey(null, physicalName.getString()
-                                    .replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR)));
+                            if (parent.getType() == PTableType.TABLE) {
+                                linkStatement.setString(4, SchemaUtil.getTableName(parent.getSchemaName().getString(),parent.getTableName().getString()));
+                                linkStatement.setLong(6, parent.getSequenceNumber());
+                            } else { //This is a grandchild view, find the physical base table
+                                PTable logicalTable = connection.getTable(new PTableKey(null, SchemaUtil.replaceNamespaceSeparator(physicalName)));
+                                linkStatement.setString(4, SchemaUtil.getTableName(logicalTable.getSchemaName().getString(),logicalTable.getTableName().getString()));
+                                linkStatement.setLong(6, logicalTable.getSequenceNumber());
+                            }
                             // Set link to logical name
-                            linkStatement.setString(4, SchemaUtil.getTableName(logicalTable.getSchemaName().getString(),logicalTable.getTableName().getString()));
-                            linkStatement.setLong(6, logicalTable.getSequenceNumber());
                             linkStatement.setString(7, null);
                         } else {
                             linkStatement.setLong(6, parent.getSequenceNumber());
@@ -3367,7 +3374,7 @@ public class MetaDataClient {
                         .setSchemaName(schemaName).setTableName(tableName).build().buildException();
             }
         } catch (TableNotFoundException e) {
-            if (!ifExists) {
+            if (!ifExists && !e.isThrownToForceReReadForTransformingTable()) {
                 if (tableType == PTableType.INDEX)
                     throw new IndexNotFoundException(e.getSchemaName(),
                             e.getTableName(), e.getTimeStamp());
@@ -3866,6 +3873,21 @@ public class MetaDataClient {
                 changingPhoenixTableProperty = evaluateStmtProperties(metaProperties,metaPropertiesEvaluated,table,schemaName,tableName);
 
                 boolean isTransformNeeded = Transform.checkIsTransformNeeded(metaProperties, schemaName, table, tableName, null, tenantIdToUse, connection);
+                if (isTransformNeeded) {
+                    // Support for these cases may be added later; for now they are rejected.
+                    if (MetaDataUtil.hasLocalIndexTable(connection, physicalTableName.getBytes())) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_TRANSFORM_TABLE_WITH_LOCAL_INDEX)
+                                .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                    }
+                    if (table.isAppendOnlySchema()) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_TRANSFORM_TABLE_WITH_APPEND_ONLY_SCHEMA)
+                                .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                    }
+                    if (table.isTransactional()) {
+                        throw new SQLExceptionInfo.Builder(CANNOT_TRANSFORM_TRANSACTIONAL_TABLE)
+                                .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                    }
+                }
 
                 // If changing isImmutableRows to true or it's not being changed and is already true
                 boolean willBeImmutableRows = Boolean.TRUE.equals(metaPropertiesEvaluated.getIsImmutableRows()) || (metaPropertiesEvaluated.getIsImmutableRows() == null && table.isImmutableRows());
@@ -4801,7 +4823,7 @@ public class MetaDataClient {
 
                 if (isTransformNeeded) {
                     if (indexRef.getTable().getViewIndexId() != null) {
-                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_TRANSFORM_VIEW_INDEX)
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_TRANSFORM_LOCAL_OR_VIEW_INDEX)
                                 .setSchemaName(schemaName).setTableName(indexName).build().buildException();
                     }
                     try {
@@ -5151,6 +5173,17 @@ public class MetaDataClient {
             throws SQLException {
         boolean changingPhoenixTableProperty = false;
 
+        if (metaProperties.getImmutableRowsProp() != null) {
+            if (metaProperties.getImmutableRowsProp().booleanValue() != table.isImmutableRows()) {
+                if (table.getImmutableStorageScheme() != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_IMMUTABLE_ROWS_PROPERTY)
+                            .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                }
+                metaPropertiesEvaluated.setIsImmutableRows(metaProperties.getImmutableRowsProp());
+                changingPhoenixTableProperty = true;
+            }
+        }
+
         if (metaProperties.getImmutableRowsProp() != null && table.getType() != INDEX) {
             if (metaProperties.getImmutableRowsProp().booleanValue() != table.isImmutableRows()) {
                 metaPropertiesEvaluated.setIsImmutableRows(metaProperties.getImmutableRowsProp());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index d43d2aa..be5971a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -20,11 +20,11 @@ package org.apache.phoenix.schema;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 
 import java.sql.SQLException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
@@ -154,7 +154,8 @@ public class PMetaDataImpl implements PMetaData {
         for (PTable index : table.getIndexes()) {
             metaData.put(index.getKey(), tableRefFactory.makePTableRef(index, this.timeKeeper.getCurrentTime(), resolvedTime));
         }
-        if (table.getPhysicalName(true) != null && !table.getPhysicalName(true).getString().equals(table.getTableName().getString())) {
+        if (table.getPhysicalName(true) != null &&
+                !Strings.isNullOrEmpty(table.getPhysicalName(true).getString()) && !table.getPhysicalName(true).getString().equals(table.getTableName().getString())) {
             String physicalTableName =  table.getPhysicalName(true).getString().replace(
                     QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR);
             String physicalTableFullName = SchemaUtil.getTableName(table.getSchemaName() != null ? table.getSchemaName().getString() : null, physicalTableName);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 437ab61..e995e50 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL
 import static org.apache.phoenix.util.EncodedColumnsUtil.isReservedColumnQualifier;
 
 import java.io.DataOutputStream;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -201,7 +202,8 @@ public interface PTable extends PMetaDataEntity {
 
     public enum TaskType {
         DROP_CHILD_VIEWS((byte)1),
-        INDEX_REBUILD((byte)2);
+        INDEX_REBUILD((byte)2),
+        TRANSFORM_MONITOR((byte)3);
 
         private final byte[] byteValue;
         private final byte serializedValue;
@@ -250,6 +252,11 @@ public interface PTable extends PMetaDataEntity {
                 return  "FAILED";
             }
         },
+        RETRY {
+            public String toString() {
+                return  "RETRY";
+            }
+        },
     }
 
     public enum TransformType {
@@ -280,6 +287,17 @@ public interface PTable extends PMetaDataEntity {
             }
             return TransformType.values()[serializedValue-1];
         }
+        /** Maps a transform type to its partial counterpart, or null when it has none. */
+        public static TransformType getPartialTransform(TransformType transformType) {
+            // METADATA_TRANSFORM is the only type mapped to a partial variant here.
+            return transformType == METADATA_TRANSFORM
+                    ? METADATA_TRANSFORM_PARTIAL : null;
+        }
+        /** Returns true when the given type is a partial transform type (false for null). */
+        public static boolean isPartialTransform(TransformType transformType){
+            // Enum constants are singletons, so an identity check avoids building a list per call.
+            return transformType == PTable.TransformType.METADATA_TRANSFORM_PARTIAL;
+        }
     }
 
     public enum TransformStatus {
@@ -293,6 +311,11 @@ public interface PTable extends PMetaDataEntity {
                 return  "STARTED";
             }
         },
+        PENDING_CUTOVER {
+            public String toString() {
+                return  "PENDING_CUTOVER";
+            }
+        },
         COMPLETED {
             public String toString() {
                 return  "COMPLETED";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 03f0c11..4cfcdf4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -697,7 +697,6 @@ public class PTableImpl implements PTable {
             Preconditions.checkNotNull(this.physicalNames);
             //hasColumnsRequiringUpgrade and rowKeyOrderOptimizable are booleans and can never be
             // null, so no need to check them
-
             PName fullName = PNameFactory.newName(SchemaUtil.getTableName(
                     this.schemaName.getString(), this.tableName.getString()));
             int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableNotFoundException.java
index ebc6b4d..48da43f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableNotFoundException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableNotFoundException.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.util.SchemaUtil;
 public class TableNotFoundException extends MetaDataEntityNotFoundException {
     private static final long serialVersionUID = 1L;
     private static SQLExceptionCode code = SQLExceptionCode.TABLE_UNDEFINED;
+    private boolean thrownToForceReReadForTransformingTable = false;
     private final long timestamp;
 
     public TableNotFoundException(TableNotFoundException e, long timestamp) {
@@ -44,21 +45,28 @@ public class TableNotFoundException extends MetaDataEntityNotFoundException {
         this(SchemaUtil.getSchemaNameFromFullName(tableName), SchemaUtil.getTableNameFromFullName(tableName));
     }
 
+    public TableNotFoundException(String tableName, boolean thrownForForce) {
+        this(SchemaUtil.getSchemaNameFromFullName(tableName), SchemaUtil.getTableNameFromFullName(tableName),
+                HConstants.LATEST_TIMESTAMP, code, thrownForForce);
+    }
+
     public TableNotFoundException(String schemaName, String tableName) {
         this(schemaName, tableName, HConstants.LATEST_TIMESTAMP);
     }
     
     public TableNotFoundException(String schemaName, String tableName, long timestamp) {
-        this(schemaName, tableName, timestamp, code);
+        this(schemaName, tableName, timestamp, code, false);
     }
 
-    public TableNotFoundException(String schemaName, String tableName, long timestamp, SQLExceptionCode code) {
+    public TableNotFoundException(String schemaName, String tableName, long timestamp, SQLExceptionCode code, boolean thrownForForceReRead) {
         super(new SQLExceptionInfo.Builder(code).setSchemaName(schemaName).setTableName(tableName).build().toString(),
                 code.getSQLState(), code.getErrorCode(), schemaName, tableName, null);
         this.timestamp = timestamp;
+        this.thrownToForceReReadForTransformingTable = thrownForForceReRead;
     }
 
     public long getTimeStamp() {
         return timestamp;
     }
+    public boolean isThrownToForceReReadForTransformingTable() { return thrownToForceReReadForTransformingTable; } // flag supplied at construction
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 0a14dcd..d35ebc0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -203,6 +203,20 @@ public enum TableProperty {
 
     },
 
+    // Same as COLUMN_ENCODED_BYTES. If we don't have this one, isPhoenixProperty returns false.
+    ENCODING_SCHEME(PhoenixDatabaseMetaData.ENCODING_SCHEME, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false, false) {
+        @Override
+        public Object getValue(Object value) {
+            return COLUMN_ENCODED_BYTES.getValue(value);
+        }
+
+        @Override
+        public Object getPTableValue(PTable table) {
+            return table.getEncodingScheme();
+        }
+
+    },
+
     IMMUTABLE_STORAGE_SCHEME(PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false, false) {
         @Override
         public ImmutableStorageScheme getValue(Object value) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
index 6a742e2..5aebb4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
@@ -17,6 +17,9 @@
  */
 package org.apache.phoenix.schema.task;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -27,7 +30,6 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos
     .TaskMetaDataService;
-import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.ipc.RpcUtil;
 import org.apache.hadoop.hbase.security.User;
@@ -512,5 +514,16 @@ public class Task {
             this.taskType = taskType;
         }
 
+        /**
+         * True only for a TRANSFORM_MONITOR task whose table, tenant and schema equal those of the record.
+         */
+        public boolean isMatchingTask(SystemTransformRecord transformRecord) {
+            if (getTaskType() != PTable.TaskType.TRANSFORM_MONITOR) {
+                return false;
+            }
+            return StringUtils.equals(transformRecord.getLogicalTableName(), getTableName())
+                    && StringUtils.equals(transformRecord.getTenantId(), getTenantId())
+                    && StringUtils.equals(transformRecord.getSchemaName(), getSchemaName());
+        }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
index dd678ab..63378bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
@@ -32,7 +32,7 @@ import java.sql.Timestamp;
  */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
     value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"},
-    justification = "endTs and startTs are not used for mutation")
+    justification = "lastStateTs and startTs are not used for mutation")
 public class SystemTransformRecord {
     private final PTable.TransformType transformType;
     private final String schemaName;
@@ -44,7 +44,7 @@ public class SystemTransformRecord {
     private final String transformJobId;
     private final Integer transformRetryCount;
     private final Timestamp startTs;
-    private final Timestamp endTs;
+    private final Timestamp lastStateTs;
     private final String oldMetadata;
     private final String newMetadata;
     private final String transformFunction;
@@ -52,7 +52,7 @@ public class SystemTransformRecord {
     public SystemTransformRecord(PTable.TransformType transformType,
                                  String schemaName, String logicalTableName, String tenantId, String newPhysicalTableName, String logicalParentName,
                                  String transformStatus, String transformJobId, Integer transformRetryCount, Timestamp startTs,
-                                 Timestamp endTs, String oldMetadata, String newMetadata, String transformFunction) {
+                                 Timestamp lastStateTs, String oldMetadata, String newMetadata, String transformFunction) {
         this.transformType = transformType;
         this.schemaName = schemaName;
         this.tenantId = tenantId;
@@ -63,16 +63,16 @@ public class SystemTransformRecord {
         this.transformJobId = transformJobId;
         this.transformRetryCount = transformRetryCount;
         this.startTs = startTs;
-        this.endTs = endTs;
+        this.lastStateTs = lastStateTs;
         this.oldMetadata = oldMetadata;
         this.newMetadata = newMetadata;
         this.transformFunction = transformFunction;
     }
 
     public String getString() {
-        return String.format("transformType: %s, schameName: %s, logicalTableName: %s, newPhysicalTableName: %s, logicalParentName: %s "
+        return String.format("transformType: %s, schameName: %s, logicalTableName: %s, newPhysicalTableName: %s, logicalParentName: %s, status: %s"
                 , String.valueOf(transformType), String.valueOf(schemaName), String.valueOf(logicalTableName), String.valueOf(newPhysicalTableName),
-                String.valueOf(logicalParentName));
+                String.valueOf(logicalParentName), String.valueOf(transformStatus));
     }
 
     public PTable.TransformType getTransformType() {
@@ -115,8 +115,8 @@ public class SystemTransformRecord {
         return startTs;
     }
 
-    public Timestamp getTransformEndTs() {
-        return endTs;
+    public Timestamp getTransformLastStateTs() {
+        return lastStateTs;
     }
 
     public String getOldMetadata() {
@@ -129,15 +129,16 @@ public class SystemTransformRecord {
 
     public boolean isActive() {
         return (transformStatus.equals(PTable.TransformStatus.STARTED.name())
-                || transformStatus.equals(PTable.TransformStatus.CREATED.name()));
+                || transformStatus.equals(PTable.TransformStatus.CREATED.name())
+                || transformStatus.equals(PTable.TransformStatus.PENDING_CUTOVER.name()));
     }
 
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(
         value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"},
-        justification = "endTs and startTs are not used for mutation")
+        justification = "lastStateTs and startTs are not used for mutation")
     public static class SystemTransformBuilder {
 
-        private PTable.TransformType transformType;
+        private PTable.TransformType transformType = PTable.TransformType.METADATA_TRANSFORM;
         private String schemaName;
         private String tenantId;
         private String logicalTableName;
@@ -147,7 +148,7 @@ public class SystemTransformRecord {
         private String transformJobId;
         private int transformRetryCount =0;
         private Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
-        private Timestamp endTs;
+        private Timestamp lastStateTs;
         private String oldMetadata;
         private String newMetadata;
         private String transformFunction;
@@ -167,7 +168,7 @@ public class SystemTransformRecord {
             this.setTransformJobId(systemTransformRecord.getTransformJobId());
             this.setTransformRetryCount(systemTransformRecord.getTransformRetryCount());
             this.setStartTs(systemTransformRecord.getTransformStartTs());
-            this.setEndTs(systemTransformRecord.getTransformEndTs());
+            this.setLastStateTs(systemTransformRecord.getTransformLastStateTs());
             this.setOldMetadata(systemTransformRecord.getOldMetadata());
             this.setNewMetadata(systemTransformRecord.getNewMetadata());
             this.setTransformFunction(systemTransformRecord.getTransformFunction());
@@ -233,8 +234,8 @@ public class SystemTransformRecord {
             return this;
         }
 
-        public SystemTransformBuilder setEndTs(Timestamp endTs) {
-            this.endTs = endTs;
+        public SystemTransformBuilder setLastStateTs(Timestamp ts) {
+            this.lastStateTs = ts;
             return this;
         }
 
@@ -244,12 +245,12 @@ public class SystemTransformRecord {
         }
 
         public SystemTransformRecord build() {
-            Timestamp end = endTs;
-            if (end == null && transformStatus != null && transformStatus.equals(PTable.TaskStatus.COMPLETED.toString())) {
-                end = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+            Timestamp lastTs = lastStateTs;
+            if (lastTs == null && transformStatus != null && transformStatus.equals(PTable.TaskStatus.COMPLETED.toString())) {
+                lastTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
             }
             return new SystemTransformRecord(transformType, schemaName,
-                    logicalTableName, tenantId, newPhysicalTableName, logicalParentName, transformStatus, transformJobId, transformRetryCount, startTs, end,
+                    logicalTableName, tenantId, newPhysicalTableName, logicalParentName, transformStatus, transformJobId, transformRetryCount, startTs, lastTs,
                     oldMetadata, newMetadata, transformFunction);
         }
 
@@ -266,7 +267,7 @@ public class SystemTransformRecord {
             builder.setTransformJobId(resultSet.getString(col++));
             builder.setTransformRetryCount(resultSet.getInt(col++));
             builder.setStartTs(resultSet.getTimestamp(col++));
-            builder.setEndTs(resultSet.getTimestamp(col++));
+            builder.setLastStateTs(resultSet.getTimestamp(col++));
             builder.setOldMetadata(resultSet.getString(col++));
             builder.setNewMetadata(resultSet.getString(col++));
             builder.setTransformFunction(resultSet.getString(col++));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
index bc396e5..8aecf1d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
@@ -21,15 +21,17 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.TableInfo;
 import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
-import org.apache.phoenix.mapreduce.transform.TransformTool;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
@@ -40,12 +42,13 @@ import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.tool.SchemaExtractionProcessor;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.JacksonUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TableViewFinderResult;
+import org.apache.phoenix.util.UpgradeUtil;
 import org.apache.phoenix.util.ViewUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,16 +59,45 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_TABLE_TIMESTAMP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_STATUS;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
+import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.schema.ColumnMetaDataOps.addColumnMutation;
 import static org.apache.phoenix.schema.MetaDataClient.CREATE_LINK;
+import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.schema.PTableType.VIEW;
 
 public class Transform {
     private static final Logger LOGGER = LoggerFactory.getLogger(Transform.class);
+    private static final String TRANSFORM_SELECT = "SELECT " +
+            PhoenixDatabaseMetaData.TENANT_ID + ", " +
+            PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+            PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
+            PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
+            PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
+            PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
+            TRANSFORM_STATUS + ", " +
+            PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
+            PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
+            PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
+            PhoenixDatabaseMetaData.TRANSFORM_LAST_STATE_TS + ", " +
+            PhoenixDatabaseMetaData.OLD_METADATA + " , " +
+            PhoenixDatabaseMetaData.NEW_METADATA + " , " +
+            PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
+            " FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME;
 
     private static String generateNewTableName(String schema, String logicalTableName, long seqNum) {
         // TODO: Support schema versioning as well.
@@ -176,10 +208,13 @@ public class Transform {
         TableViewFinderResult childViewsResult = ViewUtil.findChildViews(connection, systemTransformParams.getTenantId()
                 , systemTransformParams.getSchemaName(), systemTransformParams.getLogicalTableName());
         for (TableInfo view : childViewsResult.getLinks()) {
-            addTransformTableLink(connection, view.getTenantId()==null? null: Bytes.toString(view.getTenantId()),
+            addTransformTableLink(connection, view.getTenantId()==null? null:Bytes.toString(view.getTenantId()),
                     (view.getSchemaName()==null? null: Bytes.toString(view.getSchemaName())), Bytes.toString(view.getTableName())
                     , newTableName, sequenceNum);
         }
+        // add a monitoring task
+        TransformMonitorTask.addTransformMonitorTask(connection, connection.getQueryServices().getConfiguration(), systemTransformParams,
+                PTable.TaskStatus.CREATED, new Timestamp(EnvironmentEdgeManager.currentTimeMillis()), null);
 
         return newTable;
     }
@@ -197,23 +232,40 @@ public class Transform {
         linkStatement.execute();
     }
 
-    public static SystemTransformRecord getTransformRecord(Configuration conf, PTableType tableType, PName schemaName,
+    /** Loads (uncached) the new table of an active transform on oldTable; null if none, or if oldTable already uses that name. */
+    public static PTable getTransformingNewTable(PhoenixConnection connection, PTable oldTable) throws SQLException{
+        PName dataTableName = oldTable.getType() == INDEX ? oldTable.getParentTableName() : null;
+        SystemTransformRecord record = Transform.getTransformRecord(connection, oldTable.getType(), oldTable.getSchemaName(),
+                oldTable.getTableName(), dataTableName, oldTable.getTenantId(), oldTable.getBaseTableLogicalName());
+        if (record == null || !record.isActive()) {
+            return null;
+        }
+        // New table will behave like an index
+        PName newNameNoSchema = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(record.getNewPhysicalTableName()));
+        if (newNameNoSchema.equals(oldTable.getPhysicalName(true))) {
+            return null;
+        }
+        return PhoenixRuntime.getTableNoCache(connection, record.getNewPhysicalTableName());
+    }
+
+    private static SystemTransformRecord getTransformRecord(PhoenixConnection connection, PTableType tableType, PName schemaName,
                                                            PName tableName, PName dataTableName, PName tenantId,
                                                            PName parentLogicalName) throws SQLException {
+
         if (tableType == PTableType.TABLE) {
-            try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)) {
-                return Transform.getTransformRecord(schemaName, tableName, null, tenantId, connection);
-            }
+            return Transform.getTransformRecord(schemaName, tableName, null, tenantId, connection);
+
         } else if (tableType == INDEX) {
-            try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)) {
-                return Transform.getTransformRecord(schemaName, tableName, dataTableName, tenantId, connection);
-            }
+            return Transform.getTransformRecord(schemaName, tableName, dataTableName, tenantId, connection);
         } else if (tableType == VIEW) {
-            try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)) {
-                return Transform.getTransformRecord(SchemaUtil.getSchemaNameFromFullName(parentLogicalName.getString()),
-                        SchemaUtil.getTableNameFromFullName(parentLogicalName.getString()), null, tenantId==null? null:tenantId.getString(), connection);
+            if (parentLogicalName == null) {
+                LOGGER.warn("View doesn't seem to have a parent");
+                return null;
             }
+            return Transform.getTransformRecord(SchemaUtil.getSchemaNameFromFullName(parentLogicalName.getString()),
+                    SchemaUtil.getTableNameFromFullName(parentLogicalName.getString()), null, tenantId == null ? null : tenantId.getString(), connection);
         }
+
         return null;
     }
 
@@ -232,22 +284,11 @@ public class Transform {
 
     public static SystemTransformRecord getTransformRecordFromDB(
             String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
-        String sql = "SELECT " +
-                PhoenixDatabaseMetaData.TENANT_ID + ", " +
-                PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
-                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
-                PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
-                PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
-                PhoenixDatabaseMetaData.OLD_METADATA + " , " +
-                PhoenixDatabaseMetaData.NEW_METADATA + " , " +
-                PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
-                " FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE  " +
+        if (SYSTEM_TRANSFORM_NAME.equals(SchemaUtil.getTableName(schema, logicalTableName))) {
+            // Cannot query itself
+            return null;
+        }
+        String sql =  TRANSFORM_SELECT + " WHERE  " +
                 (Strings.isNullOrEmpty(tenantId) ? "" : (PhoenixDatabaseMetaData.TENANT_ID + " ='" + tenantId + "' AND ")) +
                 (Strings.isNullOrEmpty(schema) ? "" : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + schema + "' AND ")) +
                 PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + logicalTableName + "'" +
@@ -257,6 +298,7 @@ public class Transform {
             if (resultSet.next()) {
                 return SystemTransformRecord.SystemTransformBuilder.build(resultSet);
             }
+            LOGGER.info("Could not find System.Transform record with " + sql);
             return null;
         }
     }
@@ -312,11 +354,11 @@ public class Transform {
                 PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
                 PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
                 PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
+                TRANSFORM_STATUS + ", " +
                 PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
                 PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
                 PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_LAST_STATE_TS + ", " +
                 PhoenixDatabaseMetaData.OLD_METADATA + " , " +
                 PhoenixDatabaseMetaData.NEW_METADATA + " , " +
                 PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
@@ -353,8 +395,8 @@ public class Transform {
 
             stmt.setTimestamp(colNum++, systemTransformParams.getTransformStartTs());
 
-            if (systemTransformParams.getTransformEndTs() != null) {
-                stmt.setTimestamp(colNum++, systemTransformParams.getTransformEndTs());
+            if (systemTransformParams.getTransformLastStateTs() != null) {
+                stmt.setTimestamp(colNum++, systemTransformParams.getTransformLastStateTs());
             } else {
                 stmt.setNull(colNum++, Types.TIMESTAMP);
             }
@@ -379,10 +421,142 @@ public class Transform {
         }
     }
 
+    /**
+     * Cuts the logical table over to its new physical table by upserting into SYSTEM.CATALOG: the table
+     * row first (plus column qualifiers when the storage/encoding scheme differs), then every child view in
+     * batches; caches are cleared last. On failure the open transaction is rolled back and the error rethrown.
+     */
+    public static void doCutover(PhoenixConnection connection, SystemTransformRecord systemTransformRecord) throws Exception{
+        String tenantId = systemTransformRecord.getTenantId();
+        String schema = systemTransformRecord.getSchemaName();
+        String tableName = systemTransformRecord.getLogicalTableName();
+        String newTableName = SchemaUtil.getTableNameFromFullName(systemTransformRecord.getNewPhysicalTableName());
+
+        // Calculate changed metadata
+        List<String> columnNames = new ArrayList<>();
+        List<String> columnValues = new ArrayList<>();
+        getMetadataDifference(connection, systemTransformRecord, columnNames, columnValues);
+        // TODO In the future, we need to handle rowkey changes and column type changes as well
+        String extraColumns = columnNames.size() > 0 ? "," + String.join(",", columnNames) : "";
+        String extraValues = columnValues.size() > 0 ? "," + String.join(",", columnValues) : "";
+        String changeViewStmt = "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME %s) VALUES (%s, %s, '%s' %s)";
+        String changeTable = String.format(
+                "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, PHYSICAL_TABLE_NAME %s) VALUES (%s, %s, '%s','%s' %s)",
+                extraColumns,
+                (tenantId==null? null: ("'" + tenantId + "'")),
+                (schema==null ? null : ("'" + schema + "'")), tableName, newTableName,
+                extraValues);
+
+        LOGGER.info("About to do cutover via " + changeTable);
+        TableViewFinderResult childViewsResult = ViewUtil.findChildViews(connection, tenantId, schema, tableName);
+        boolean wasCommit = connection.getAutoCommit();
+        connection.setAutoCommit(false);
+        List<TableInfo> viewsToUpdateCache = new ArrayList<>();
+        // One statement serves every upsert and is closed on exit instead of leaking one per execute
+        try (java.sql.Statement stmt = connection.createStatement()) {
+            stmt.execute(changeTable);
+            // Update column qualifiers
+            PTable pNewTable = PhoenixRuntime.getTable(connection, systemTransformRecord.getNewPhysicalTableName());
+            PTable pOldTable = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(schema, tableName));
+            if (pOldTable.getImmutableStorageScheme() != pNewTable.getImmutableStorageScheme() ||
+                    pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+                MetaDataClient.mutateTransformProperties(connection, tenantId, schema, tableName, newTableName,
+                        pNewTable.getImmutableStorageScheme(), pNewTable.getEncodingScheme());
+                // We need to update the columns's qualifiers as well
+                mutateColumns(connection.unwrap(PhoenixConnection.class), pOldTable, pNewTable);
+
+                // Also update view column qualifiers
+                for (TableInfo view : childViewsResult.getLinks()) {
+                    PTable pView = PhoenixRuntime.getTable(connection, view.getTenantId()==null? null: Bytes.toString(view.getTenantId())
+                            , SchemaUtil.getTableName(view.getSchemaName(), view.getTableName()));
+                    mutateViewColumns(connection.unwrap(PhoenixConnection.class), pView, pNewTable);
+                }
+            }
+            connection.commit();
+
+            // We can have millions of views. We need to send it in batches
+            int maxBatchSize = connection.getQueryServices().getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            int batchSize = 0;
+            for (TableInfo view : childViewsResult.getLinks()) {
+                String changeView = String.format(changeViewStmt,
+                        extraColumns,
+                        (view.getTenantId()==null || view.getTenantId().length == 0? null: ("'" + Bytes.toString(view.getTenantId()) + "'")),
+                        (view.getSchemaName()==null || view.getSchemaName().length == 0? null : ("'" + Bytes.toString(view.getSchemaName()) + "'")),
+                        Bytes.toString(view.getTableName()),
+                        extraValues);
+                LOGGER.info("Cutover changing view via " + changeView);
+                stmt.execute(changeView);
+                viewsToUpdateCache.add(view);
+                batchSize++;
+                if (batchSize >= maxBatchSize) {
+                    connection.commit();
+                    batchSize = 0;
+                }
+            }
+            if (batchSize > 0) {
+                connection.commit();
+            }
+
+            connection.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+            UpgradeUtil.clearCacheAndGetNewTable(connection.unwrap(PhoenixConnection.class),
+                    connection.getTenantId(),
+                    schema, tableName, systemTransformRecord.getLogicalParentName(), MIN_TABLE_TIMESTAMP);
+            for (TableInfo view : viewsToUpdateCache) {
+                UpgradeUtil.clearCache(connection.unwrap(PhoenixConnection.class),
+                        PNameFactory.newName(view.getTenantId()),
+                        PNameFactory.newName(view.getSchemaName()).getString(),  Bytes.toString(view.getTableName()),
+                        tableName, MIN_TABLE_TIMESTAMP);
+            }
+            // TODO: Cleanup syscat so that we don't have an extra index
+        } catch (Exception e) {
+            LOGGER.error("Error happened during cutover ", e);
+            connection.rollback();
+            throw e;
+        } finally {
+            connection.setAutoCommit(wasCommit);
+        }
+    }
+
+    /**
+     * Appends to columnNames/columnValues each property present on both the old and the new table whose
+     * value differs, skipping PHYSICAL_TABLE_NAME. Values are appended as SQL literal text: numbers,
+     * booleans and null/empty values unquoted, the two scheme properties as serialized values, the rest quoted.
+     */
+    private static void getMetadataDifference(PhoenixConnection connection, SystemTransformRecord systemTransformRecord, List<String> columnNames, List<String> columnValues) throws SQLException {
+        PTable pOldTable = PhoenixRuntime.getTable(connection, SchemaUtil.getQualifiedTableName(systemTransformRecord.getSchemaName(),systemTransformRecord.getLogicalTableName()));
+        PTable pNewTable = PhoenixRuntime.getTable(connection, SchemaUtil.getQualifiedTableName(SchemaUtil.getSchemaNameFromFullName(systemTransformRecord.getNewPhysicalTableName()),
+                SchemaUtil.getTableNameFromFullName(systemTransformRecord.getNewPhysicalTableName())));
+        Map<String, String> oldProps = pOldTable.getPropertyValues();
+        Map<String, String> newProps = pNewTable.getPropertyValues();
+
+        for (Map.Entry<String, String> property : oldProps.entrySet()) {
+            String key = property.getKey();
+            // PHYSICAL_TABLE_NAME is skipped here: no need to add it, the caller adds it
+            if (!newProps.containsKey(key) || PHYSICAL_TABLE_NAME.equals(key)) {
+                continue;
+            }
+            String newValue = newProps.get(key);
+            if (Strings.nullToEmpty(property.getValue()).equals(Strings.nullToEmpty(newValue))) {
+                continue;
+            }
+            columnNames.add(key);
+            boolean bareLiteral = Strings.isNullOrEmpty(newValue) || StringUtils.isNumeric(newValue)
+                    || newValue.equalsIgnoreCase(Boolean.TRUE.toString()) || newValue.equalsIgnoreCase(Boolean.FALSE.toString());
+            if (bareLiteral) {
+                // null/empty values, numbers and booleans do not need single quotes around them
+                columnValues.add(newValue);
+            } else if (ENCODING_SCHEME.equals(key)) {
+                columnValues.add(String.valueOf(PTable.QualifierEncodingScheme.valueOf(newValue).getSerializedMetadataValue()));
+            } else if (IMMUTABLE_STORAGE_SCHEME.equals(key)) {
+                columnValues.add(String.valueOf(PTable.ImmutableStorageScheme.valueOf(newValue).getSerializedMetadataValue()));
+            } else {
+                columnValues.add("'" + newValue + "'");
+            }
+        }
+    }
 
     public static void completeTransform(Connection connection, Configuration configuration) throws Exception{
         // Will be called from Reducer
-        long timestamp= EnvironmentEdgeManager.currentTimeMillis();
         String tenantId = configuration.get(MAPREDUCE_TENANT_ID, null);
         String fullOldTableName = PhoenixConfigurationUtil.getInputTableName(configuration);
         String schemaName = SchemaUtil.getSchemaNameFromFullName(fullOldTableName);
@@ -390,60 +564,122 @@ public class Transform {
         String indexTableName = SchemaUtil.getTableNameFromFullName(PhoenixConfigurationUtil.getIndexToolIndexTableName(configuration));
         String logicaTableName = oldTableLogicalName;
         String logicalParentName = null;
-        if (PhoenixConfigurationUtil.getTransformingTableType(configuration) == IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE)
+        if (PhoenixConfigurationUtil.getTransformingTableType(configuration) == IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE) {
             if (!Strings.isNullOrEmpty(indexTableName)) {
                 logicaTableName = indexTableName;
                 logicalParentName = SchemaUtil.getTableName(schemaName, oldTableLogicalName);
             }
-        boolean isPartial = PhoenixConfigurationUtil.getIsPartialTransform(configuration);
+        }
+
         SystemTransformRecord transformRecord = getTransformRecord(schemaName, logicaTableName, logicalParentName,
                 tenantId, connection.unwrap(PhoenixConnection.class));
-        if (!isPartial) {
-            String newTableName = SchemaUtil.getTableNameFromFullName(transformRecord.getNewPhysicalTableName());
-            PTable pNewTable = PhoenixRuntime.getTable(connection, transformRecord.getNewPhysicalTableName());
-            PTable pOldTable = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(schemaName,logicaTableName));
-            if (pOldTable.getImmutableStorageScheme() != pNewTable.getImmutableStorageScheme() ||
-                    pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
-                MetaDataClient.mutateTransformProperties(connection, tenantId, schemaName, logicaTableName, newTableName,
-                        pNewTable.getImmutableStorageScheme(), pNewTable.getEncodingScheme());
-                // We need to update the columns's qualifiers as well
-                if (pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
-                    Short nextKeySeq = 0;
-                    for (PColumn newCol : pNewTable.getColumns()) {
-                        boolean isPk = SchemaUtil.isPKColumn(newCol);
-                        Short keySeq = isPk ? ++nextKeySeq : null;
-                        PColumn column = new PColumnImpl(newCol.getName(), newCol.getFamilyName(), newCol.getDataType(),
-                                newCol.getMaxLength(), newCol.getScale(), newCol.isNullable(), newCol.getPosition(), newCol.getSortOrder()
-                                , newCol.getArraySize(),
-                                newCol.getViewConstant(), newCol.isViewReferenced(), newCol.getExpressionStr(), newCol.isRowTimestamp(),
-                                newCol.isDynamic(), newCol.getColumnQualifierBytes(), EnvironmentEdgeManager.currentTimeMillis());
-                        addColumnMutation(connection.unwrap(PhoenixConnection.class), schemaName, logicaTableName, column,
-                                pNewTable.getParentTableName()==null? null:pNewTable.getParentTableName().getString()
-                                , pNewTable.getPKName()==null? null:pNewTable.getPKName().getString(), keySeq , pNewTable.getBucketNum() != null);
-                    }
-                }
-            }
-            // Clear cache so that the new table is used for queries
-            connection.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
-            TransformTool.updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord, PTable.TransformStatus.COMPLETED);
-
-            // TODO Kick partial transform from the TransformMonitor
-            SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
-            builder.setTransformStatus(PTable.TransformStatus.CREATED.name());
-            builder.setTransformJobId(null);
-            builder.setStartTs(new Timestamp(timestamp));
-            builder.setTransformRetryCount(0);
-            builder.setTransformType(PTable.TransformType.METADATA_TRANSFORM_PARTIAL);
-            SystemTransformRecord partialStr = builder.build();
-            Transform.upsertTransform(partialStr, connection.unwrap(PhoenixConnection.class));
+
+        if (!PTable.TransformType.isPartialTransform(transformRecord.getTransformType())) {
+            updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord, PTable.TransformStatus.PENDING_CUTOVER);
             connection.commit();
         } else {
-            TransformTool.updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord, PTable.TransformStatus.COMPLETED);
+            updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord, PTable.TransformStatus.COMPLETED);
             connection.commit();
-            // TODO: cleanup
         }
     }
 
+    public static void updateTransformRecord(PhoenixConnection connection, SystemTransformRecord transformRecord, PTable.TransformStatus newStatus) throws SQLException {
+        SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
+        builder.setTransformStatus(newStatus.name());
+        builder.setLastStateTs(new Timestamp(EnvironmentEdgeManager.currentTimeMillis()));
+        if (newStatus == PTable.TransformStatus.STARTED) {
+            builder.setTransformRetryCount(transformRecord.getTransformRetryCount() + 1);
+        }
+        Transform.upsertTransform(builder.build(), connection);
+    }
+
+    /**
+     * Writes a column mutation against {@code pOldTable} for every column of {@code pNewTable}
+     * when the two tables use different qualifier encoding schemes; does nothing otherwise.
+     * PK columns are numbered 1, 2, ... in iteration order; other columns get a null key sequence.
+     */
+    private static void mutateColumns(PhoenixConnection connection, PTable pOldTable, PTable pNewTable) throws SQLException {
+        if (pOldTable.getEncodingScheme() == pNewTable.getEncodingScheme()) {
+            // Same encoding scheme on both sides: nothing to rewrite.
+            return;
+        }
+        Short nextKeySeq = 0;
+        for (PColumn source : pNewTable.getColumns()) {
+            Short keySeq = null;
+            if (SchemaUtil.isPKColumn(source)) {
+                keySeq = ++nextKeySeq;
+            }
+            PColumn rewritten = new PColumnImpl(source.getName(), source.getFamilyName(), source.getDataType(),
+                    source.getMaxLength(), source.getScale(), source.isNullable(), source.getPosition(),
+                    source.getSortOrder(), source.getArraySize(), source.getViewConstant(),
+                    source.isViewReferenced(), source.getExpressionStr(), source.isRowTimestamp(),
+                    source.isDynamic(), source.getColumnQualifierBytes(), EnvironmentEdgeManager.currentTimeMillis());
+            // The mutation targets the old table's name but takes parent, PK name and salting from the new table.
+            String oldSchemaName = pOldTable.getSchemaName() == null ? null : pOldTable.getSchemaName().getString();
+            String oldTableName = pOldTable.getTableName().getString();
+            String parentName = pNewTable.getParentTableName() == null ? null : pNewTable.getParentTableName().getString();
+            String pkName = pNewTable.getPKName() == null ? null : pNewTable.getPKName().getString();
+            boolean isSalted = pNewTable.getBucketNum() != null;
+            addColumnMutation(connection, oldSchemaName, oldTableName, rewritten, parentName, pkName, keySeq, isSalted);
+        }
+    }
+
+    private static void mutateViewColumns(PhoenixConnection connection, PTable pView, PTable pNewTable) throws SQLException {
+        if (pView.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+            Short nextKeySeq = 0;
+            PTable.EncodedCQCounter cqCounterToUse = pNewTable.getEncodedCQCounter();
+            String defaultColumnFamily = pNewTable.getDefaultFamilyName() != null && !Strings.isNullOrEmpty(pNewTable.getDefaultFamilyName().getString()) ?
+                    pNewTable.getDefaultFamilyName().getString() : DEFAULT_COLUMN_FAMILY;
+            for (PColumn column : pView.getColumns()) {
+                boolean isPk = SchemaUtil.isPKColumn(column);
+                Short keySeq = isPk ? ++nextKeySeq : null;
+                if (isPk) {
+                    continue;
+                }
+                String familyName = null;
+                if (pNewTable.getImmutableStorageScheme() == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                    familyName = column.getFamilyName() != null ? column.getFamilyName().getString() : defaultColumnFamily;
+                } else {
+                    familyName = defaultColumnFamily;
+                }
+                int encodedCQ = pView.isAppendOnlySchema() ? Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + keySeq) : cqCounterToUse.getNextQualifier(familyName);
+                if (!pView.isAppendOnlySchema()) {
+                    cqCounterToUse.increment(familyName);
+                }
+
+                byte[] colQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(column.getName().getString(),
+                        encodedCQ, pNewTable, isPk);
+
+                if (column.isDerived()) {
+                    // Don't need to add/change derived columns
+                    continue;
+                }
+
+                PColumn newCol = new PColumnImpl(column.getName(), PNameFactory.newName(familyName), column.getDataType(),
+                        column.getMaxLength(), column.getScale(), column.isNullable(), column.getPosition(), column.getSortOrder()
+                        , column.getArraySize(),
+                        column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(),
+                        column.isDynamic(), colQualifierBytes, EnvironmentEdgeManager.currentTimeMillis());
+                String tenantId = pView.getTenantId() == null? null:pView.getTenantId().getString();
+                addColumnMutation(connection, tenantId, pView.getSchemaName()==null?null:pView.getSchemaName().getString()
+                        , pView.getTableName().getString(), newCol,
+                        pView.getParentTableName() == null ? null : pView.getParentTableName().getString()
+                        , pView.getPKName() == null ? null : pView.getPKName().getString(), keySeq, pView.getBucketNum() != null);
+            }
+        }
+    }
+
+    public static void doForceCutover(Connection connection, Configuration configuration) throws Exception{
+        PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
+        // Will be called from Reducer
+        String tenantId = configuration.get(MAPREDUCE_TENANT_ID, null);
+        String fullOldTableName = PhoenixConfigurationUtil.getInputTableName(configuration);
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(fullOldTableName);
+        String oldTableLogicalName = SchemaUtil.getTableNameFromFullName(fullOldTableName);
+        String indexTableName = SchemaUtil.getTableNameFromFullName(PhoenixConfigurationUtil.getIndexToolIndexTableName(configuration));
+        String logicaTableName = oldTableLogicalName;
+        String logicalParentName = null;
+        if (PhoenixConfigurationUtil.getTransformingTableType(configuration) == IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE)
+            if (!Strings.isNullOrEmpty(indexTableName)) {
+                logicaTableName = indexTableName;
+                logicalParentName = SchemaUtil.getTableName(schemaName, oldTableLogicalName);
+            }
+
+        SystemTransformRecord transformRecord = getTransformRecord(schemaName, logicaTableName, logicalParentName,
+                tenantId, phoenixConnection);
+        Transform.doCutover(phoenixConnection, transformRecord);
+        updateTransformRecord(phoenixConnection, transformRecord, PTable.TransformStatus.COMPLETED);
+        phoenixConnection.commit();
+    }
+
 }
 
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
index 80d32e7..da445f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
@@ -130,6 +131,10 @@ public class TransformMaintainer extends IndexMaintainer {
         this.isOldTableSalted = isOldTableSalted;
     }
 
+    /** Returns a newly created, empty set on every call. */
+    public Set<ColumnReference> getAllColumns() {
+        Set<ColumnReference> noColumns = new HashSet<>();
+        return noColumns;
+    }
+
     private TransformMaintainer(final PTable oldTable, final PTable newTable, PhoenixConnection connection) {
         this(oldTable.getRowKeySchema(), oldTable.getBucketNum() != null);
         this.newTableRowKeyOrderOptimizable = newTable.rowKeyOrderOptimizable();
@@ -140,7 +145,7 @@ public class TransformMaintainer extends IndexMaintainer {
         this.oldTableEncodingScheme = oldTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : oldTable.getEncodingScheme();
         this.oldTableImmutableStorageScheme = oldTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : oldTable.getImmutableStorageScheme();
 
-        this.newTableName = SchemaUtil.getTableName(newTable.getSchemaName(), newTable.getTableName()).getBytes();
+        this.newTableName = newTable.getPhysicalName().getBytes();
         boolean newTableWALDisabled = newTable.isWALDisabled();
         int nNewTableColumns = newTable.getColumns().size();
         int nNewTablePKColumns = newTable.getPKColumns().size();
@@ -297,14 +302,11 @@ public class TransformMaintainer extends IndexMaintainer {
             cRefBuilder.setFamily(ByteStringer.wrap(dataTableColRef.getFamily()));
             cRefBuilder.setQualifier(ByteStringer.wrap(dataTableColRef.getQualifier()));
             builder.addOldTableColRefForCoveredColumns(cRefBuilder.build());
-            if (maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS) {
-                // We need to serialize the colRefs of new tables only in case of encoded column names.
-                ColumnReference newTableColRef = e.getValue();
-                cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
-                cRefBuilder.setFamily(ByteStringer.wrap(newTableColRef.getFamily()));
-                cRefBuilder.setQualifier(ByteStringer.wrap(newTableColRef.getQualifier()));
-                builder.addNewTableColRefForCoveredColumns(cRefBuilder.build());
-            }
+            ColumnReference newTableColRef = e.getValue();
+            cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+            cRefBuilder.setFamily(ByteStringer.wrap(newTableColRef.getFamily()));
+            cRefBuilder.setQualifier(ByteStringer.wrap(newTableColRef.getQualifier()));
+            builder.addNewTableColRefForCoveredColumns(cRefBuilder.build());
         }
 
         builder.setNewTableColumnCount(maintainer.newTableColumnCount);
@@ -383,19 +385,12 @@ public class TransformMaintainer extends IndexMaintainer {
         List<ServerCachingProtos.ColumnReference> oldTableColRefsForCoveredColumnsList = proto.getOldTableColRefForCoveredColumnsList();
         List<ServerCachingProtos.ColumnReference> newTableColRefsForCoveredColumnsList = proto.getNewTableColRefForCoveredColumnsList();
         maintainer.coveredColumnsMap = Maps.newHashMapWithExpectedSize(oldTableColRefsForCoveredColumnsList.size());
-        boolean encodedColumnNames = maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS;
         Iterator<ServerCachingProtos.ColumnReference> newTableColRefItr = newTableColRefsForCoveredColumnsList.iterator();
         for (ServerCachingProtos.ColumnReference colRefFromProto : oldTableColRefsForCoveredColumnsList) {
             ColumnReference oldTableColRef = new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray());
             ColumnReference newTableColRef;
-            if (encodedColumnNames) {
-                ServerCachingProtos.ColumnReference fromProto = newTableColRefItr.next();
-                newTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier().toByteArray());
-            } else {
-                byte[] cq = oldTableColRef.getQualifier();
-                byte[] cf = oldTableColRef.getFamily();
-                newTableColRef = new ColumnReference(cf, cq);
-            }
+            ServerCachingProtos.ColumnReference fromProto = newTableColRefItr.next();
+            newTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier().toByteArray());
             maintainer.coveredColumnsMap.put(oldTableColRef, newTableColRef);
         }
         maintainer.logicalNewTableName = proto.getLogicalNewTableName();
@@ -460,18 +455,30 @@ public class TransformMaintainer extends IndexMaintainer {
             oldTableRowKeySchema.iterator(rowKeyPtr, ptr, dataPosOffset);
 
             // Write new table row key
+            int trailingVariableWidthColumnNum = 0;
             while (oldTableRowKeySchema.next(ptr, dataPosOffset, maxRowKeyOffset) != null) {
                 output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
                 if (!oldTableRowKeySchema.getField(dataPosOffset).getDataType().isFixedWidth()) {
                     output.writeByte(SchemaUtil.getSeparatorByte(newTableRowKeyOrderOptimizable, ptr.getLength()==0
                             , oldTableRowKeySchema.getField(dataPosOffset)));
+                    trailingVariableWidthColumnNum++;
+                } else {
+                    trailingVariableWidthColumnNum = 0;
                 }
+
                 dataPosOffset++;
             }
 
             byte[] newTableRowKey = stream.getBuffer();
             // Remove trailing nulls
             int length = stream.size();
+            // The existing code does not eliminate the separator if the data type is not nullable. It is not clear why.
+            // The actual bug is in the calculation of maxTrailingNulls with view indexes. So, in order not to impact some other cases, we should keep minLength check here.
+            while (trailingVariableWidthColumnNum > 0 && length > 0 && newTableRowKey[length-1] == QueryConstants.SEPARATOR_BYTE) {
+                length--;
+                trailingVariableWidthColumnNum--;
+            }
+
             if (isNewTableSalted) {
                 // Set salt byte
                 byte saltByte = SaltingUtil.getSaltingByte(newTableRowKey, SaltingUtil.NUM_SALTING_BYTES, length-SaltingUtil.NUM_SALTING_BYTES, nNewTableSaltBuckets);
@@ -483,30 +490,13 @@ public class TransformMaintainer extends IndexMaintainer {
         }
     }
 
-    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable oldRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
+    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable oldRowKeyPtr,
+                                   long ts, byte[] regionStartKey, byte[] regionEndKey, boolean verified) throws IOException {
         byte[] newRowKey = this.buildRowKey(valueGetter, oldRowKeyPtr, regionStartKey, regionEndKey, ts);
         return buildUpdateMutation(kvBuilder, valueGetter, oldRowKeyPtr, ts, regionStartKey, regionEndKey,
                 newRowKey, this.getEmptyKeyValueFamily(), coveredColumnsMap,
-                newTableEmptyKeyValueRef, newTableWALDisabled, oldTableImmutableStorageScheme, newTableImmutableStorageScheme, newTableEncodingScheme, false);
-    }
-
-    public Delete buildRowDeleteMutation(byte[] rowKey, DeleteType deleteType, long ts) {
-        byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary();
-        Delete delete = new Delete(rowKey);
-
-        for (ColumnReference ref : newTableColumns) {
-            if (deleteType == DeleteType.SINGLE_VERSION) {
-                delete.addFamilyVersion(ref.getFamily(), ts);
-            } else {
-                delete.addFamily(ref.getFamily(), ts);
-            }
-        }
-        if (deleteType == DeleteType.SINGLE_VERSION) {
-            delete.addFamilyVersion(emptyCF, ts);
-        } else {
-            delete.addFamily(emptyCF, ts);
-        }
-        return delete;
+                newTableEmptyKeyValueRef, newTableWALDisabled, oldTableImmutableStorageScheme, newTableImmutableStorageScheme,
+                newTableEncodingScheme, oldTableEncodingScheme, verified);
     }
 
     public ImmutableBytesPtr getEmptyKeyValueFamily() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 961aec6..0bb6f9a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -1110,12 +1110,22 @@ public class SchemaUtil {
         return TableName.valueOf(schemaName, tableName);
     }
 
+    /**
+     * Byte-array overload: splits {@code fullTableName} into its table and schema parts and
+     * delegates to {@link #getPhysicalHBaseTableName(String, String, boolean)}.
+     */
+    public static PName getPhysicalHBaseTableName(byte[] fullTableName, boolean isNamespaceMappingEnabled) {
+        final String tablePart = getTableNameFromFullName(fullTableName);
+        final String schemaPart = getSchemaNameFromFullName(fullTableName);
+        PName physicalName = getPhysicalHBaseTableName(schemaPart, tablePart, isNamespaceMappingEnabled);
+        return physicalName;
+    }
+
     public static PName getPhysicalHBaseTableName(String schemaName, String tableName, boolean isNamespaceMapped) {
         if (!isNamespaceMapped) { return PNameFactory.newName(getTableNameAsBytes(schemaName, tableName)); }
         if (schemaName == null || schemaName.isEmpty()) { return PNameFactory.newName(tableName); }
         return PNameFactory.newName(schemaName + QueryConstants.NAMESPACE_SEPARATOR + tableName);
     }
 
+    public static String replaceNamespaceSeparator(PName name) {
+        return name.getString().replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR);
+    }
+
     public static boolean isSchemaCheckRequired(PTableType tableType, ReadOnlyProps props) {
         return PTableType.TABLE.equals(tableType) && isNamespaceMappingEnabled(tableType, props);
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 9bc25e1..04156f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -67,6 +67,7 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Timestamp;
 import java.sql.Types;
 import java.text.Format;
 import java.text.SimpleDateFormat;
@@ -2250,23 +2251,19 @@ public class UpgradeUtil {
                     readOnlyProps, PhoenixRuntime.getCurrentScn(readOnlyProps), fullTableName,
                     table.getType(),conn.getTenantId());
             // clear the cache and get new table
-            conn.removeTable(conn.getTenantId(), fullTableName,
-                table.getParentName() != null ? table.getParentName().getString() : null,
-                table.getTimeStamp());
-            byte[] tenantIdBytes = conn.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY :
-                    conn.getTenantId().getBytes();
-            conn.getQueryServices().clearTableFromCache(
-                    tenantIdBytes,
-                    table.getSchemaName().getBytes(), table.getTableName().getBytes(),
-                    PhoenixRuntime.getCurrentScn(readOnlyProps));
-            MetaDataMutationResult result =
-                    new MetaDataClient(conn).updateCache(conn.getTenantId(), schemaName, tableName,
-                        true);
+            MetaDataMutationResult result = clearCacheAndGetNewTable(conn,
+                    conn.getTenantId(),
+                    table.getSchemaName()==null?null:table.getSchemaName().getString(),
+                    table.getTableName().getString(),
+                    table.getParentName()==null?null:table.getParentName().getString(),
+                    table.getTimeStamp());
+
             if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                 throw new TableNotFoundException(schemaName, fullTableName);
             }
             table = result.getTable();
-            
+            byte[] tenantIdBytes = conn.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY :
+                    conn.getTenantId().getBytes();
             // check whether table is properly upgraded before upgrading indexes
             if (table.isNamespaceMapped()) {
                 for (PTable index : table.getIndexes()) {
@@ -2319,6 +2316,7 @@ public class UpgradeUtil {
                                 index.getTableName());
                         conn.commit();
                     }
+
                     conn.getQueryServices().clearTableFromCache(
                             tenantIdBytes,
                             index.getSchemaName().getBytes(), index.getTableName().getBytes(),
@@ -2368,6 +2366,31 @@ public class UpgradeUtil {
         }
     }
 
+    /**
+     * Clears the cached entry for the given table via
+     * {@link #clearCache(PhoenixConnection, PName, String, String, String, long)} and then asks a
+     * new {@link MetaDataClient} to update the cache for it, returning that call's result.
+     */
+    public static MetaDataMutationResult clearCacheAndGetNewTable(PhoenixConnection conn, PName tenantId,
+                                                                  String schemaName, String tableName, String parentName, long timestamp)
+            throws SQLException {
+        clearCache(conn, tenantId, schemaName, tableName, parentName, timestamp);
+        MetaDataClient metaDataClient = new MetaDataClient(conn);
+        return metaDataClient.updateCache(tenantId, schemaName, tableName, true);
+    }
+
+    /**
+     * Removes a table from the connection's cache and then clears it from the query services'
+     * cache.
+     *
+     * @param conn       connection whose cache entry is removed
+     * @param tenantId   tenant of the table; {@code null} is sent as an empty byte array
+     * @param schemaName schema of the table; {@code null} is sent as an empty byte array
+     * @param tableName  table name without schema; must not be {@code null}
+     * @param parentName parent table name passed to {@code removeTable}, may be {@code null}
+     * @param timestamp  table timestamp passed to {@code removeTable}
+     * @throws SQLException if clearing either cache fails
+     */
+    public static void clearCache(PhoenixConnection conn, PName tenantId,
+                                                String schemaName, String tableName, String parentName, long timestamp)
+            throws SQLException {
+        conn.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName),
+                parentName, timestamp);
+        byte[] tenantIdBytes = tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY :
+                tenantId.getBytes();
+        // Encode names explicitly as UTF-8: String.getBytes() without a charset depends on the
+        // JVM's platform default and could yield different key bytes for non-ASCII names.
+        byte[] schemaBytes = schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY :
+                schemaName.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        byte[] tableBytes = tableName.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        conn.getQueryServices().clearTableFromCache(
+                tenantIdBytes,
+                schemaBytes, tableBytes,
+                PhoenixRuntime.getCurrentScn(conn.getQueryServices().getProps()));
+    }
+
     private static void updateIndexesSequenceIfPresent(PhoenixConnection connection, PTable dataTable)
             throws SQLException {
         PName tenantId = connection.getTenantId();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index affef44..a14d9b3 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -789,7 +789,7 @@ public abstract class BaseTest {
             expectedColumnEncoding, String tableName)
             throws Exception {
         PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
-        PTable table = phxConn.getTable(new PTableKey(phxConn.getTenantId(), tableName));
+        PTable table = PhoenixRuntime.getTableNoCache(phxConn, tableName);
         assertEquals(expectedStorageScheme, table.getImmutableStorageScheme());
         assertEquals(expectedColumnEncoding, table.getEncodingScheme());
     }