Posted to commits@phoenix.apache.org by go...@apache.org on 2022/01/05 01:47:42 UTC

[phoenix] branch 4.x updated: PHOENIX-6617 IndexRegionObserver changes for creating mutations for transforming table (#701)

This is an automated email from the ASF dual-hosted git repository.

gokcen pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new a6c5f71  PHOENIX-6617 IndexRegionObserver changes for creating mutations for transforming table (#701)
a6c5f71 is described below

commit a6c5f711b2b89547c2b4bb280c47e3d522b10362
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Thu Aug 26 14:34:47 2021 -0700

    PHOENIX-6617 IndexRegionObserver changes for creating mutations for transforming table (#701)
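    
    For context, the live-mutation behavior these changes enable can be
    sketched as follows (a minimal, illustrative snippet assuming a table
    S.TBL and a placeholder JDBC url; see the TransformIT tests below for
    the real assertions). Altering the storage scheme starts a metadata
    transform: a SYSTEM.TRANSFORM record and a new physical table are
    created, and subsequent upserts/deletes are written to both the old
    and the new physical table by IndexRegionObserver:
    
        import java.sql.Connection;
        import java.sql.DriverManager;
        import org.apache.phoenix.jdbc.PhoenixConnection;
        import org.apache.phoenix.schema.transform.SystemTransformRecord;
        import org.apache.phoenix.schema.transform.Transform;
    
        try (Connection conn = DriverManager.getConnection(url)) {
            conn.setAutoCommit(true);
            // Start a metadata transform; this creates the new physical table
            conn.createStatement().execute("ALTER TABLE S.TBL SET "
                    + "IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
            SystemTransformRecord record = Transform.getTransformRecord(
                    "S", "TBL", null, null, conn.unwrap(PhoenixConnection.class));
            // Live mutations now land in S.TBL and in record.getNewPhysicalTableName()
            conn.createStatement().execute("UPSERT INTO S.TBL VALUES ('a', 1, 'val1', 1)");
        }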
    
    Signed-off-by: Gokcen Iskender <go...@gmail.com>
---
 .../apache/phoenix/end2end/AppendOnlySchemaIT.java |   2 +-
 .../phoenix/end2end/LogicalTableNameBaseIT.java    |   2 +-
 .../apache/phoenix/end2end/TransformToolIT.java    |  67 ++++++++
 .../apache/phoenix/end2end/index/TransformIT.java  | 180 +++++++++++++++++++++
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  30 +++-
 .../UngroupedAggregateRegionScanner.java           |   1 +
 .../org/apache/phoenix/execute/MutationState.java  |   1 +
 .../phoenix/hbase/index/IndexRegionObserver.java   |  14 +-
 .../org/apache/phoenix/index/IndexMaintainer.java  |  23 ++-
 .../phoenix/index/IndexMetaDataCacheClient.java    |   4 +
 .../apache/phoenix/index/PhoenixIndexCodec.java    |  11 +-
 .../apache/phoenix/index/PhoenixIndexMetaData.java |   5 +-
 .../phoenix/jdbc/PhoenixPreparedStatement.java     |   2 +-
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |  14 +-
 .../phoenix/mapreduce/transform/TransformTool.java |   2 -
 .../phoenix/query/ConnectionQueryServices.java     |   1 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |   6 +-
 .../query/ConnectionlessQueryServicesImpl.java     |   1 +
 .../query/DelegateConnectionQueryServices.java     |   3 +-
 .../org/apache/phoenix/schema/DelegateTable.java   |   3 +
 .../org/apache/phoenix/schema/MetaDataClient.java  |  21 ++-
 .../java/org/apache/phoenix/schema/PTable.java     |  12 +-
 .../java/org/apache/phoenix/schema/PTableImpl.java |  33 +++-
 .../apache/phoenix/schema/transform/Transform.java |  93 +++++++++--
 .../schema/transform/TransformMaintainer.java      |  20 +++
 .../java/org/apache/phoenix/util/ViewUtil.java     |  29 +++-
 .../src/main/protobuf/MetaDataService.proto        |   1 +
 phoenix-core/src/main/protobuf/PTable.proto        |   1 +
 28 files changed, 540 insertions(+), 42 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AppendOnlySchemaIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AppendOnlySchemaIT.java
index a865121..6eb05bb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AppendOnlySchemaIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AppendOnlySchemaIT.java
@@ -139,7 +139,7 @@ public class AppendOnlySchemaIT extends ParallelStatsDisabledIT {
             // if notExists is true, verify one call to addColumn with an empty mutation list (which does not make an RPC)
             // else verify that addColumn is never called
             verify(connectionQueryServices, notExists ? times(1) : never() )
-                    .addColumn(eq(Collections.<Mutation>emptyList()), any(PTable.class),
+                    .addColumn(eq(Collections.<Mutation>emptyList()), any(PTable.class), any(PTable.class),
                             any(PTable.class), anyMap(), anySetOf(String.class),
                             anyListOf(PColumn.class));
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
index 379ab58..72037f2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
@@ -34,13 +34,13 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
index 276f08b..5002c8c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.transform.TransformTool;
 import org.apache.phoenix.query.QueryServices;
@@ -60,6 +61,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class TransformToolIT extends ParallelStatsDisabledIT{
     private static final Logger LOGGER = LoggerFactory.getLogger(TransformToolIT.class);
@@ -406,6 +408,71 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
         }
     }
 
+    @Test
+    public void testTransformMutationFailureRepair() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            IndexRegionObserver.setFailPreIndexUpdatesForTesting(true);
+            int numOfRows = 0;
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            try {
+                IndexToolIT.upsertRow(stmt1, 1);
+                fail("Transform table upsert should have failed");
+            } catch (Exception e) {
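+                // expected: pre-index updates were set to fail above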
+            }
+            assertEquals(0, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(0, getRowCount(conn, dataTableFullName));
+
+            IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            IndexToolIT.upsertRow(stmt1, 2);
+
+            assertEquals(1, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(1, getRowCount(conn, dataTableFullName));
+
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+            try {
+                IndexToolIT.upsertRow(stmt1, 3);
+                fail("Data table upsert should have failed");
+            } catch (Exception e) {
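+                // expected: data table updates were set to fail above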
+            }
+            assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(1, getRowCount(conn, dataTableFullName));
+
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+
+            List<String> args = getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, false, false);
+            runTransformTool(args.toArray(new String[0]), 0);
+
+            // TODO: Implement GlobalIndexChecker-like repair for unverified rows
+            assertEquals(1, getRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices()
+                    .getTable(Bytes.toBytes(dataTableFullName)), false));
+//            assertEquals(1, getRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices()
+//                    .getTable(Bytes.toBytes(record.getNewPhysicalTableName())), false));
+        } finally {
+            IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+        }
+    }
+
     public static List<String> getArgList(String schemaName, String dataTable, String indxTable, String tenantId,
                                           Long startTime, Long endTime,
                                           boolean shouldAbort, boolean shouldPause, boolean shouldResume, boolean isPartial) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
index 5fc8b7a..136da65 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
@@ -20,10 +20,14 @@ package org.apache.phoenix.end2end.index;
 import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -36,8 +40,10 @@ import java.util.Properties;
 import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
 import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getRowCount;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -163,6 +169,180 @@ public class TransformIT extends ParallelStatsDisabledIT {
         }
     }
 
+    @Test
+    public void testTransformForLiveMutations_mutatingTable() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            String schema = "S_" + generateUniqueName();
+            String tableName = "TBL_" + generateUniqueName();
+            String idxName = "IND_" + generateUniqueName();
+            String fullTableName = SchemaUtil.getTableName(schema, tableName);
+            String fullIdxName = SchemaUtil.getTableName(schema, idxName);
+
+            String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+                    "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+            conn.createStatement().execute(createTableSql);
+
+            String upsertStmt = "UPSERT INTO " + fullTableName + " (PK1, INT_PK, V1, V2) VALUES ('%s', %d, '%s', %d)";
+            conn.createStatement().execute(String.format(upsertStmt, "a", 1, "val1", 1));
+
+            // Note that the index will not be built, since we create it with ASYNC
+            String createIndexSql = "CREATE INDEX " + idxName + " ON " + fullTableName + " (PK1, INT_PK) include (V1) ASYNC";
+            conn.createStatement().execute(createIndexSql);
+            assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullTableName);
+            assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullIdxName);
+
+            String idxName2 = "IND2_" + generateUniqueName();
+            String fullIdxName2 = SchemaUtil.getTableName(schema, idxName2);
+            conn.createStatement().execute("CREATE INDEX " + idxName2 + " ON " + fullTableName + " (V1) include (V2) ASYNC");
+
+            // Now do a transform and check that the index tables are still empty, since we didn't build them
+            conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET "
+                    + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schema, tableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+            assertEquals(0, getRowCount(conn, fullIdxName));
+            assertEquals(0, getRowCount(conn, fullIdxName2));
+
+            // Now these live mutations should go into the index tables and the transforming table
+            conn.createStatement().execute(String.format(upsertStmt, "b", 2, "val2", 2));
+            conn.commit();
+            conn.createStatement().execute(String.format(upsertStmt, "c", 3, "val3", 3));
+            assertEquals(2, getRowCount(conn, fullIdxName));
+            assertEquals(2, getRowCount(conn, fullIdxName2));
+            assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
+
+            // Delete one mutation
+            conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE PK1='b'");
+            assertEquals(1, getRowCount(conn, fullIdxName));
+            assertEquals(1, getRowCount(conn, fullIdxName2));
+            assertEquals(1, getRowCount(conn, record.getNewPhysicalTableName()));
+        }
+    }
+
+    @Test
+    public void testTransformForLiveMutations_mutatingBaseTableNoIndex() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            String schema = "S_" + generateUniqueName();
+            String tableName = "TBL_" + generateUniqueName();
+            String fullTableName = SchemaUtil.getTableName(schema, tableName);
+
+            String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+                    "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+            conn.createStatement().execute(createTableSql);
+            assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullTableName);
+
+            String upsertStmt = "UPSERT INTO " + fullTableName + " (PK1, INT_PK, V1, V2) VALUES ('%s', %d, '%s', %d)";
+            conn.createStatement().execute(String.format(upsertStmt, "a", 1, "val1", 1));
+
+            conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET "
+                    + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schema, tableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+            conn.createStatement().execute(String.format(upsertStmt, "b", 2, "val2", 2));
+            conn.commit();
+            conn.createStatement().execute(String.format(upsertStmt, "c", 3, "val3", 3));
+            assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
+
+            // Delete one mutation
+            conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE PK1='b'");
+            assertEquals(1, getRowCount(conn, record.getNewPhysicalTableName()));
+        }
+    }
+
+    @Test
+    public void testTransformForLiveMutations_mutatingIndex() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            String schema = "S_" + generateUniqueName();
+            String tableName = "TBL_" + generateUniqueName();
+            String idxName = "IND_" + generateUniqueName();
+            String fullTableName = SchemaUtil.getTableName(schema, tableName);
+            String fullIdxName = SchemaUtil.getTableName(schema, idxName);
+
+            String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+                    "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+            conn.createStatement().execute(createTableSql);
+
+            // Note that the index will not be built, since we create it with ASYNC
+            String createIndexSql = "CREATE INDEX " + idxName + " ON " + fullTableName + " (PK1, INT_PK) include (V1) ASYNC";
+            conn.createStatement().execute(createIndexSql);
+            assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullIdxName);
+
+            conn.createStatement().execute("ALTER INDEX " + idxName + " ON " + fullTableName +
+                    " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(schema, idxName, fullTableName, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+            String upsertStmt = "UPSERT INTO " + fullTableName + " (PK1, INT_PK, V1, V2) VALUES ('%s', %d, '%s', %d)";
+            // Now these live mutations should go into the index tables and the transforming table
+            conn.createStatement().execute(String.format(upsertStmt, "b", 2, "val2", 2));
+            conn.createStatement().execute(String.format(upsertStmt, "c", 3, "val3", 3));
+            assertEquals(2, getRowCount(conn, fullIdxName));
+            assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
+
+            // Delete one mutation
+            conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE PK1='b'");
+            assertEquals(1, getRowCount(conn, fullIdxName));
+            assertEquals(1, getRowCount(conn, record.getNewPhysicalTableName()));
+        }
+
+    }
+
+    @Test
+    public void testTransformForLiveMutations_mutatingBaseTableForView() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            String schema = "S_" + generateUniqueName();
+            String tableName = "TBL_" + generateUniqueName();
+            String fullTableName = SchemaUtil.getTableName(schema, tableName);
+            String parentViewName = "VWP_" + generateUniqueName();
+            String viewName = "VW_" + generateUniqueName();
+            String viewIdxName = "VWIDX_" + generateUniqueName();
+
+            String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+                    "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+            conn.createStatement().execute(createTableSql);
+            assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullTableName);
+
+            String createParentViewSql = "CREATE VIEW " + parentViewName + " ( PARENT_VIEW_COL1 VARCHAR ) AS SELECT * FROM " + fullTableName;
+            conn.createStatement().execute(createParentViewSql);
+
+            String createViewSql = "CREATE VIEW " + viewName + " ( VIEW_COL1 INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM " + parentViewName;
+            conn.createStatement().execute(createViewSql);
+
+            String createViewIdxSql = "CREATE INDEX " + viewIdxName + " ON " + viewName + " (VIEW_COL1) include (VIEW_COL2) ";
+            conn.createStatement().execute(createViewIdxSql);
+
+            String upsertStmt = "UPSERT INTO " + viewName + " (PK1, INT_PK, V1, VIEW_COL1, VIEW_COL2) VALUES ('%s', %d, '%s', %d, '%s')";
+            conn.createStatement().execute(String.format(upsertStmt, "a", 1, "val1", 1, "col2_1"));
+
+            conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET "
+                    + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schema, tableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+            conn.createStatement().execute(String.format(upsertStmt, "b", 2, "val2", 2, "col2_2"));
+            conn.createStatement().execute(String.format(upsertStmt, "c", 3, "val3", 3, "col2_3"));
+            assertEquals(3, getRowCount(conn, viewName));
+            assertEquals(3, getRowCount(conn, viewIdxName));
+            // The new table has one row fewer: it does not contain the pre-transform record because we didn't run the TransformTool
+            assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
+
+            conn.createStatement().execute("DELETE FROM " + viewName + " WHERE VIEW_COL1=2");
+            assertEquals(1, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(2, getRowCount(conn, viewName));
+            assertEquals(2, getRowCount(conn, viewIdxName));
+        }
+    }
+
     private void assertSystemTransform(Connection conn, int rowCount, String schema, String logicalTableName, String tenantId) throws SQLException {
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM "+
                 PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 37c6fa8..da5c223 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -82,8 +82,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
 import static org.apache.phoenix.query.QueryConstants.VIEW_MODIFIED_PROPERTY_TAG_TYPE;
+import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 import static org.apache.phoenix.schema.PTableType.INDEX;
-import static org.apache.phoenix.schema.PTableType.VIEW;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
@@ -93,7 +93,6 @@ import static org.apache.phoenix.util.ViewUtil.getSystemTableForChildLinks;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.security.PrivilegedExceptionAction;
-import java.sql.Connection;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -108,6 +107,7 @@ import java.util.NavigableMap;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.directory.api.util.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -227,6 +227,8 @@ import org.apache.phoenix.schema.metrics.MetricsMetadataSource;
 import org.apache.phoenix.schema.metrics.MetricsMetadataSourceFactory;
 import org.apache.phoenix.schema.task.SystemTaskParams;
 import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
@@ -1415,6 +1417,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         builder.setTimeStamp(timeStamp);
 
 
+        PTable transformingNewTable = null;
         boolean isRegularView = (tableType == PTableType.VIEW && viewType != ViewType.MAPPED);
         for (List<Cell> columnCellList : allColumnCellList) {
 
@@ -1487,6 +1490,17 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 } else if (linkType == LinkType.EXCLUDED_COLUMN) {
                     // add the excludedColumn
                     addExcludedColumnToTable(columns, colName, famName, colKv.getTimestamp());
+                } else if (linkType == LinkType.TRANSFORMING_NEW_TABLE) {
+                    transformingNewTable = doGetTable((tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes())
+                            , SchemaUtil.getSchemaNameFromFullName(famName.getBytes()).getBytes(), SchemaUtil.getTableNameFromFullName(famName.getBytes()).getBytes(), clientTimeStamp, null, clientVersion);
+                    if (transformingNewTable == null) {
+                        // The table may be global (no tenant id), so retry with an empty tenant id
+                        transformingNewTable = doGetTable(ByteUtil.EMPTY_BYTE_ARRAY
+                                , SchemaUtil.getSchemaNameFromFullName(famName.getBytes()).getBytes(), SchemaUtil.getTableNameFromFullName(famName.getBytes()).getBytes(), clientTimeStamp, null, clientVersion);
+                        if (transformingNewTable == null) {
+                            ServerUtil.throwIOException("Transforming new table not found", new TableNotFoundException(schemaName.getString(), famName.getString()));
+                        }
+                    }
                 }
             } else {
                 long columnTimestamp =
@@ -1510,6 +1524,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         if (!setPhysicalName && oldTable != null) {
             builder.setPhysicalTableName(oldTable.getPhysicalName());
         }
+        builder.setTransformingNewTable(transformingNewTable);
 
         builder.setExcludedColumns(ImmutableList.<PColumn>of());
         builder.setBaseTableLogicalName(parentLogicalName != null ?
@@ -3043,7 +3058,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
     private MetaDataMutationResult mutateColumn(
             final List<Mutation> tableMetadata,
             final ColumnMutator mutator, final int clientVersion,
-            final PTable parentTable, boolean isAddingOrDroppingColumns) throws IOException {
+            final PTable parentTable, final PTable transformingNewTable, boolean isAddingOrDroppingColumns) throws IOException {
         byte[][] rowKeyMetaData = new byte[5][];
         MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
         byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
@@ -3124,6 +3139,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                                 parentTable);
                     }
                 }
+                if (transformingNewTable != null) {
+                    table = PTableImpl.builderWithColumns(table, getColumnsToClone(table))
+                            .setTransformingNewTable(transformingNewTable).build();
+                }
 
                 if (table.getTimeStamp() >= clientTimeStamp) {
                     LOGGER.info("Found newer table as of " + table.getTimeStamp()
@@ -3417,9 +3436,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         try {
             List<Mutation> tableMetaData = ProtobufUtil.getMutations(request);
             PTable parentTable = request.hasParentTable() ? PTableImpl.createFromProto(request.getParentTable()) : null;
+            PTable transformingNewTable = request.hasTransformingNewTable() ? PTableImpl.createFromProto(request.getTransformingNewTable()) : null;
             boolean addingColumns = request.getAddingColumns();
             MetaDataMutationResult result = mutateColumn(tableMetaData, new AddColumnMutator(),
-                    request.getClientVersion(), parentTable, addingColumns);
+                    request.getClientVersion(), parentTable, transformingNewTable, addingColumns);
             if (result != null) {
                 done.run(MetaDataMutationResult.toProto(result));
             }
@@ -3557,7 +3577,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             tableMetaData = ProtobufUtil.getMutations(request);
             PTable parentTable = request.hasParentTable() ? PTableImpl.createFromProto(request.getParentTable()) : null;
             MetaDataMutationResult result = mutateColumn(tableMetaData, new DropColumnMutator(env.getConfiguration()),
-                    request.getClientVersion(), parentTable, true);
+                    request.getClientVersion(), parentTable, null, true);
             if (result != null) {
                 done.run(MetaDataMutationResult.toProto(result));
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 4f4ddba..d2d8b16 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -211,6 +211,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
         }
         indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
         indexMutations = localIndexBytes == null ? new UngroupedAggregateRegionObserver.MutationList() : new UngroupedAggregateRegionObserver.MutationList(1024);
+        byte[] transforming = scan.getAttribute(BaseScannerRegionObserver.DO_TRANSFORMING);
 
         replayMutations = scan.getAttribute(REPLAY_WRITES);
         indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 4f67764..2289637 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -1197,6 +1197,7 @@ public class MutationState implements SQLCloseable {
     private void sendBatch(Map<TableRef, MultiRowMutationState> commitBatch, long[] serverTimeStamps, boolean sendAll) throws SQLException {
         int i = 0;
         Map<TableInfo, List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap();
+
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
             Span span = trace.getSpan();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 90e267a..ad36f41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -91,6 +91,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
@@ -1037,6 +1038,15 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
         return false;
     }
 
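+    // Returns true if any of the maintainers is a TransformMaintainer, i.e. the table has a transforming new table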
+    private static boolean isTransforming(PhoenixIndexMetaData indexMetaData) {
+        for (IndexMaintainer indexMaintainer : indexMetaData.getIndexMaintainers()) {
+            if (indexMaintainer instanceof TransformMaintainer) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext context)
             throws Throwable {
         boolean done;
@@ -1105,7 +1115,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
         boolean hasAtomic = hasAtomicUpdate(miniBatchOp);
         long onDupCheckTime = 0;
 
-        if (hasAtomic || hasGlobalIndex(indexMetaData)) {
+        if (hasAtomic || hasGlobalIndex(indexMetaData) || isTransforming(indexMetaData)) {
             // Retrieve the current row states from the data table while holding the lock.
             // This is needed for both atomic mutations and global indexes
             long start = EnvironmentEdgeManager.currentTimeMillis();
@@ -1136,7 +1146,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
         setTimestamps(miniBatchOp, builder, now);
 
         TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
-        if (hasGlobalIndex(indexMetaData)) {
+        if (hasGlobalIndex(indexMetaData) || isTransforming(indexMetaData)) {
             // Prepare next data rows states for pending mutations (for global indexes)
             prepareDataRowStates(c, miniBatchOp, context, now);
             // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 1ba7651..9308b1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
@@ -99,6 +100,7 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
 import org.apache.phoenix.schema.tuple.BaseTuple;
 import org.apache.phoenix.schema.tuple.ValueGetterTuple;
 import org.apache.phoenix.schema.types.PDataType;
@@ -224,11 +226,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      */
     public static void serialize(PTable dataTable, ImmutableBytesWritable ptr,
             List<PTable> indexes, PhoenixConnection connection) {
-        if (indexes.isEmpty()) {
+        if (indexes.isEmpty() && dataTable.getTransformingNewTable() == null) {
             ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
             return;
         }
         int nIndexes = indexes.size();
+        if (dataTable.getTransformingNewTable() != null) {
+            nIndexes++;
+        }
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         DataOutputStream output = new DataOutputStream(stream);
         try {
@@ -242,6 +247,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                     WritableUtils.writeVInt(output, protoBytes.length);
                     output.write(protoBytes);
             }
+            if (dataTable.getTransformingNewTable() != null) {
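+                // Serialize the TransformMaintainer after the regular index maintainers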
+                ServerCachingProtos.TransformMaintainer proto = TransformMaintainer.toProto(
+                        dataTable.getTransformingNewTable().getTransformMaintainer(dataTable, connection));
+                byte[] protoBytes = proto.toByteArray();
+                WritableUtils.writeVInt(output, protoBytes.length);
+                output.write(protoBytes);
+            }
         } catch (IOException e) {
             throw new RuntimeException(e); // Impossible
         }
@@ -316,8 +328,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                       int protoSize = WritableUtils.readVInt(input);
                       byte[] b = new byte[protoSize];
                       input.readFully(b);
-                      org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b);
-                      maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+                      try {
+                          org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b);
+                          maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+                      } catch (InvalidProtocolBufferException e) {
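+                          // Not an IndexMaintainer proto; fall back to the TransformMaintainer appended during serialization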
+                          org.apache.phoenix.coprocessor.generated.ServerCachingProtos.TransformMaintainer proto = ServerCachingProtos.TransformMaintainer.parseFrom(b);
+                          maintainers.add(TransformMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+                      }
                     } else {
                         IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted);
                         maintainer.readFields(input);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
index 1f6dd73..be5d8f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.index;
 
 import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.sql.SQLException;
 import java.util.List;
@@ -134,6 +135,9 @@ public class IndexMetaDataCacheClient {
                 mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
             }
             mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+            if (table.getTransformingNewTable() != null) {
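+                // Flag the mutation as a transform write; checked server-side (e.g. in PhoenixIndexCodec.isEnabled)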
+                mutation.setAttribute(BaseScannerRegionObserver.DO_TRANSFORMING, TRUE_BYTES);
+            }
             if (attribValue != null) {
                 mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
                 mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 9630a0f..abf3acc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -34,6 +34,8 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
 
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DO_TRANSFORMING;
+
 /**
  * Phoenix-based {@link IndexCodec}. Manages all the logic of how to cleanup an index (
  * {@link #getIndexDeletes(TableState, IndexMetaData, byte[], byte[])}) as well as what the new index state should be (
@@ -69,6 +71,13 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
         return true;
     }
 
+    boolean isTransforming(Map<String, byte[]> attributes) {
+        if (attributes == null) { return false; }
+        byte[] transforming = attributes.get(DO_TRANSFORMING);
+        if (transforming == null) { return false; }
+        return true;
+    }
+
     @Override
     public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
         PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
@@ -122,6 +131,6 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
 
     @Override
     public boolean isEnabled(Mutation m) {
-        return hasIndexMaintainers(m.getAttributesMap());
+        return hasIndexMaintainers(m.getAttributesMap()) || isTransforming(m.getAttributesMap());
     }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 75ce9f4..4385894 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -26,6 +26,7 @@ import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 
 public class PhoenixIndexMetaData implements IndexMetaData {
@@ -52,7 +53,9 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         boolean hasLocalIndexes = false;
         for (IndexMaintainer maintainer : indexMetaDataCache.getIndexMaintainers()) {
             isImmutable &= maintainer.isImmutableRows();
-            hasNonPkColumns |= !maintainer.getIndexedColumns().isEmpty();
+            if (!(maintainer instanceof TransformMaintainer)) {
+                hasNonPkColumns |= !maintainer.getIndexedColumns().isEmpty();
+            }
             hasLocalIndexes |= maintainer.isLocalIndex();
         }
         this.isImmutable = isImmutable;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index 83d8d85..2a13121 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -184,7 +184,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
         if (statement.getOperation().isMutation()) {
             throw new ExecuteQueryNotApplicableException(statement.getOperation());
         }
-        
+
         return executeQuery(statement,createQueryLogger(statement,query));
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 7422bbd..fddd608 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -312,10 +312,16 @@ public class PhoenixStatement implements Statement, SQLCloseable {
     
     protected PhoenixResultSet executeQuery(final CompilableStatement stmt, final QueryLogger queryLogger)
             throws SQLException {
-        return executeQuery(stmt, true, queryLogger);
+        return executeQuery(stmt, true, queryLogger, false);
     }
+
+    protected PhoenixResultSet executeQuery(final CompilableStatement stmt, final QueryLogger queryLogger, boolean noCommit)
+            throws SQLException {
+        return executeQuery(stmt, true, queryLogger, noCommit);
+    }
+
     private PhoenixResultSet executeQuery(final CompilableStatement stmt,
-        final boolean doRetryOnMetaNotFoundError, final QueryLogger queryLogger) throws SQLException {
+                                          final boolean doRetryOnMetaNotFoundError, final QueryLogger queryLogger, final boolean noCommit) throws SQLException {
         GLOBAL_SELECT_SQL_COUNTER.increment();
 
         try {
@@ -387,7 +393,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                                 setLastUpdateCount(NO_UPDATE);
                                 setLastUpdateOperation(stmt.getOperation());
                                 // If transactional, this will move the read pointer forward
-                                if (connection.getAutoCommit()) {
+                                if (connection.getAutoCommit() && !noCommit) {
                                     connection.commit();
                                 }
                                 connection.incrementStatementExecutionCounter();
@@ -404,7 +410,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                                                     e.getSchemaName(), e.getTableName(), true)
                                             .wasUpdated()) {
                                         //TODO we can log retry count and error for debugging in LOG table
-                                        return executeQuery(stmt, false, queryLogger);
+                                        return executeQuery(stmt, false, queryLogger, noCommit);
                                     }
                                 }
                                 throw e;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
index 5e5cdcf..f7574d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
@@ -435,12 +435,10 @@ public class TransformTool extends Configured implements Tool {
                 new PostIndexDDLCompiler(pConnection, new TableRef(pOldTable), true);
         ddlCompiler.compile(pNewTable);
         final List<String> newColumns = ddlCompiler.getDataColumnNames();
-        //final String selectQuery = ddlCompiler.getSelectQuery();
         final String upsertQuery =
                 QueryUtil.constructUpsertStatement(newTableWithSchema, newColumns, HintNode.Hint.NO_INDEX);
 
         configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery);
-        //PhoenixConfigurationUtil.setPhysicalTableName(configuration, pNewTable.getPhysicalName().getString());
 
         PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
                 ddlCompiler.getIndexColumnNames().toArray(new String[ddlCompiler.getIndexColumnNames().size()]));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 1714cfe..26a2114 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -120,6 +120,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
     public MetaDataMutationResult addColumn(List<Mutation> tableMetaData,
                                             PTable table,
                                             PTable parentTable,
+                                            PTable transformingNewTable,
                                             Map<String, List<Pair<String, Object>>> properties,
                                             Set<String> colFamiliesForPColumnsToBeAdded,
                                             List<PColumn> columns) throws SQLException;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 42bbf4e..932b676 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -2324,6 +2324,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     public MetaDataMutationResult addColumn(final List<Mutation> tableMetaData,
                                             PTable table,
                                             final PTable parentTable,
+                                            final PTable transformingNewTable,
                                             Map<String, List<Pair<String, Object>>> stmtProperties,
                                             Set<String> colFamiliesForPColumnsToBeAdded,
                                             List<PColumn> columns) throws SQLException {
@@ -2410,6 +2411,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                     if (parentTable!=null)
                         builder.setParentTable(PTableImpl.toProto(parentTable));
+                    if (transformingNewTable!=null) {
+                        builder.setTransformingNewTable(PTableImpl.toProto(transformingNewTable));
+                    }
                     builder.setAddingColumns(addingColumns);
                     instance.addColumn(controller, builder.build(), rpcCallback);
                     if (controller.getFailedOn() != null) {
@@ -4449,7 +4453,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
         tableMetadata.addAll(metaConnection.getMutationState().toMutations(metaConnection.getSCN()).next().getSecond());
         metaConnection.rollback();
-        metaConnection.getQueryServices().addColumn(tableMetadata, sysCatalogPTable, null, Collections.<String,List<Pair<String,Object>>>emptyMap(), Collections.<String>emptySet(), Lists.newArrayList(column));
+        metaConnection.getQueryServices().addColumn(tableMetadata, sysCatalogPTable, null, null, Collections.<String,List<Pair<String,Object>>>emptyMap(), Collections.<String>emptySet(), Lists.newArrayList(column));
         metaConnection.removeTable(null, SYSTEM_CATALOG_NAME, null, timestamp);
         ConnectionQueryServicesImpl.this.removeTable(null,
                 SYSTEM_CATALOG_NAME, null,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 7667837..bef0bfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -332,6 +332,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     public MetaDataMutationResult addColumn(List<Mutation> tableMetaData,
                                             PTable table,
                                             PTable parentTable,
+                                            PTable transformingNewTable,
                                             Map<String, List<Pair<String, Object>>> properties,
                                             Set<String> colFamiliesForPColumnsToBeAdded,
                                             List<PColumn> columnsToBeAdded) throws SQLException {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index d271547..f672aa6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -136,10 +136,11 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     public MetaDataMutationResult addColumn(List<Mutation> tableMetaData,
                                             PTable table,
                                             PTable parentTable,
+                                            PTable transformingNewTable,
                                             Map<String, List<Pair<String, Object>>> properties,
                                             Set<String> colFamiliesForPColumnsToBeAdded,
                                             List<PColumn> columns) throws SQLException {
-        return getDelegate().addColumn(tableMetaData, table, parentTable,
+        return getDelegate().addColumn(tableMetaData, table, parentTable, transformingNewTable,
                 properties, colFamiliesForPColumnsToBeAdded, columns);
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 292a0ba..38bd4e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -143,6 +143,9 @@ public class DelegateTable implements PTable {
     public List<PTable> getIndexes() { return delegate.getIndexes(); }
 
     @Override
+    public PTable getTransformingNewTable() { return delegate.getTransformingNewTable(); }
+
+    @Override
     public PIndexState getIndexState() {
         return delegate.getIndexState();
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 732c718..c6713df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -116,6 +116,7 @@ import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_P
 import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
 import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static org.apache.phoenix.schema.PTable.ViewType.MAPPED;
+import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.schema.PTableType.TABLE;
 import static org.apache.phoenix.schema.PTableType.VIEW;
@@ -348,7 +349,7 @@ public class MetaDataClient {
     private static final String CREATE_SCHEMA = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE
             + "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)";
 
-    private static final String CREATE_LINK =
+    public static final String CREATE_LINK =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
                     TENANT_ID + "," +
                     TABLE_SCHEM + "," +
@@ -3152,6 +3153,11 @@ public class MetaDataClient {
         QualifierEncodingScheme encodingScheme = null;
         Byte encodingSchemeSerializedByte = (Byte) TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
         if (encodingSchemeSerializedByte == null) {
+            if (tableProps.containsKey(ENCODING_SCHEME)) {
+                encodingSchemeSerializedByte = QualifierEncodingScheme.valueOf((String) tableProps.get(ENCODING_SCHEME)).getSerializedMetadataValue();
+            }
+        }
+        if (encodingSchemeSerializedByte == null) {
             // Ignore default if transactional and column encoding is not supported (as with OMID)
             if (transactionProvider == null || !transactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.COLUMN_ENCODING) ) {
                 encodingSchemeSerializedByte = (byte)connection.getQueryServices().getProps().getInt(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB,
@@ -4052,9 +4058,10 @@ public class MetaDataClient {
                     connection.rollback();
                 }
 
-               if (isTransformNeeded) {
+                PTable transformingNewTable = null;
+                if (isTransformNeeded) {
                    try {
-                       Transform.addTransform(connection, tenantIdToUse, table, metaProperties, seqNum, PTable.TransformType.METADATA_TRANSFORM);
+                       transformingNewTable = Transform.addTransform(connection, tenantIdToUse, table, metaProperties, seqNum, PTable.TransformType.METADATA_TRANSFORM);
                     } catch (SQLException ex) {
                        connection.rollback();
                        throw ex;
@@ -4137,7 +4144,7 @@ public class MetaDataClient {
                     acquiredColumnMutexSet.add(pColumn.toString());
                 }
                 MetaDataMutationResult result = connection.getQueryServices().addColumn(tableMetaData, table,
-                        getParentTable(table), properties, colFamiliesForPColumnsToBeAdded, columns);
+                        getParentTable(table), transformingNewTable, properties, colFamiliesForPColumnsToBeAdded, columns);
 
                 try {
                     MutationCode code = processMutationResult(schemaName, tableName, result);
@@ -4206,6 +4213,10 @@ public class MetaDataClient {
                             connection.getQueryServices().updateData(plan);
                         }
                     }
+                    if (transformingNewTable != null) {
+                        connection.removeTable(tenantId, fullTableName, null, resolvedTimeStamp);
+                        connection.getQueryServices().clearCache();
+                    }
                     if (emptyCF != null) {
                         Long scn = connection.getSCN();
                         connection.setAutoCommit(true);
@@ -4556,7 +4567,7 @@ public class MetaDataClient {
                                     Collections.<Mutation>singletonList(new Put(SchemaUtil.getTableKey
                                             (tenantIdBytes, tableContainingColumnToDrop.getSchemaName().getBytes(),
                                                     tableContainingColumnToDrop.getTableName().getBytes()))),
-                                                    tableContainingColumnToDrop, null, family, Sets.newHashSet(Bytes.toString(emptyCF)), Collections.<PColumn>emptyList());
+                                                    tableContainingColumnToDrop, null, null, family, Sets.newHashSet(Bytes.toString(emptyCF)), Collections.<PColumn>emptyList());
 
                         }
                     }
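
For context, the MetaDataClient changes above are driven from plain SQL. A minimal sketch of triggering a metadata transform, assuming a table TEST.T already exists and that changing COLUMN_ENCODED_BYTES is one of the properties that sets isTransformNeeded (the table name and JDBC URL are illustrative):

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class TriggerTransformExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                // Per the patch, an ALTER that requires a physical rewrite makes
                // MetaDataClient call Transform.addTransform(...), pass the returned
                // transformingNewTable into addColumn(...), and then evict the cached
                // PTable so the TRANSFORMING_NEW_TABLE link is picked up on re-resolve.
                conn.createStatement().execute(
                        "ALTER TABLE TEST.T SET COLUMN_ENCODED_BYTES=2");
            }
        }
    }
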
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 0eb5d26..437ab61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -163,7 +163,11 @@ public interface PTable extends PMetaDataEntity {
         /**
          * Link from an index on a view to its parent table
          */
-        VIEW_INDEX_PARENT_TABLE((byte)6);
+        VIEW_INDEX_PARENT_TABLE((byte)6),
+        /**
+         * Link from the old table to the new transforming table
+         */
+        TRANSFORMING_NEW_TABLE((byte)7);
 
         private final byte[] byteValue;
         private final byte serializedValue;
@@ -770,6 +774,12 @@ public interface PTable extends PMetaDataEntity {
     List<PTable> getIndexes();
 
     /**
+     * Return the new version of the table if it is going through a transform.
+     * @return the new table, or null if no transform is in progress.
+     */
+    PTable getTransformingNewTable();
+
+    /**
      * For a table of index type, return the state of the table.
      * @return the state of the index.
      */
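
A quick sanity sketch for the new link type's serialized value; fromSerializedValue is assumed to handle the new value the same way it handles the existing link types:

    import org.apache.phoenix.schema.PTable.LinkType;

    public class LinkTypeRoundTrip {
        public static void main(String[] args) {
            // TRANSFORMING_NEW_TABLE serializes to 7, one past VIEW_INDEX_PARENT_TABLE (6).
            LinkType link = LinkType.TRANSFORMING_NEW_TABLE;
            assert link.getSerializedValue() == (byte) 7;
            assert LinkType.fromSerializedValue((byte) 7) == link;
        }
    }
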
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 97605d7..a90e046 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -104,7 +104,6 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PVarchar;
-import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -116,6 +115,7 @@ import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.Objects;
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap;
@@ -168,6 +168,9 @@ public class PTableImpl implements PTable {
     private final RowKeySchema rowKeySchema;
     // Indexes associated with this table.
     private final List<PTable> indexes;
+    // If the table is going through a transform, this is the new target table.
+    private final PTable transformingNewTable;
+
     // Data table name that the index is created on.
     private final PName parentName;
     private final PName parentSchemaName;
@@ -231,6 +234,7 @@ public class PTableImpl implements PTable {
         private Integer bucketNum;
         private RowKeySchema rowKeySchema;
         private List<PTable> indexes;
+        private PTable transformingNewTable;
         private PName parentName;
         private PName parentSchemaName;
         private PName parentTableName;
@@ -399,6 +403,11 @@ public class PTableImpl implements PTable {
             return this;
         }
 
+        public Builder setTransformingNewTable(PTable transformingNewTable) {
+            this.transformingNewTable = transformingNewTable;
+            return this;
+        }
+
         public Builder setParentName(PName parentName) {
             this.parentName = parentName;
             return this;
@@ -832,6 +841,9 @@ public class PTableImpl implements PTable {
             for (PTable index : this.indexes) {
                 estimatedSize += index.getEstimatedSize();
             }
+            if (transformingNewTable != null) {
+                estimatedSize += transformingNewTable.getEstimatedSize();
+            }
 
             estimatedSize += PNameFactory.getEstimatedSize(this.parentName);
             for (PName physicalName : this.physicalNames) {
@@ -904,6 +916,7 @@ public class PTableImpl implements PTable {
         this.bucketNum = builder.bucketNum;
         this.rowKeySchema = builder.rowKeySchema;
         this.indexes = builder.indexes;
+        this.transformingNewTable = builder.transformingNewTable;
         this.parentName = builder.parentName;
         this.parentSchemaName = builder.parentSchemaName;
         this.parentTableName = builder.parentTableName;
@@ -1006,6 +1019,7 @@ public class PTableImpl implements PTable {
                 .setBucketNum(table.getBucketNum())
                 .setIndexes(table.getIndexes() == null ?
                         Collections.<PTable>emptyList() : table.getIndexes())
+                .setTransformingNewTable(table.getTransformingNewTable())
                 .setParentSchemaName(table.getParentSchemaName())
                 .setParentTableName(table.getParentTableName())
                 .setBaseTableLogicalName(table.getBaseTableLogicalName())
@@ -1620,6 +1634,11 @@ public class PTableImpl implements PTable {
     }
 
     @Override
+    public PTable getTransformingNewTable() {
+        return transformingNewTable;
+    }
+
+    @Override
     public PIndexState getIndexState() {
         return state;
     }
@@ -1678,7 +1697,7 @@ public class PTableImpl implements PTable {
     public synchronized boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) {
         if (indexMaintainersPtr == null || indexMaintainersPtr.getLength()==0) {
             indexMaintainersPtr = new ImmutableBytesWritable();
-            if (indexes.isEmpty()) {
+            if (indexes.isEmpty() && transformingNewTable == null) {
                 indexMaintainersPtr.set(ByteUtil.EMPTY_BYTE_ARRAY);
             } else {
                 IndexMaintainer.serialize(this, indexMaintainersPtr, connection);
@@ -1803,6 +1822,11 @@ public class PTableImpl implements PTable {
             indexes.add(createFromProto(curPTableProto));
         }
 
+        PTable transformingNewTable = null;
+        if (table.hasTransformingNewTable()) {
+            PTableProtos.PTable curTransformingPTableProto = table.getTransformingNewTable();
+            transformingNewTable = createFromProto(curTransformingPTableProto);
+        }
         boolean isImmutableRows = table.getIsImmutableRows();
         PName parentSchemaName = null;
         PName parentTableName = null;
@@ -1972,6 +1996,7 @@ public class PTableImpl implements PTable {
                     .setRowKeyOrderOptimizable(rowKeyOrderOptimizable)
                     .setBucketNum((bucketNum == NO_SALTING) ? null : bucketNum)
                     .setIndexes(indexes == null ? Collections.<PTable>emptyList() : indexes)
+                    .setTransformingNewTable(transformingNewTable)
                     .setParentSchemaName(parentSchemaName)
                     .setParentTableName(parentTableName)
                     .setBaseTableLogicalName(parentLogicalName)
@@ -2041,6 +2066,10 @@ public class PTableImpl implements PTable {
         for (PTable curIndex : indexes) {
             builder.addIndexes(toProto(curIndex));
         }
+        PTable transformingNewTable = table.getTransformingNewTable();
+        if (transformingNewTable != null) {
+            builder.setTransformingNewTable(toProto(transformingNewTable));
+        }
         builder.setIsImmutableRows(table.isImmutableRows());
         // TODO remove this field in 5.0 release
         if (table.getParentName() != null) {
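
The proto round trip above can be checked with a small sketch; toProto and createFromProto are the static helpers already used for indexes, and the PTable argument is assumed to come from a live connection:

    import org.apache.phoenix.coprocessor.generated.PTableProtos;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.schema.PTableImpl;

    public class TransformingTableProtoCheck {
        // Serializes a PTable to protobuf and back, checking that the
        // transformingNewTable field survives the round trip.
        static void check(PTable table) throws Exception {
            PTableProtos.PTable proto = PTableImpl.toProto(table);
            PTable copy = PTableImpl.createFromProto(proto);
            if (table.getTransformingNewTable() != null) {
                assert copy.getTransformingNewTable() != null;
                assert copy.getTransformingNewTable().getName()
                        .equals(table.getTransformingNewTable().getName());
            }
        }
    }
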
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
index 56f0545..3eabc72 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
@@ -18,12 +18,15 @@
 package org.apache.phoenix.schema.transform;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.TableInfo;
 import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
 import org.apache.phoenix.mapreduce.transform.TransformTool;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
@@ -34,12 +37,16 @@ import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.tool.SchemaExtractionProcessor;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.JacksonUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TableViewFinderResult;
+import org.apache.phoenix.util.ViewUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,10 +56,13 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.sql.Types;
+import java.util.Collections;
 
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
 import static org.apache.phoenix.schema.ColumnMetaDataOps.addColumnMutation;
+import static org.apache.phoenix.schema.MetaDataClient.CREATE_LINK;
 import static org.apache.phoenix.schema.PTableType.INDEX;
+import static org.apache.phoenix.schema.PTableType.VIEW;
 
 public class Transform {
     private static final Logger LOGGER = LoggerFactory.getLogger(Transform.class);
@@ -63,7 +73,7 @@ public class Transform {
         return newName;
     }
 
-    public static void addTransform(PhoenixConnection connection, String tenantId, PTable table, MetaDataClient.MetaProperties changingProperties,
+    public static PTable addTransform(PhoenixConnection connection, String tenantId, PTable table, MetaDataClient.MetaProperties changingProperties,
                                     long sequenceNum, PTable.TransformType transformType) throws SQLException {
         try {
             String newMetadata = JacksonUtil.getObjectWriter().writeValueAsString(changingProperties);
@@ -87,7 +97,7 @@ public class Transform {
                 newPhysicalTableName = generateNewTableName(schema, logicalTableName, sequenceNum);
             }
             transformBuilder.setNewPhysicalTableName(newPhysicalTableName);
-            Transform.addTransform(table, changingProperties, transformBuilder.build(), connection);
+            return Transform.addTransform(table, changingProperties, transformBuilder.build(), sequenceNum, connection);
         } catch (JsonProcessingException ex) {
             LOGGER.error("addTransform failed", ex);
             throw new SQLException("Adding transform failed with JsonProcessingException");
@@ -101,8 +111,9 @@ public class Transform {
         }
     }
 
-    protected static void addTransform(
-            PTable table, MetaDataClient.MetaProperties changedProps, SystemTransformRecord systemTransformParams, PhoenixConnection connection) throws Exception {
+    protected static PTable addTransform(
+            PTable table, MetaDataClient.MetaProperties changedProps, SystemTransformRecord systemTransformParams,
+            long sequenceNum, PhoenixConnection connection) throws Exception {
         PName newTableName = PNameFactory.newName(systemTransformParams.getNewPhysicalTableName());
         PName newTableNameWithoutSchema = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(systemTransformParams.getNewPhysicalTableName()));
         PTable newTable = new PTableImpl.Builder()
@@ -122,6 +133,7 @@ public class Transform {
                 .setImmutableRows(table.isImmutableRows())
                 .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
                 .setIndexType(table.getIndexType())
+                .setIndexes(Collections.<PTable>emptyList())
                 .setName(newTableName)
                 .setMultiTenant(table.isMultiTenant())
                 .setParentName(table.getParentName())
@@ -150,16 +162,77 @@ public class Transform {
                         (changedProps.getColumnEncodedBytesProp() != null? changedProps.getColumnEncodedBytesProp() : table.getEncodingScheme()))
                 .build();
         SchemaExtractionProcessor schemaExtractionProcessor = new SchemaExtractionProcessor(systemTransformParams.getTenantId(),
-                connection.getQueryServices().getConfiguration(), newTable,  true);
+                connection.getQueryServices().getConfiguration(), newTable, true);
         String ddl = schemaExtractionProcessor.process();
         LOGGER.info("Creating transforming table via " + ddl);
         connection.createStatement().execute(ddl);
         upsertTransform(systemTransformParams, connection);
+
+        // Add a link row from the old table to the new transforming table
+        addTransformTableLink(connection, systemTransformParams.getTenantId(), systemTransformParams.getSchemaName(),
+                systemTransformParams.getLogicalTableName(), newTableName, sequenceNum);
+
+        // Also add the transforming-new-table link to the child views
+        TableViewFinderResult childViewsResult = ViewUtil.findChildViews(connection, systemTransformParams.getTenantId(),
+                systemTransformParams.getSchemaName(), systemTransformParams.getLogicalTableName());
+        for (TableInfo view : childViewsResult.getLinks()) {
+            addTransformTableLink(connection, view.getTenantId() == null ? null : Bytes.toString(view.getTenantId()),
+                    (view.getSchemaName() == null ? null : Bytes.toString(view.getSchemaName())), Bytes.toString(view.getTableName()),
+                    newTableName, sequenceNum);
+        }
+
+        return newTable;
+    }
+
+    private static void addTransformTableLink(Connection connection, String tenantId, String schemaName, String tableName,
+                                              PName newTableName, long sequenceNum) throws SQLException {
+        PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK);
+        linkStatement.setString(1, tenantId);
+        linkStatement.setString(2, schemaName);
+        linkStatement.setString(3, tableName);
+        linkStatement.setString(4, newTableName.getString());
+        linkStatement.setByte(5, PTable.LinkType.TRANSFORMING_NEW_TABLE.getSerializedValue());
+        linkStatement.setLong(6, sequenceNum);
+        linkStatement.setString(7, PTableType.TABLE.getSerializedValue());
+        linkStatement.execute();
+    }
+
+    public static SystemTransformRecord getTransformRecord(Configuration conf, PTableType tableType, PName schemaName,
+                                                           PName tableName, PName dataTableName, PName tenantId,
+                                                           PName parentLogicalName) throws SQLException {
+        if (tableType == PTableType.TABLE) {
+            try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)) {
+                return Transform.getTransformRecord(schemaName, tableName, null, tenantId, connection);
+            }
+        } else if (tableType == INDEX) {
+            try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)) {
+                return Transform.getTransformRecord(schemaName, tableName, dataTableName, tenantId, connection);
+            }
+        } else if (tableType == VIEW) {
+            try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)) {
+                return Transform.getTransformRecord(SchemaUtil.getSchemaNameFromFullName(parentLogicalName.getString()),
+                        SchemaUtil.getTableNameFromFullName(parentLogicalName.getString()), null, tenantId == null ? null : tenantId.getString(), connection);
+            }
+        }
+        return null;
+    }
+
+    public static SystemTransformRecord getTransformRecord(
+            PName schema, PName logicalTableName, PName logicalParentName, PName tenantId, PhoenixConnection connection) throws SQLException {
+        return getTransformRecordFromDB((schema == null ? null : schema.getString()),
+                (logicalTableName == null ? null : logicalTableName.getString()),
+                (logicalParentName == null ? null : logicalParentName.getString()),
+                (tenantId == null ? null : tenantId.getString()), connection);
     }
 
     public static SystemTransformRecord getTransformRecord(
             String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
-        try (ResultSet resultSet = connection.prepareStatement("SELECT " +
+        return getTransformRecordFromDB(schema, logicalTableName, logicalParentName, tenantId, connection);
+    }
+
+    public static SystemTransformRecord getTransformRecordFromDB(
+            String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
+        String sql = "SELECT " +
                 PhoenixDatabaseMetaData.TENANT_ID + ", " +
                 PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
                 PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
@@ -178,8 +251,9 @@ public class Transform {
                 (Strings.isNullOrEmpty(tenantId) ? "" : (PhoenixDatabaseMetaData.TENANT_ID + " ='" + tenantId + "' AND ")) +
                 (Strings.isNullOrEmpty(schema) ? "" : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + schema + "' AND ")) +
                 PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + logicalTableName + "'" +
-                (Strings.isNullOrEmpty(logicalParentName) ? "": (" AND " + PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + "='" + logicalParentName + "'" ))
-        ).executeQuery()) {
+                (Strings.isNullOrEmpty(logicalParentName) ? "" : (" AND " + PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + "='" + logicalParentName + "'"));
+        try (ResultSet resultSet = ((PhoenixPreparedStatement) connection.prepareStatement(sql))
+                .executeQuery()) {
             if (resultSet.next()) {
                 return SystemTransformRecord.SystemTransformBuilder.build(resultSet);
             }
@@ -299,7 +373,6 @@ public class Transform {
             } else {
                 stmt.setNull(colNum++, Types.VARCHAR);
             }
-
             LOGGER.info("Adding transform type: "
                     + systemTransformParams.getString());
             stmt.execute();
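
For reference, addTransformTableLink above writes a SYSTEM.CATALOG row keyed by (tenant, schema, old table, new table) with LINK_TYPE = 7, while the transform itself is tracked in SYSTEM.TRANSFORM. A minimal lookup sketch, assuming a transform has already been started on an illustrative table TEST.T:

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.schema.transform.SystemTransformRecord;
    import org.apache.phoenix.schema.transform.Transform;

    public class TransformRecordLookup {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
                // Schema TEST, table T, no parent table, no tenant.
                SystemTransformRecord record =
                        Transform.getTransformRecord("TEST", "T", null, null, pconn);
                if (record != null) {
                    System.out.println("New physical table: "
                            + record.getNewPhysicalTableName());
                }
            }
        }
    }
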
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
index 72bdb2d..750750e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.schema.transform;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.ByteStringer;
@@ -442,6 +443,25 @@ public class TransformMaintainer extends IndexMaintainer {
                 newTableEmptyKeyValueRef, newTableWALDisabled, oldTableImmutableStorageScheme, newTableImmutableStorageScheme, newTableEncodingScheme);
     }
 
+    public Delete buildRowDeleteMutation(byte[] rowKey, DeleteType deleteType, long ts) {
+        byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary();
+        Delete delete = new Delete(rowKey);
+
+        for (ColumnReference ref : newTableColumns) {
+            if (deleteType == DeleteType.SINGLE_VERSION) {
+                delete.addFamilyVersion(ref.getFamily(), ts);
+            } else {
+                delete.addFamily(ref.getFamily(), ts);
+            }
+        }
+        if (deleteType == DeleteType.SINGLE_VERSION) {
+            delete.addFamilyVersion(emptyCF, ts);
+        } else {
+            delete.addFamily(emptyCF, ts);
+        }
+        return delete;
+    }
+
     public ImmutableBytesPtr getEmptyKeyValueFamily() {
         return emptyKeyValueCFPtr;
     }
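
A usage sketch for the new buildRowDeleteMutation. The maintainer instance is assumed to come from the transforming table's serialized maintainers, and the DeleteType enum is assumed to be public with an ALL_VERSIONS counterpart to the SINGLE_VERSION value shown above; the row key and timestamp are illustrative:

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.index.IndexMaintainer;
    import org.apache.phoenix.schema.transform.TransformMaintainer;

    public class RowDeleteSketch {
        // Builds the Delete that would be applied to the new table when the
        // corresponding row is deleted from the old table during transform.
        static Delete deleteForRow(TransformMaintainer maintainer, long ts) {
            byte[] rowKey = Bytes.toBytes("row1");
            // ALL_VERSIONS covers every column family of the new table plus its
            // empty key value family; SINGLE_VERSION deletes only cells at ts.
            return maintainer.buildRowDeleteMutation(rowKey,
                    IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
        }
    }
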
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
index 4d9dd92..b5d1624 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
@@ -19,6 +19,7 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SPLITTABLE_SYS
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARENT_TENANT_ID_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
@@ -285,7 +286,33 @@ public class ViewUtil {
             return new TableViewFinderResult(tableInfoList);
         } 
     }
-    
+
+    public static TableViewFinderResult findChildViews(PhoenixConnection connection, String tenantId, String schema,
+                                                       String tableName) throws IOException, SQLException {
+        // Find child views
+        TableViewFinderResult childViewsResult = new TableViewFinderResult();
+        ReadOnlyProps readOnlyProps = connection.getQueryServices().getProps();
+        for (int i = 0; i < 2; i++) {
+            try (Table sysCatOrSysChildLinkTable = connection.getQueryServices()
+                    .getTable(SchemaUtil.getPhysicalName(
+                            i == 0 ? SYSTEM_CHILD_LINK_NAME_BYTES : SYSTEM_CATALOG_TABLE_BYTES,
+                            readOnlyProps).getName())) {
+                byte[] tenantIdBytes = tenantId != null ? tenantId.getBytes() : null;
+                ViewUtil.findAllRelatives(sysCatOrSysChildLinkTable, tenantIdBytes,
+                        schema == null ? null : schema.getBytes(),
+                        tableName.getBytes(), PTable.LinkType.CHILD_TABLE, childViewsResult);
+                break;
+            } catch (TableNotFoundException ex) {
+                // try again with SYSTEM.CATALOG in case the schema is old
+                if (i == 1) {
+                    // Even SYSTEM.CATALOG was not found, so there is nothing left to try; rethrow
+                    throw ex;
+                }
+            }
+        }
+        return childViewsResult;
+    }
+
     /**
      * Check metadata to find if a given table/view has any immediate child views. Note that this
      * is not resilient to orphan {@code parent->child } links.
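
The new findChildViews overload can be exercised directly. A sketch, assuming a PhoenixConnection and a base table TEST.T with child views:

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.coprocessor.TableInfo;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.util.TableViewFinderResult;
    import org.apache.phoenix.util.ViewUtil;

    public class ChildViewLookup {
        static void printChildViews(PhoenixConnection pconn) throws Exception {
            // Tries SYSTEM.CHILD_LINK first, then falls back to SYSTEM.CATALOG
            // for older schemas, exactly as the loop above does.
            TableViewFinderResult result = ViewUtil.findChildViews(pconn, null, "TEST", "T");
            for (TableInfo view : result.getLinks()) {
                System.out.println(Bytes.toString(view.getTableName()));
            }
        }
    }
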
diff --git a/phoenix-core/src/main/protobuf/MetaDataService.proto b/phoenix-core/src/main/protobuf/MetaDataService.proto
index 63c79ae..7d87e26 100644
--- a/phoenix-core/src/main/protobuf/MetaDataService.proto
+++ b/phoenix-core/src/main/protobuf/MetaDataService.proto
@@ -146,6 +146,7 @@ message AddColumnRequest {
   optional int32 clientVersion = 2;
   optional PTable parentTable = 3;
   optional bool addingColumns = 4;
+  optional PTable transformingNewTable = 5;
 }
 
 message DropColumnRequest {
diff --git a/phoenix-core/src/main/protobuf/PTable.proto b/phoenix-core/src/main/protobuf/PTable.proto
index 45e1b6e..26b40b4 100644
--- a/phoenix-core/src/main/protobuf/PTable.proto
+++ b/phoenix-core/src/main/protobuf/PTable.proto
@@ -115,6 +115,7 @@ message PTable {
   optional bytes baseTableLogicalNameBytes = 48;
   optional bytes schemaVersion = 49;
   optional bytes externalSchemaId=50;
+  optional PTable transformingNewTable = 51;
 }
 
 message EncodedCQCounter {