You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by go...@apache.org on 2022/01/11 05:11:52 UTC

[phoenix] branch 4.x updated: PHOENIX-6620 TransformTool to fix unverified rows and do validation

This is an automated email from the ASF dual-hosted git repository.

gokcen pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new d32745d  PHOENIX-6620 TransformTool to fix unverified rows and do validation
d32745d is described below

commit d32745d8c4d46f1ff20f0b346af3d21ffc1e65e8
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Wed Sep 8 17:17:10 2021 -0700

    PHOENIX-6620 TransformTool to fix unverified rows and do validation
    
    Signed-off-by: Gokcen Iskender <go...@gmail.com>
---
 .../apache/phoenix/end2end/TransformToolIT.java    | 327 +++++++++++++++++++--
 .../end2end/index/ImmutableIndexExtendedIT.java    |   2 +-
 .../apache/phoenix/end2end/index/TransformIT.java  |  31 ++
 .../apache/phoenix/exception/SQLExceptionCode.java |   3 +-
 .../index/PhoenixIndexImportDirectReducer.java     |  18 +-
 .../transform/PhoenixTransformReducer.java         |  38 ++-
 .../transform/PhoenixTransformRepairMapper.java    | 205 +++++++++++++
 .../phoenix/mapreduce/transform/TransformTool.java | 122 +++++++-
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  17 ++
 .../org/apache/phoenix/schema/MetaDataClient.java  |   4 +
 .../schema/tool/SchemaExtractionProcessor.java     |   8 +
 .../schema/transform/TransformMaintainer.java      |  56 +++-
 12 files changed, 767 insertions(+), 64 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
index 5002c8c..52646aa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.index.SingleCellIndexIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.mapreduce.transform.TransformTool;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -54,6 +55,18 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import static org.apache.phoenix.end2end.index.ImmutableIndexExtendedIT.getRowCountForEmptyColValue;
+import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
+import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
+import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES;
+import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.util.TestUtil.getRowCount;
 import static org.junit.Assert.assertEquals;
@@ -71,7 +84,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
 
     public TransformToolIT() {
         StringBuilder optionBuilder = new StringBuilder();
-        optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=NONE, TTL=18000 ");
+        optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=NONE ");
         if (!mutable) {
             optionBuilder.append(", IMMUTABLE_ROWS=true ");
         }
@@ -86,16 +99,21 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
                 QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
         serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
+        serverProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
         Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
                 new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
 
     private void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows) throws SQLException {
+        createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
+    }
+
+    private void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows, String tableOptions) throws SQLException {
         String stmString1 =
                 "CREATE TABLE IF NOT EXISTS " + dataTableFullName
                         + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
-                        + tableDDLOptions;
+                        + tableOptions;
         conn.createStatement().execute(stmString1);
         String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
         PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
@@ -125,7 +143,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
             assertNotNull(record);
 
             List<String> args = getArgList(schemaName, dataTableName, null,
-                    null, null, null, false, false, false, false);
+                    null, null, null, false, false, false, false,false);
             runTransformTool(args.toArray(new String[0]), 0);
             record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
             assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
@@ -167,7 +185,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
             assertNotNull(record);
 
             List<String> args = getArgList(schemaName, dataTableName, null,
-                    null, null, null, true, false, false, false);
+                    null, null, null, true, false, false, false, false);
 
             runTransformTool(args.toArray(new String[0]), 0);
             record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
@@ -187,7 +205,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         assertNotNull(record);
 
         List<String> args = getArgList(schemaName, dataTableName, null,
-                null, null, null, false, true, false, false);
+                null, null, null, false, true, false, false, false);
 
         runTransformTool(args.toArray(new String[0]), 0);
         record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
@@ -214,7 +232,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
             conn.setAutoCommit(true);
             pauseTableTransform(schemaName, dataTableName, conn);
             List<String> args = getArgList(schemaName, dataTableName, null,
-                    null, null, null, false, false, true, false);
+                    null, null, null, false, false, true, false, false);
 
             runTransformTool(args.toArray(new String[0]), 0);
             SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
@@ -282,7 +300,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
                     " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
 
             List<String> args = getArgList(schemaName, dataTableName, null,
-                    null, null, null, false, false, false, false);
+                    null, null, null, false, false, false, false, false);
             // split if data table more than 3 regions
             args.add("--autosplit=3");
 
@@ -336,7 +354,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
                     " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
 
             List<String> args = getArgList(schemaName, dataTableName, null,
-                    null, null, null, false, false, false, false);
+                    null, null, null, false, false, false, false, false);
             runTransformTool(args.toArray(new String[0]), 0);
 
             SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, newDataTN.getNameAsString());
@@ -382,7 +400,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
             assertNotNull(record);
 
             List<String> args = getArgList(schemaName, dataTableName, indexTableName,
-                    null, null, null, false, false, false, false);
+                    null, null, null, false, false, false, false, false);
 
             runTransformTool(args.toArray(new String[0]), 0);
             record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
@@ -413,13 +431,24 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
         String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String parentViewName = "VWP_" + generateUniqueName();
+        String viewName = "VW_" + generateUniqueName();
+        String viewIdxName = "VWIDX_" + generateUniqueName();
 
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(true);
-            IndexRegionObserver.setFailPreIndexUpdatesForTesting(true);
             int numOfRows = 0;
             createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+            String createParentViewSql = "CREATE VIEW " + parentViewName + " ( PARENT_VIEW_COL1 VARCHAR ) AS SELECT * FROM " + dataTableFullName;
+            conn.createStatement().execute(createParentViewSql);
+
+            String createViewSql = "CREATE VIEW " + viewName + " ( VIEW_COL1 INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM " + parentViewName;
+            conn.createStatement().execute(createViewSql);
+
+            String createViewIdxSql = "CREATE INDEX " + viewIdxName + " ON " + viewName + " (VIEW_COL1) include (VIEW_COL2) ";
+            conn.createStatement().execute(createViewIdxSql);
+
             SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
 
             conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
@@ -427,8 +456,8 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
             SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
             assertNotNull(record);
             assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
-
-            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            IndexRegionObserver.setFailPreIndexUpdatesForTesting(true);
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", viewName);
             PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
             try {
                 IndexToolIT.upsertRow(stmt1, 1);
@@ -436,36 +465,280 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
             } catch (Exception e) {
             }
             assertEquals(0, getRowCount(conn, record.getNewPhysicalTableName()));
-            assertEquals(0, getRowCount(conn,dataTableFullName));
+            assertEquals(0, getRowCount(conn, dataTableFullName));
 
             IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
             IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
             IndexToolIT.upsertRow(stmt1, 2);
+            IndexToolIT.upsertRow(stmt1, 3);
 
-            assertEquals(1, getRowCount(conn, record.getNewPhysicalTableName()));
-            assertEquals(1, getRowCount(conn,dataTableFullName));
+            assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(2, getRowCount(conn,dataTableFullName));
 
             IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
             IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
             try {
-                IndexToolIT.upsertRow(stmt1, 3);
+                IndexToolIT.upsertRow(stmt1, 4);
                 fail("Data table upsert should have failed");
             } catch (Exception e) {
             }
-            assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
-            assertEquals(1, getRowCount(conn,dataTableFullName));
+            try {
+                IndexToolIT.upsertRow(stmt1, 5);
+                fail("Data table upsert should have failed");
+            } catch (Exception e) {
+            }
+            assertEquals(4, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(2, getRowCount(conn,dataTableFullName));
 
             IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
 
             List<String> args = getArgList(schemaName, dataTableName, null,
-                    null, null, null, false, false, false, false);
+                    null, null, null, false, false, false, false, true);
             runTransformTool(args.toArray(new String[0]), 0);
 
-            //TODO: Implement GlobalIndexChecker like repair for unverified rows
-            assertEquals(1, getRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices()
+            assertEquals(2, getRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices()
                     .getTable(Bytes.toBytes(dataTableFullName)), false));
-//            assertEquals(1, getRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices()
-//                    .getTable(Bytes.toBytes(record.getNewPhysicalTableName())), false));
+            assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+            assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+        } finally {
+            IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+        }
+    }
+
+    @Test
+    public void testTransformVerifiedForTransactionalTable() throws Exception {
+        // TODO: Column encoding is not supported for OMID. Add an OMID test for other types of transforms.
+        // For now, we don't support transforms other than storage and column encoding type change.
+        //testVerifiedForTransactionalTable("OMID");
+        testVerifiedForTransactionalTable("TEPHRA");
+    }
+
+    private void testVerifiedForTransactionalTable(String provider) throws Exception{
+        String tableOptions = tableDDLOptions + " ,TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + provider + "'";
+
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 1;
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableOptions);
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+            List<String> args = getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, false, false, false);
+            runTransformTool(args.toArray(new String[0]), 0);
+            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+
+
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+            // Run again to check that VERIFIED row still remains verified
+            runTransformTool(args.toArray(new String[0]), 0);
+            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+            assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+
+            // We will have two rows with empty col = 'x' since there is no IndexRegionObserver for transactional table
+            IndexToolIT.upsertRow(stmt1, ++numOfRows);
+            IndexToolIT.upsertRow(stmt1, ++numOfRows);
+
+            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+            assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+            assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), EMPTY_COLUMN_VALUE_BYTES));
+
+            // Even when we run TransformTool again, verified bit is not cleared but the empty column stays as is
+            args = getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, false, false, false);
+            runTransformTool(args.toArray(new String[0]), 0);
+            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+            assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+            assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), EMPTY_COLUMN_VALUE_BYTES));
+        }
+    }
+
+    @Test
+    public void testTransformVerify() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 2;
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+            IndexRegionObserver.setFailPreIndexUpdatesForTesting(true);
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            try {
+                IndexToolIT.upsertRow(stmt1, numOfRows+1);
+                fail("New table upsert should have failed");
+            } catch (Exception e) {
+            }
+            // We didn't run transform tool yet. So new table would have 0.
+            assertEquals(0, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(numOfRows, getRowCount(conn, dataTableFullName));
+
+            IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            IndexToolIT.upsertRow(stmt1, ++numOfRows);
+            IndexToolIT.upsertRow(stmt1, ++numOfRows);
+
+            // 2 rows are missing in the new table since they are the initial rows
+            assertEquals(numOfRows-2, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(numOfRows, getRowCount(conn,dataTableFullName));
+
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+            try {
+                IndexToolIT.upsertRow(stmt1, numOfRows+1);
+                fail("Data table upsert should have failed");
+            } catch (Exception e) {
+            }
+            //  Three unverified rows in the new table; the original 2 rows are missing since the transform tool did not run.
+            //  One unverified row is not in the data table. 2 unverified rows are in the data table.
+            assertEquals(numOfRows-1, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(numOfRows, getRowCount(conn,dataTableFullName));
+
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+
+            List<String> args = getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, false, false, false);
+            args.add("-v");
+            args.add(IndexTool.IndexVerifyType.ONLY.getValue());
+            // Run only validation and check
+            TransformTool transformTool = runTransformTool(args.toArray(new String[0]), 0);
+            assertEquals(numOfRows, transformTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            assertEquals(numOfRows, transformTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+            assertEquals(0, transformTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(numOfRows-2, transformTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(2, transformTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(2, transformTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
+
+            args = getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, false, false, false);
+            args.add("-v");
+            args.add(IndexTool.IndexVerifyType.AFTER.getValue());
+            // Run after validation and check that 2 missing (initial) rows are built.
+            transformTool = runTransformTool(args.toArray(new String[0]), 0);
+            assertEquals(numOfRows, transformTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            assertEquals(numOfRows, transformTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+            assertEquals(numOfRows, transformTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(numOfRows, transformTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+
+            int numOfRows2= 2;
+            String dataTableName2 = generateUniqueName();
+            String dataTableFullName2 = SchemaUtil.getTableName(schemaName, dataTableName2);
+            createTableAndUpsertRows(conn, dataTableFullName2, numOfRows2);
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName2 +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            record = Transform.getTransformRecord(schemaName, dataTableName2, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+
+            //  Original 2 rows missing since transform tool did not run
+            assertEquals(0, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(numOfRows2, getRowCount(conn,dataTableFullName2));
+
+            args = getArgList(schemaName, dataTableName2, null,
+                    null, null, null, false, false, false, false, false);
+            args.add("-v");
+            args.add(IndexTool.IndexVerifyType.BEFORE.getValue());
+            // Run before validation and check.
+            transformTool = runTransformTool(args.toArray(new String[0]), 0);
+            assertEquals(numOfRows2, transformTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            assertEquals(numOfRows2, transformTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+            assertEquals(numOfRows2, transformTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(numOfRows2-2, transformTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(numOfRows2, transformTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, transformTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
+        } finally {
+            IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+        }
+    }
+
+    @Test
+    public void testTransformVerify_shouldFixUnverified() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 1;
+            int numOfRowsInNewTbl = 0;
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            // We didn't run transform tool yet. So new table would have 0.
+            assertEquals(0, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(numOfRows, getRowCount(conn, dataTableFullName));
+
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            IndexToolIT.upsertRow(stmt1, ++numOfRows);
+            numOfRowsInNewTbl++;
+            IndexToolIT.upsertRow(stmt1, ++numOfRows);
+            numOfRowsInNewTbl++;
+
+            // 1 row is missing in the new table since it is the initial row
+            assertEquals(numOfRows-1, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(numOfRows, getRowCount(conn,dataTableFullName));
+
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+            try {
+                numOfRowsInNewTbl++;
+                IndexToolIT.upsertRow(stmt1, numOfRows+1);
+                fail("Data table upsert should have failed");
+            } catch (Exception e) {
+            }
+            try {
+                numOfRowsInNewTbl++;
+                IndexToolIT.upsertRow(stmt1, numOfRows+2);
+                fail("Data table upsert should have failed");
+            } catch (Exception e) {
+            }
+            //  Four unverified rows in the new table; the original 1 row is missing since the transform tool did not run.
+            //  Two of the unverified rows are not in the data table.
+            assertEquals(numOfRowsInNewTbl, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(numOfRows, getRowCount(conn,dataTableFullName));
+
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+            List<String> args = getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, false, false, true);
+            args.add("-v");
+            args.add(IndexTool.IndexVerifyType.BEFORE.getValue());
+            TransformTool transformTool = runTransformTool(args.toArray(new String[0]), 0);
+            // Run with BEFORE verification and the -fu (fix unverified) option: the input records should equal the rows in the new table, and the 2 unverified rows that are not in the data table should be counted as extra unverified rows.
+            assertEquals(numOfRowsInNewTbl, transformTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            assertEquals(2, transformTool.getJob().getCounters().findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT).getValue());
         } finally {
             IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
             IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
@@ -475,7 +748,8 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
 
     public static List<String> getArgList(String schemaName, String dataTable, String indxTable, String tenantId,
                                           Long startTime, Long endTime,
-                                          boolean shouldAbort, boolean shouldPause, boolean shouldResume, boolean isPartial) {
+                                          boolean shouldAbort, boolean shouldPause, boolean shouldResume, boolean isPartial,
+                                          boolean shouldFixUnverified) {
         List<String> args = Lists.newArrayList();
         if (schemaName != null) {
             args.add("--schema=" + schemaName);
@@ -515,6 +789,9 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         if (isPartial) {
             args.add("-pt");
         }
+        if (shouldFixUnverified) {
+            args.add("-fu");
+        }
         return args;
     }
 
@@ -523,7 +800,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
                                          String dataTable, String indexTable, String tenantId,
                                          Long startTime, Long endTime) {
         List<String> args = getArgList(schemaName, dataTable, indexTable,
-                tenantId, startTime, endTime, false, false, false, false);
+                tenantId, startTime, endTime, false, false, false, false, false);
         args.add("-op");
         args.add("/tmp/" + UUID.randomUUID().toString());
         return args.toArray(new String[0]);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexExtendedIT.java
index 5d3b515..13e7e86 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexExtendedIT.java
@@ -217,7 +217,7 @@ public class ImmutableIndexExtendedIT extends ParallelStatsDisabledIT {
         TestUtil.waitForIndexState(conn, indexTable, PIndexState.ACTIVE);
     }
     
-    private static int getRowCountForEmptyColValue(Connection conn, String tableName,
+    public static int getRowCountForEmptyColValue(Connection conn, String tableName,
             byte[] valueBytes)  throws IOException, SQLException {
 
         PTable table = PhoenixRuntime.getTable(conn, tableName);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
index 136da65..e7aa5aa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
@@ -170,6 +170,37 @@ public class TransformIT extends ParallelStatsDisabledIT {
     }
 
     @Test
+    public void testTransformFailsForViewIndex() throws Exception {
+        // ALTER INDEX with new storage properties on a view index must fail with CANNOT_TRANSFORM_VIEW_INDEX.
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps);
+             java.sql.Statement stmt = conn.createStatement()) {
+            conn.setAutoCommit(true);
+            String schema = "S_" + generateUniqueName();
+            String tableName = "TBL_" + generateUniqueName();
+            String fullTableName = SchemaUtil.getTableName(schema, tableName);
+            String viewName = "VW_" + generateUniqueName();
+            String viewIdxName = "VWIDX_" + generateUniqueName();
+            String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+                    "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+            stmt.execute(createTableSql);
+            assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullTableName);
+
+            String createViewSql = "CREATE VIEW " + viewName + " ( VIEW_COL1 INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM " + fullTableName;
+            stmt.execute(createViewSql);
+
+            String createViewIdxSql = "CREATE INDEX " + viewIdxName + " ON " + viewName + " (VIEW_COL1) include (VIEW_COL2) ";
+            stmt.execute(createViewIdxSql);
+
+            try {
+                stmt.execute("ALTER INDEX " + viewIdxName + " ON "  + viewName
+                        + " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+                fail("Transforming a view index should have been rejected");
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.CANNOT_TRANSFORM_VIEW_INDEX.getErrorCode(), e.getErrorCode());
+            }
+        }
+    }
+
+    @Test
     public void testTransformForLiveMutations_mutatingTable() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
             conn.setAutoCommit(true);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 9fe6687..f6eb8e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -567,7 +567,8 @@ public enum SQLExceptionCode {
             "Error writing DDL change to external schema registry"),
 
     CANNOT_TRANSFORM_ALREADY_TRANSFORMING_TABLE(910, "43M21",
-                                        "Cannot transform an index or a table who is already going through a transform.");
+                                        "Cannot transform an index or a table who is already going through a transform."),
+    CANNOT_TRANSFORM_VIEW_INDEX(911, "43M22", "Cannot transform a view index. Consider creating a new view index.");
 
     private final int errorCode;
     private final String sqlState;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index a341d2b..3f5462c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -60,7 +60,7 @@ public class PhoenixIndexImportDirectReducer extends
     private String indexTableName;
     private byte[] indexTableNameBytes;
 
-    private void updateCounters(IndexTool.IndexVerifyType verifyType,
+    protected void updateCounters(IndexTool.IndexVerifyType verifyType,
                                 Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context context)
             throws IOException {
         Configuration configuration = context.getConfiguration();
@@ -162,13 +162,15 @@ public class PhoenixIndexImportDirectReducer extends
             }
         }
 
-        if (PhoenixConfigurationUtil.getIsTransforming(context.getConfiguration())) {
-            try {
-                Transform.completeTransform(ConnectionUtil
-                        .getInputConnection(context.getConfiguration()), context.getConfiguration());
-            } catch (Exception e) {
-                LOGGER.error(" Failed to complete transform", e);
-                throw new RuntimeException(e.getMessage());
+        if (verifyType != IndexTool.IndexVerifyType.ONLY) {
+            if (PhoenixConfigurationUtil.getIsTransforming(context.getConfiguration())) {
+                try {
+                    Transform.completeTransform(ConnectionUtil
+                            .getInputConnection(context.getConfiguration()), context.getConfiguration());
+                } catch (Exception e) {
+                    LOGGER.error(" Failed to complete transform", e);
+                    throw new RuntimeException(e.getMessage());
+                }
             }
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
index 4a1c4fe..6b5b217 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
@@ -21,48 +21,54 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.IndexToolUtil;
+import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectReducer;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.transform.Transform;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
+import java.sql.SQLException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
+
 /**
- * Reducer class that does only one task and that is to complete transform.
+ * Reducer that, only once, updates the verification counters (unless verify type is NONE) and completes the transform (unless verify type is ONLY).
  */
 public class PhoenixTransformReducer extends
-        Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable> {
+        PhoenixIndexImportDirectReducer {
     private AtomicBoolean calledOnce = new AtomicBoolean(false);
 
     private static final Logger LOGGER =
             LoggerFactory.getLogger(PhoenixTransformReducer.class);
 
     @Override
-    protected void setup(Context context) throws IOException {
-    }
-
-    @Override
     protected void reduce(ImmutableBytesWritable arg0, Iterable<IntWritable> arg1,
                           Context context)
             throws IOException, InterruptedException {
         if (!calledOnce.compareAndSet(false, true)) {
             return;
         }
-
-        try (final Connection
-                     connection = ConnectionUtil.getInputConnection(context.getConfiguration())){
-            // Complete full Transform and add a partial transform
-            Transform.completeTransform(connection, context.getConfiguration());
-        } catch (Exception e) {
-            LOGGER.error(" Failed to complete transform", e);
-            throw new RuntimeException(e.getMessage());
+        IndexTool.IndexVerifyType verifyType = getIndexVerifyType(context.getConfiguration());
+        if (verifyType != IndexTool.IndexVerifyType.NONE) {
+            updateCounters(verifyType, context);
         }
-    }
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException{
 
+        if (verifyType != IndexTool.IndexVerifyType.ONLY) {
+            try (final Connection
+                         connection = ConnectionUtil.getInputConnection(context.getConfiguration())) {
+                // Complete full Transform and add a partial transform
+                Transform.completeTransform(connection, context.getConfiguration());
+            } catch (Exception e) {
+                LOGGER.error(" Failed to complete transform", e);
+                throw new RuntimeException(e);
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformRepairMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformRepairMapper.java
new file mode 100644
index 0000000..48edd8d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformRepairMapper.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+import org.apache.phoenix.mapreduce.index.DirectHTableWriter;
+import org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+/**
+ * Mapper that rebuilds scanned new table rows from the old table and deletes those that have no old table row.
+ */
+public class PhoenixTransformRepairMapper extends TableMapper<ImmutableBytesWritable, IntWritable> {
+
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(PhoenixTransformRepairMapper.class);
+    private DirectHTableWriter writer;
+    private PhoenixConnection connection;
+    private ImmutableBytesPtr maintainers;
+    private int batchSize;
+    private List<Mutation> mutations;
+
+    @Override
+    protected void setup(final Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        final Configuration configuration = context.getConfiguration();
+        writer = new DirectHTableWriter(configuration);
+        try {
+            final Properties overrideProps = new Properties();
+            String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+            String txScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+            if(txScnValue==null && scn!=null) {
+                overrideProps.put(PhoenixRuntime.BUILD_INDEX_AT_ATTRIB, scn);
+            }
+            connection = ConnectionUtil.getOutputConnection(configuration, overrideProps).unwrap(PhoenixConnection.class);
+            maintainers=new ImmutableBytesPtr(PhoenixConfigurationUtil.getIndexMaintainers(configuration));
+            int maxSize =
+                    connection.getQueryServices().getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
+                            QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+            batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+            this.mutations = Lists.newArrayListWithExpectedSize(batchSize);
+            LOGGER.info("Mutation Batch Size = " + batchSize);
+        } catch (SQLException e) {
+            tryClosingResources();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Rebuilds one new table row from the old table; a row with no old table row is deleted and counted once. */
+    @Override
+    protected void map(ImmutableBytesWritable row, Result value, Context context)
+            throws IOException, InterruptedException {
+        context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+        String oldTableName = PhoenixConfigurationUtil.getIndexToolDataTableName(context.getConfiguration());
+        // All cells of a Result belong to one row, so the row is counted as extra at most once
+        boolean countedAsExtra = false;
+        try (Table oldHTable = connection.getQueryServices().getTable(Bytes.toBytes(oldTableName)))  {
+            IndexMaintainer transformMaintainer = TransformMaintainer.deserialize(maintainers.get()).get(0);
+            for (Cell cell : value.rawCells()) {
+                Scan buildNewTableScan = new Scan();
+                // The following attributes are set to instruct UngroupedAggregateRegionObserver to do partial rebuild
+                buildNewTableScan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, TRUE_BYTES);
+                buildNewTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, maintainers.get());
+                buildNewTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+                buildNewTableScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
+                byte[] newRowKey = CellUtil.cloneRow(cell);
+                // Rebuild the new row from the old table row; the row key is the same for now (no rowkey reordering)
+                buildNewTableScan.withStartRow(newRowKey, true);
+                buildNewTableScan.withStopRow(newRowKey, true);
+                buildNewTableScan.setTimeRange(0, cell.getTimestamp()+1);
+                // Pass the row key so the partial builder can check that the rebuilt row key matches it
+                buildNewTableScan.setAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY, newRowKey);
+                Result result = null;
+                try (ResultScanner resultScanner = oldHTable.getScanner(buildNewTableScan)) {
+                    result = resultScanner.next();
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(oldTableName, t);
+                }
+                if (result == null || result.isEmpty()) {
+                    throw new IOException("Rebuild scan on " + oldTableName + " returned no result");
+                }
+                // A single cell will be returned. We decode that here
+                byte[] scanVal = result.value();
+                long code = PLong.INSTANCE.getCodec().decodeLong(new ImmutableBytesWritable(scanVal), SortOrder.getDefault());
+                if (code == GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue()) {
+                    if (!countedAsExtra) {
+                        countedAsExtra = true;
+                        context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT).increment(1);
+                    }
+                    // No old table row exists for this unverified new table row, so delete it from the new table
+                    Delete del = transformMaintainer.buildRowDeleteMutation(newRowKey,
+                            IndexMaintainer.DeleteType.ALL_VERSIONS, cell.getTimestamp());
+                    mutations.add(del);
+                }
+                // Write Mutation Batch
+                if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 0) {
+                    writeBatch(mutations, context);
+                    mutations.clear();
+                }
+                // Make sure progress is reported to Application Master.
+                context.progress();
+            }
+        } catch (SQLException e) {
+            LOGGER.error(" Error {}  while read/write of a record ", e.getMessage(), e);
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void writeBatch(List<Mutation> mutations, Context context)
+            throws IOException, SQLException, InterruptedException {
+        writer.write(mutations);
+        context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(mutations.size());
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        try {
+            // Write the last & final Mutation Batch
+            if (!mutations.isEmpty()) {
+                writeBatch(mutations, context);
+            }
+            // We are writing some dummy key-value as map output here so that we commit only one
+            // output to reducer.
+            context.write(new ImmutableBytesWritable(Bytes.toBytes(UUID.randomUUID().toString())),
+                new IntWritable(0));
+            super.cleanup(context);
+        } catch (SQLException e) {
+            LOGGER.error(" Error {}  while read/write of a record ", e.getMessage(), e);
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            throw new RuntimeException(e);
+        } finally {
+            tryClosingResources();
+        }
+    }
+
+    private void tryClosingResources() throws IOException {
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+            } catch (SQLException e) {
+                LOGGER.error("Error while closing connection in the PhoenixTransformRepairMapper class ", e);
+            }
+        }
+        if (this.writer != null) {
+            this.writer.close();
+        }
+    }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
index f7574d3..40cb74f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.mapreduce.transform;
 
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -28,9 +29,13 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -42,6 +47,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
 import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
@@ -60,6 +67,7 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.transform.SystemTransformRecord;
 import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
@@ -67,10 +75,12 @@ import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,8 +94,11 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.createIndexToolTables;
 import static org.apache.phoenix.mapreduce.index.IndexTool.isTimeRangeSet;
 import static org.apache.phoenix.mapreduce.index.IndexTool.validateTimeRange;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.setCurrentScnValue;
+import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES;
 import static org.apache.phoenix.util.QueryUtil.getConnection;
 
 public class TransformTool extends Configured implements Tool {
@@ -105,6 +118,13 @@ public class TransformTool extends Configured implements Tool {
     private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true,
             "Index table name(not required in case of partial rebuilding)");
 
+    private static final Option FIX_UNVERIFIED_TRANSFORM_OPTION = new Option("fu", "fix-unverified", false,
+            "To fix unverified transform records");
+
+    private static final Option USE_NEW_TABLE_AS_SOURCE_OPTION =
+            new Option("fn", "from-new", false,
+                    "To verify every row in the new table has a corresponding row in the old table. ");
+
     private static final Option PARTIAL_TRANSFORM_OPTION = new Option("pt", "partial-transform", false,
             "To transform a data table from a start timestamp");
 
@@ -145,15 +165,24 @@ public class TransformTool extends Configured implements Tool {
     private static final Option END_TIME_OPTION = new Option("et", "end-time",
             true, "End time for transform");
 
-    public static final String TRANSFORM_JOB_NAME_TEMPLATE = "PHOENIX_TRANS_%s.%s";
+    public static final String TRANSFORM_JOB_NAME_TEMPLATE = "PHOENIX_TRANS_%s.%s.%s.%s"; // schema, data table, index table, job kind
 
     public static final String PARTIAL_TRANSFORM_NOT_APPLICABLE = "Partial transform accepts "
             + "non-zero ts set in the past as start-time(st) option and that ts must be present in SYSTEM.TRANSFORM table";
 
-    public static final String TRANSFORM_NOT_APPLICABLE = "Transform is not applicable for local indexes or views or transactional tables";
+    public static final String TRANSFORM_NOT_APPLICABLE = "Transform is not applicable for local indexes or views";
 
     public static final String PARTIAL_TRANSFORM_NOT_COMPATIBLE = "Can't abort/pause/resume/split during partial transform";
 
+    private static final Option VERIFY_OPTION = new Option("v", "verify", true,
+            "To verify every data row in the old table has a corresponding row in the new table. " +
+                    "The accepted values are NONE, ONLY, BEFORE,  AFTER, and BOTH. " +
+                    "NONE is for no inline verification, which is also the default for this option. ONLY is for " +
+                    "verifying without rebuilding the new table rows. The rest for verifying before, after, and both before " +
+                    "and after rebuilding row. If the verification is done before rebuilding rows, the correct " +
+                    "new table rows will not be rebuilt");
+
+
     private Configuration configuration;
     private Connection connection;
     private String tenantId;
@@ -174,10 +203,13 @@ public class TransformTool extends Configured implements Tool {
     private String oldTableWithSchema;
     private String newTableWithSchema;
     private JobPriority jobPriority;
+    private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;;
     private String jobName;
     private boolean isForeground;
     private Long startTime, endTime, lastTransformTime;
     private boolean isPartialTransform;
+    private boolean shouldFixUnverified;
+    private boolean shouldUseNewTableAsSource;
     private Job job;
 
     public Long getStartTime() {
@@ -224,10 +256,13 @@ public class TransformTool extends Configured implements Tool {
         options.addOption(PARTIAL_TRANSFORM_OPTION);
         options.addOption(START_TIME_OPTION);
         options.addOption(END_TIME_OPTION);
+        options.addOption(FIX_UNVERIFIED_TRANSFORM_OPTION);
+        options.addOption(USE_NEW_TABLE_AS_SOURCE_OPTION);
         options.addOption(AUTO_SPLIT_OPTION);
         options.addOption(ABORT_TRANSFORM_OPTION);
         options.addOption(PAUSE_TRANSFORM_OPTION);
         options.addOption(RESUME_TRANSFORM_OPTION);
+        options.addOption(VERIFY_OPTION);
         START_TIME_OPTION.setOptionalArg(true);
         END_TIME_OPTION.setOptionalArg(true);
         return options;
@@ -265,6 +300,8 @@ public class TransformTool extends Configured implements Tool {
     public int populateTransformToolAttributesAndValidate(CommandLine cmdLine) throws Exception {
         boolean useStartTime = cmdLine.hasOption(START_TIME_OPTION.getOpt());
         boolean useEndTime = cmdLine.hasOption(END_TIME_OPTION.getOpt());
+        shouldFixUnverified = cmdLine.hasOption(FIX_UNVERIFIED_TRANSFORM_OPTION.getOpt());
+        shouldUseNewTableAsSource = cmdLine.hasOption(USE_NEW_TABLE_AS_SOURCE_OPTION.getOpt());
         basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
         isPartialTransform = cmdLine.hasOption(PARTIAL_TRANSFORM_OPTION.getOpt());
         if (useStartTime) {
@@ -279,11 +316,11 @@ public class TransformTool extends Configured implements Tool {
             validateTimeRange(startTime, endTime);
         }
 
-        if (isPartialTransform &&
+        if ((isPartialTransform || shouldFixUnverified) &&
                 (cmdLine.hasOption(AUTO_SPLIT_OPTION.getOpt()))) {
             throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
         }
-        if (isPartialTransform &&
+        if ((isPartialTransform || shouldFixUnverified) &&
                 (cmdLine.hasOption(ABORT_TRANSFORM_OPTION.getOpt()) || cmdLine.hasOption(PAUSE_TRANSFORM_OPTION.getOpt())
                         || cmdLine.hasOption(RESUME_TRANSFORM_OPTION.getOpt()))) {
             throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
@@ -337,6 +374,11 @@ public class TransformTool extends Configured implements Tool {
 
         oldTableWithSchema = SchemaUtil.getQualifiedPhoenixTableName(schemaName, SchemaUtil.getTableNameFromFullName(pOldTable.getName().getString()));
         newTableWithSchema = SchemaUtil.getQualifiedPhoenixTableName(schemaName, SchemaUtil.getTableNameFromFullName(pNewTable.getName().getString()));
+        if (cmdLine.hasOption(VERIFY_OPTION.getOpt())) {
+            String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
+            verifyType = IndexTool.IndexVerifyType.fromValue(value);
+        }
+
         return 0;
     }
 
@@ -350,10 +392,6 @@ public class TransformTool extends Configured implements Tool {
             throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
         }
 
-        if (argPDataTable.isTransactional()) {
-            throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
-        }
-
         if (transformRecord == null){
             throw new IllegalStateException("ALTER statement has not been run and the transform has not been created for this table");
         }
@@ -425,9 +463,23 @@ public class TransformTool extends Configured implements Tool {
     }
 
     public Job configureJob() throws Exception {
-        final String jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
+        String jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable, (shouldFixUnverified ? "Unverified" : "Full"));
+        if (shouldUseNewTableAsSource) {
+            jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable, "NewTableSource_" + pNewTable.getName());
+        }
+        if (pNewTable.isTransactional()) {
+            configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
+                    Long.toString(TransactionUtil.convertToNanoseconds(pOldTable.getTimeStamp()+1)));
+            configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pDataTable.getTransactionProvider().name());
+        }
         if (lastTransformTime != null) {
             PhoenixConfigurationUtil.setCurrentScnValue(configuration, lastTransformTime);
+        } else {
+            if (endTime != null) {
+                PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+            } else {
+                setCurrentScnValue(configuration, EnvironmentEdgeManager.currentTimeMillis());
+            }
         }
 
         final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
@@ -446,6 +498,8 @@ public class TransformTool extends Configured implements Tool {
             PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
         }
 
+        PhoenixConfigurationUtil.setIndexVerifyType(configuration, verifyType);
+
         long indexRebuildQueryTimeoutMs =
                 configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB,
                         QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT);
@@ -471,7 +525,12 @@ public class TransformTool extends Configured implements Tool {
 
         PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, oldTableWithSchema);
         PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, newTableWithSchema);
-        PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
+        PhoenixConfigurationUtil.setShouldFixUnverifiedTransform(configuration, shouldFixUnverified);
+        if (shouldFixUnverified || shouldUseNewTableAsSource) {
+            PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE);
+        } else {
+            PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
+        }
         if (startTime != null) {
             PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime);
         }
@@ -497,16 +556,21 @@ public class TransformTool extends Configured implements Tool {
         if (outputPath != null) {
             FileOutputFormat.setOutputPath(job, outputPath);
         }
-        job.setReducerClass(PhoenixTransformReducer.class);
         job.setNumReduceTasks(1);
         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 
+        if (shouldFixUnverified) {
+            configureUnverifiedFromNewToOld();
+        } else {
+            configureFromOldToNew();
+        }
         //Set the Output classes
         job.setMapOutputValueClass(IntWritable.class);
         job.setOutputKeyClass(NullWritable.class);
         job.setOutputValueClass(NullWritable.class);
         TableMapReduceUtil.addDependencyJars(job);
-        job.setMapperClass(PhoenixServerBuildIndexMapper.class);
+
+        job.setReducerClass(PhoenixTransformReducer.class);
 
         TableMapReduceUtil.initCredentials(job);
         LOGGER.info("TransformTool is running for " + job.getJobName());
@@ -514,6 +578,39 @@ public class TransformTool extends Configured implements Tool {
         return job;
     }
 
+    private void configureFromOldToNew() { // non-repair path (shouldFixUnverified is false): only selects the mapper
+        job.setMapperClass(PhoenixServerBuildIndexMapper.class);
+    }
+
+    private void configureUnverifiedFromNewToOld() throws IOException, SQLException {
+        // Repair path: a table-mapper job over the new table's physical table, run by PhoenixTransformRepairMapper.
+        PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
+        TransformMaintainer newToOldMaintainer = pNewTable.getTransformMaintainer(pOldTable, phoenixConnection);
+        List<IndexMaintainer> maintainerList = Lists.newArrayListWithExpectedSize(1);
+        maintainerList.add(newToOldMaintainer);
+
+        // Raw scan without block caching, filtered on the empty key-value column compared EQUAL to UNVERIFIED_BYTES.
+        Scan unverifiedScan = IndexManagementUtil.newLocalStateScan(maintainerList);
+        if (startTime != null) {
+            unverifiedScan.setTimeRange(startTime - 1, HConstants.LATEST_TIMESTAMP);
+        }
+        unverifiedScan.setRaw(true);
+        unverifiedScan.setCacheBlocks(false);
+        unverifiedScan.setFilter(new SingleColumnValueFilter(
+                newToOldMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                newToOldMaintainer.getEmptyKeyValueQualifier(), CompareFilter.CompareOp.EQUAL, UNVERIFIED_BYTES));
+
+        Configuration jobConf = job.getConfiguration();
+        HBaseConfiguration.merge(jobConf, HBaseConfiguration.create(jobConf));
+        // Set the Physical Table name for use in DirectHTableWriter#write(Mutation)
+        jobConf.set(TableOutputFormat.OUTPUT_TABLE, PhoenixConfigurationUtil.getPhysicalTableName(jobConf));
+        ImmutableBytesWritable serializedMaintainers = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+        TransformMaintainer.serialize(pDataTable, serializedMaintainers, pNewTable, phoenixConnection);
+        PhoenixConfigurationUtil.setIndexMaintainers(jobConf, serializedMaintainers);
+        TableMapReduceUtil.initTableMapperJob(pNewTable.getPhysicalName().getString(), unverifiedScan,
+                PhoenixTransformRepairMapper.class, null, null, job);
+    }
+
     public int runJob() throws IOException {
         try {
             if (isForeground) {
@@ -731,6 +828,7 @@ public class TransformTool extends Configured implements Tool {
             try (Connection conn = getConnection(configuration)) {
                 this.connection = conn;
                 this.connection.setAutoCommit(true);
+                createIndexToolTables(conn);
                 populateTransformToolAttributesAndValidate(cmdLine);
                 if (cmdLine.hasOption(ABORT_TRANSFORM_OPTION.getOpt())) {
                     abortTransform();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 9d7df7e..a610a90 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -137,6 +137,11 @@ public final class PhoenixConfigurationUtil {
 
     public static final boolean DEFAULT_SCRUTINY_OUTPUT_INVALID_ROWS = false;
 
+    public static final String SHOULD_FIX_UNVERIFIED_TRANSFORM =
+            "phoenix.mr.fix.unverified.transform";
+
+    public static final boolean DEFAULT_SHOULD_FIX_UNVERIFIED_TRANSFORM = false;
+
     public static final String SCRUTINY_OUTPUT_FORMAT = "phoenix.mr.scrutiny.output.format";
 
     public static final String SCRUTINY_EXECUTE_TIMESTAMP = "phoenix.mr.scrutiny.execute.timestamp";
@@ -853,6 +858,18 @@ public final class PhoenixConfigurationUtil {
         return IndexTool.IndexVerifyType.fromValue(value);
     }
 
+    /** Reads SHOULD_FIX_UNVERIFIED_TRANSFORM, falling back to DEFAULT_SHOULD_FIX_UNVERIFIED_TRANSFORM. */
+    public static boolean getShouldFixUnverifiedTransform(Configuration configuration) {
+        return Preconditions.checkNotNull(configuration)
+                .getBoolean(SHOULD_FIX_UNVERIFIED_TRANSFORM, DEFAULT_SHOULD_FIX_UNVERIFIED_TRANSFORM);
+    }
+
+    /** Stores the given flag under the SHOULD_FIX_UNVERIFIED_TRANSFORM key of a non-null configuration. */
+    public static void setShouldFixUnverifiedTransform(Configuration configuration, boolean shouldFixUnverified) {
+        Preconditions.checkNotNull(configuration)
+                .setBoolean(SHOULD_FIX_UNVERIFIED_TRANSFORM, shouldFixUnverified);
+    }
+
     public static IndexTool.IndexVerifyType getDisableLoggingVerifyType(Configuration configuration) {
         Preconditions.checkNotNull(configuration);
         String value = configuration.get(DISABLE_LOGGING_TYPE, IndexTool.IndexVerifyType.NONE.getValue());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index c6713df..fb2952a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -4809,6 +4809,10 @@ public class MetaDataClient {
                 }
 
                 if (isTransformNeeded) {
+                    if (indexRef.getTable().getViewIndexId() != null) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_TRANSFORM_VIEW_INDEX)
+                                .setSchemaName(schemaName).setTableName(indexName).build().buildException();
+                    }
                     try {
                         Transform.addTransform(connection, tenantId, table, metaProperties, seqNum, PTable.TransformType.METADATA_TRANSFORM);
                     } catch (SQLException ex) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
index 8e400b1..cb7dc29 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.ArrayList;
 
 
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY;
 import static org.apache.phoenix.util.MetaDataUtil.SYNCED_DATA_TABLE_AND_INDEX_COL_FAM_PROPERTIES;
 
@@ -389,6 +390,13 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
                         continue;
                     }
                 }
+
+                if (key.contains("TTL") && definedProps.containsKey(TRANSACTION_PROVIDER)
+                        && definedProps.get(TRANSACTION_PROVIDER).equalsIgnoreCase("OMID")) {
+                    // TTL is unsupported for OMID transactional table
+                    continue;
+                }
+
                 if (optionBuilder.length() != 0) {
                     optionBuilder.append(", ");
                 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
index 79e80d3..80d32e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.schema.transform;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -45,6 +46,11 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SaltingUtil;
+
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.ValueSchema;
+import org.apache.phoenix.schema.tuple.BaseTuple;
+import org.apache.phoenix.schema.tuple.ValueGetterTuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -134,7 +140,7 @@ public class TransformMaintainer extends IndexMaintainer {
         this.oldTableEncodingScheme = oldTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : oldTable.getEncodingScheme();
         this.oldTableImmutableStorageScheme = oldTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : oldTable.getImmutableStorageScheme();
 
-        this.newTableName = newTable.getPhysicalName().getBytes();
+        this.newTableName = SchemaUtil.getTableName(newTable.getSchemaName(), newTable.getTableName()).getBytes();
         boolean newTableWALDisabled = newTable.isWALDisabled();
         int nNewTableColumns = newTable.getColumns().size();
         int nNewTablePKColumns = newTable.getPKColumns().size();
@@ -185,6 +191,42 @@ public class TransformMaintainer extends IndexMaintainer {
         }
     }
 
+    /**
+     * Builds the old table row key by walking the given row key with oldTableRowKeySchema.
+     * NOTE(review): viewConstants is accepted but never read here. A separator byte is appended after every
+     * variable-width field, including the last one; confirm callers expect that trailing byte.
+     */
+    public byte[] buildDataRowKey(ImmutableBytesWritable indexRowKeyPtr, byte[][] viewConstants) {
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedNewTableRowKeyBytes);
+        DataOutput output = new DataOutputStream(stream);
+        try {
+            int dataPosOffset = 0;
+            int maxRowKeyOffset = indexRowKeyPtr.getLength();
+            oldTableRowKeySchema.iterator(indexRowKeyPtr, ptr, 0);
+            // The oldTableRowKeySchema includes the salt byte field, so the walk starts at field 0.
+            while (oldTableRowKeySchema.next(ptr, dataPosOffset, maxRowKeyOffset) != null) {
+                output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
+                if (!oldTableRowKeySchema.getField(dataPosOffset).getDataType().isFixedWidth()) {
+                    output.writeByte(SchemaUtil.getSeparatorByte(oldTableRowKeySchema.rowKeyOrderOptimizable(),
+                            ptr.getLength() == 0, oldTableRowKeySchema.getField(dataPosOffset)));
+                }
+                dataPosOffset++;
+            }
+            // getBuffer() would hand back the whole backing array, whose length is the stream capacity rather
+            // than the number of bytes written; return a copy holding only the bytes actually written.
+            return stream.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
+            }
+        }
+    }
+
     /**
      * Init calculated state reading/creating
      */
@@ -232,6 +274,11 @@ public class TransformMaintainer extends IndexMaintainer {
         ptr.set(stream.toByteArray(), 0, stream.size());
     }
 
+    @Override
+    public Iterator<ColumnReference> iterator() { // overrides the inherited iterator to walk newTableColumns
+        return newTableColumns.iterator();
+    }
+
     public static ServerCachingProtos.TransformMaintainer toProto(TransformMaintainer maintainer) throws IOException {
         ServerCachingProtos.TransformMaintainer.Builder builder = ServerCachingProtos.TransformMaintainer.newBuilder();
         builder.setSaltBuckets(maintainer.nNewTableSaltBuckets);
@@ -470,4 +517,11 @@ public class TransformMaintainer extends IndexMaintainer {
         return newTableEmptyKeyValueRef.getQualifier();
     }
 
+    public byte[] getDataEmptyKeyValueCF() { // "data" here maps to the old table: returns oldTableEmptyKeyValueCF
+        return oldTableEmptyKeyValueCF;
+    }
+
+    public byte[] getEmptyKeyValueQualifierForDataTable() { // qualifier taken from oldTableEmptyKeyValueRef
+        return oldTableEmptyKeyValueRef.getQualifier();
+    }
 }
\ No newline at end of file