You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by go...@apache.org on 2021/12/21 22:38:13 UTC

[phoenix] branch 4.x updated: PHOENIX-6612 Add TransformTool

This is an automated email from the ASF dual-hosted git repository.

gokcen pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new d797ca8  PHOENIX-6612 Add TransformTool
d797ca8 is described below

commit d797ca8089b7edcfcc8bae64b162815e396d97dc
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Wed Aug 4 15:14:01 2021 -0700

    PHOENIX-6612 Add TransformTool
    
    Co-authored-by: Gokcen Iskender <47...@users.noreply.github.com>
    Signed-off-by: Gokcen Iskender <go...@gmail.com>
---
 .../end2end/ImmutableTablePropertiesIT.java        |  70 --
 .../phoenix/end2end/LogicalTableNameBaseIT.java    |  10 +-
 .../end2end/LogicalTableNameExtendedIT.java        |  36 +
 .../apache/phoenix/end2end/LogicalTableNameIT.java |   3 +-
 .../end2end/PhoenixRowTimestampFunctionIT.java     |  18 +-
 .../apache/phoenix/end2end/TransformToolIT.java    | 489 +++++++++++++
 .../phoenix/end2end/index/SingleCellIndexIT.java   |   4 -
 .../apache/phoenix/end2end/index/TransformIT.java  |   1 +
 .../phoenix/compile/PostIndexDDLCompiler.java      |  21 +-
 .../phoenix/compile/ServerBuildIndexCompiler.java  |  12 +-
 .../ServerBuildTransformingTableCompiler.java      |  98 +++
 .../coprocessor/BaseScannerRegionObserver.java     |   1 +
 .../coprocessor/GlobalIndexRegionScanner.java      |   9 +-
 .../org/apache/phoenix/index/IndexMaintainer.java  |  60 +-
 .../PhoenixServerBuildIndexInputFormat.java        |  22 +-
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  12 +-
 .../index/PhoenixIndexImportDirectReducer.java     |  12 +
 .../transform/PhoenixTransformReducer.java         |  68 ++
 .../phoenix/mapreduce/transform/TransformTool.java | 762 +++++++++++++++++++++
 .../phoenix/mapreduce/util/IndexColumnNames.java   |   2 +-
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  43 ++
 .../apache/phoenix/schema/ColumnMetaDataOps.java   | 192 ++++++
 .../org/apache/phoenix/schema/DelegateTable.java   |   6 +
 .../org/apache/phoenix/schema/MetaDataClient.java  | 604 +++++++---------
 .../java/org/apache/phoenix/schema/PTable.java     |  10 +-
 .../java/org/apache/phoenix/schema/PTableImpl.java |  10 +
 .../schema/tool/SchemaExtractionProcessor.java     |  14 +-
 .../schema/transform/SystemTransformRecord.java    |  21 +
 .../apache/phoenix/schema/transform/Transform.java | 272 ++++++--
 .../schema/transform/TransformMaintainer.java      | 453 ++++++++++++
 .../src/main/protobuf/ServerCachingService.proto   |  25 +
 .../java/org/apache/phoenix/query/BaseTest.java    |   1 +
 32 files changed, 2810 insertions(+), 551 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ImmutableTablePropertiesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ImmutableTablePropertiesIT.java
index d5e204b..1e169b7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ImmutableTablePropertiesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ImmutableTablePropertiesIT.java
@@ -150,74 +150,4 @@ public class ImmutableTablePropertiesIT extends ParallelStatsDisabledIT {
             }
         } 
     }
-    
-    @Test
-    @Ignore("We now support altering storage schema")
-    public void testAlterImmutableStorageSchemeProp() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        String immutableDataTableFullName1 = SchemaUtil.getTableName("", generateUniqueName());
-        String immutableDataTableFullName2 = SchemaUtil.getTableName("", generateUniqueName());
-        try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
-            Statement stmt = conn.createStatement();
-            // create an immutable table with  ONE_CELL_PER_COLUMN storage scheme
-            String ddl = "CREATE IMMUTABLE TABLE  " + immutableDataTableFullName1 +
-                    "  (a_string varchar not null, col1 integer" +
-                    "  CONSTRAINT pk PRIMARY KEY (a_string)) COLUMN_ENCODED_BYTES=0, IMMUTABLE_STORAGE_SCHEME="
-                    + PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
-            stmt.execute(ddl);
-            // create an immutable table with  SINGLE_CELL_ARRAY_WITH_OFFSETS storage scheme
-            ddl = "CREATE IMMUTABLE TABLE  " + immutableDataTableFullName2 +
-                    "  (a_string varchar not null, col1 integer" +
-                    "  CONSTRAINT pk PRIMARY KEY (a_string)) COLUMN_ENCODED_BYTES=4, IMMUTABLE_STORAGE_SCHEME="
-                    + PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
-            stmt.execute(ddl);
-            
-            // changing the storage scheme from/to ONE_CELL_PER_COLUMN should fail
-            try {
-                stmt.execute("ALTER TABLE " + immutableDataTableFullName1 + " SET IMMUTABLE_STORAGE_SCHEME=" + PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS);
-                fail();
-            }
-            catch (SQLException e) {
-                assertEquals(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE.getErrorCode(), e.getErrorCode());
-            }
-            try {
-                stmt.execute("ALTER TABLE " + immutableDataTableFullName2 + " SET IMMUTABLE_STORAGE_SCHEME=" + PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
-                fail();
-            }
-            catch (SQLException e) {
-                assertEquals(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE.getErrorCode(), e.getErrorCode());
-            }
-        } 
-    }
-
-    @Test
-    @Ignore("We now support altering storage schema")
-    public void testAlterImmutableStorageSchemeProp_Index() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        String tableName = SchemaUtil.getTableName("", generateUniqueName());
-        String indexName = SchemaUtil.getTableName("", "IDX_" + generateUniqueName());
-        try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
-            Statement stmt = conn.createStatement();
-            // create an immutable table with  ONE_CELL_PER_COLUMN storage scheme
-            String ddl = "CREATE IMMUTABLE TABLE  " + tableName +
-                    "  (a_string varchar not null, col1 integer, col2 varchar " +
-                    "  CONSTRAINT pk PRIMARY KEY (a_string)) COLUMN_ENCODED_BYTES=0, IMMUTABLE_STORAGE_SCHEME="
-                    + PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
-            stmt.execute(ddl);
-
-            ddl = "CREATE INDEX " + indexName + " ON " + tableName +
-                    "  (col1) INCLUDE (col2)";
-            stmt.execute(ddl);
-
-            // changing the storage scheme from/to ONE_CELL_PER_COLUMN should fail
-            try {
-                stmt.execute("ALTER INDEX " + indexName + " ON " + tableName +
-                        " ACTIVE SET IMMUTABLE_STORAGE_SCHEME=" + PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS);
-                fail();
-            }
-            catch (SQLException e) {
-                assertEquals(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE.getErrorCode(), e.getErrorCode());
-            }
-        }
-    }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
index 6df0f02..379ab58 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
@@ -529,15 +530,14 @@ public abstract class LogicalTableNameBaseIT extends BaseTest {
 
     public static void renameAndDropPhysicalTable(Connection conn, String tenantId, String schema, String tableName, String physicalName, boolean isNamespaceEnabled) throws Exception {
         String
-                changeName =
-                String.format(
-                        "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, PHYSICAL_TABLE_NAME) VALUES (%s, '%s', '%s', NULL, NULL, '%s')",
-                        tenantId, schema, tableName, physicalName);
+                changeName = String.format(
+                "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, PHYSICAL_TABLE_NAME) VALUES (%s, %s, '%s', NULL, NULL, '%s')",
+                tenantId, schema==null ? null : ("'" + schema + "'"), tableName, physicalName);
         conn.createStatement().execute(changeName);
         conn.commit();
 
         String fullTableName = SchemaUtil.getTableName(schema, tableName);
-        if (isNamespaceEnabled) {
+        if (isNamespaceEnabled && !(Strings.isNullOrEmpty(schema) || NULL_STRING.equals(schema))) {
             fullTableName = schema + NAMESPACE_SEPARATOR + tableName;
         }
         Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
index cb58962..2141d99 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameExtendedIT.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -53,6 +54,9 @@ public class LogicalTableNameExtendedIT extends LogicalTableNameBaseIT {
     }
 
     public LogicalTableNameExtendedIT()  {
+        StringBuilder optionBuilder = new StringBuilder();
+        optionBuilder.append(" ,IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN");
+        this.dataTableDdl = optionBuilder.toString();
         propsNamespace.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
     }
 
@@ -193,6 +197,38 @@ public class LogicalTableNameExtendedIT extends LogicalTableNameBaseIT {
     }
 
     @Test
+    public void testHint() throws Exception {
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String indexName2 = "IDX2_" + generateUniqueName();
+        try (Connection conn = getConnection(propsNamespace)) {
+            conn.setAutoCommit(true);
+            createTable(conn, tableName);
+            createIndexOnTable(conn, tableName, indexName);
+            createIndexOnTable(conn, tableName, indexName2);
+            populateTable(conn, tableName, 1, 2);
+
+            // Test hint
+            String tableSelect = "SELECT V1,V2,V3 FROM " + tableName;
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + tableSelect);
+            assertEquals(true, QueryUtil.getExplainPlan(rs).contains(indexName));
+            try (HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                    .getAdmin()) {
+                String snapshotName = new StringBuilder(indexName2).append("-Snapshot").toString();
+                admin.snapshot(snapshotName, TableName.valueOf(indexName2));
+                String newName = "NEW_" + indexName2;
+                admin.cloneSnapshot(Bytes.toBytes(snapshotName), Bytes.toBytes(newName));
+                renameAndDropPhysicalTable(conn, "NULL", null, indexName2, newName, true);
+            }
+            String indexSelect = "SELECT /*+ INDEX(" + tableName + " " + indexName2 + ")*/ V1,V2,V3 FROM " + tableName;
+            rs = conn.createStatement().executeQuery("EXPLAIN " + indexSelect);
+            assertEquals(true, QueryUtil.getExplainPlan(rs).contains(indexName2));
+            rs = conn.createStatement().executeQuery(indexSelect);
+            assertEquals(true, rs.next());
+        }
+    }
+
+    @Test
     public void testUpdatePhysicalTableName_tenantViews() throws Exception {
 
         try (Connection conn = getConnection(propsNamespace)) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
index 729a8d7..119fd7ed 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
@@ -73,8 +73,9 @@ public class LogicalTableNameIT extends LogicalTableNameBaseIT {
         this.createChildAfterRename = createChildAfterRename;
         this.immutable = immutable;
         StringBuilder optionBuilder = new StringBuilder();
+        optionBuilder.append(" ,IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN");
         if (immutable) {
-            optionBuilder.append(" ,IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, IMMUTABLE_ROWS=true");
+            optionBuilder.append(" , IMMUTABLE_ROWS=true");
         }
         this.dataTableDdl = optionBuilder.toString();
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java
index 0ef3dce..e1a39a0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java
@@ -63,13 +63,19 @@ public class PhoenixRowTimestampFunctionIT extends ParallelStatsDisabledIT {
     public PhoenixRowTimestampFunctionIT(QualifierEncodingScheme encoding,
             ImmutableStorageScheme storage) {
         StringBuilder optionBuilder = new StringBuilder();
-        optionBuilder.append(" COLUMN_ENCODED_BYTES = " + encoding.ordinal());
-        optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME = "+ storage.toString());
-        this.tableDDLOptions = optionBuilder.toString();
-        this.encoded = (encoding != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS)
-                            ? true : false;
         this.optimized = storage == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS
                             ? true : false;
+        // We cannot have non encoded column names if the storage type is single cell
+        this.encoded = (encoding != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS)
+                ? true : (this.optimized) ? true : false;
+
+        if (this.optimized && encoding == QualifierEncodingScheme.NON_ENCODED_QUALIFIERS) {
+            optionBuilder.append(" COLUMN_ENCODED_BYTES = " + QualifierEncodingScheme.ONE_BYTE_QUALIFIERS.ordinal());
+        } else {
+            optionBuilder.append(" COLUMN_ENCODED_BYTES = " + encoding.ordinal());
+        }
+        optionBuilder.append(", IMMUTABLE_STORAGE_SCHEME = "+ storage.toString());
+        this.tableDDLOptions = optionBuilder.toString();
     }
 
     @Parameterized.Parameters(name = "encoding={0},storage={1}")
@@ -153,7 +159,7 @@ public class PhoenixRowTimestampFunctionIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testRowTimestampDefault() throws Exception {
-
+        if (encoded || optimized) return;
         String tableName =  generateUniqueName();
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String ddl = "CREATE TABLE IF NOT EXISTS " + tableName
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
new file mode 100644
index 0000000..276f08b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.transform.TransformTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getRowCount;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TransformToolIT extends ParallelStatsDisabledIT{
+    private static final Logger LOGGER = LoggerFactory.getLogger(TransformToolIT.class);
+    private final String tableDDLOptions;
+    // TODO test with immutable
+    private boolean mutable = true;
+
+    public TransformToolIT() {
+        StringBuilder optionBuilder = new StringBuilder();
+        optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=NONE, TTL=18000 ");
+        if (!mutable) {
+            optionBuilder.append(", IMMUTABLE_ROWS=true ");
+        }
+        tableDDLOptions = optionBuilder.toString();
+    }
+
+    @BeforeClass
+    public static synchronized void setup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
+        serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+        serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5));
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+                QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+                new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+
+    private void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows) throws SQLException {
+        String stmString1 =
+                "CREATE TABLE IF NOT EXISTS " + dataTableFullName
+                        + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                        + tableDDLOptions;
+        conn.createStatement().execute(stmString1);
+        String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+        PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+        // insert rows
+        for (int i = 1; i <= numOfRows; i++) {
+            IndexToolIT.upsertRow(stmt1, i);
+        }
+        conn.commit();
+    }
+    @Test
+    public void testTransformTable() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String newTableFullName = dataTableFullName + "_1";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 2;
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+
+            List<String> args = getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, false, false);
+            runTransformTool(args.toArray(new String[0]), 0);
+            record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
+            assertEquals(getRowCount(conn, dataTableFullName), getRowCount(conn,newTableFullName));
+
+            // Test that the PhysicalTableName is updated.
+            PTable oldTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+            assertEquals(dataTableName+"_1", oldTable.getPhysicalName(true).getString());
+
+            String sql = "SELECT ID, NAME, ZIP FROM %s ";
+            ResultSet rs1 = conn.createStatement().executeQuery(String.format(sql, dataTableFullName));
+            ResultSet rs2 = conn.createStatement().executeQuery(String.format(sql, newTableFullName));
+            for (int i=0; i < numOfRows; i++) {
+                assertTrue(rs1.next());
+                assertTrue(rs2.next());
+                assertEquals(rs1.getString(1), rs2.getString(1));
+                assertEquals(rs1.getString(2), rs2.getString(2));
+                assertEquals(rs1.getInt(3), rs2.getInt(3));
+            }
+            assertFalse(rs1.next());
+            assertFalse(rs2.next());
+        }
+    }
+
+    @Test
+    public void testAbortTransform() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            createTableAndUpsertRows(conn, dataTableFullName, 2);
+
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null,conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+
+            List<String> args = getArgList(schemaName, dataTableName, null,
+                    null, null, null, true, false, false, false);
+
+            runTransformTool(args.toArray(new String[0]), 0);
+            record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNull(record);
+        }
+    }
+
+    private void pauseTableTransform(String schemaName, String dataTableName, Connection conn) throws Exception {
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+
+        createTableAndUpsertRows(conn, dataTableFullName, 2);
+
+        conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+        SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+        assertNotNull(record);
+
+        List<String> args = getArgList(schemaName, dataTableName, null,
+                null, null, null, false, true, false, false);
+
+        runTransformTool(args.toArray(new String[0]), 0);
+        record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+        assertEquals(PTable.TransformStatus.PAUSED.name(), record.getTransformStatus());
+    }
+
+    @Test
+    public void testPauseTransform() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            pauseTableTransform(schemaName, dataTableName, conn);
+        }
+    }
+
+    @Test
+    public void testResumeTransform() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            pauseTableTransform(schemaName, dataTableName, conn);
+            List<String> args = getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, true, false);
+
+            runTransformTool(args.toArray(new String[0]), 0);
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
+        }
+    }
+
+    /**
+     * Test presplitting an index table
+     */
+    @Test
+    public void testSplitTable() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        final TableName dataTN = TableName.valueOf(dataTableFullName);
+        final TableName newDataTN = TableName.valueOf(dataTableFullName + "_1");
+        try (Connection conn =
+                     DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
+             HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+            conn.setAutoCommit(true);
+            String dataDDL =
+                    "CREATE TABLE " + dataTableFullName + "(\n"
+                            + "ID VARCHAR NOT NULL PRIMARY KEY,\n"
+                            + "\"info\".CAR_NUM VARCHAR(18) NULL,\n"
+                            + "\"test\".CAR_NUM VARCHAR(18) NULL,\n"
+                            + "\"info\".CAP_DATE VARCHAR NULL,\n" + "\"info\".ORG_ID BIGINT NULL,\n"
+                            + "\"info\".ORG_NAME VARCHAR(255) NULL\n" + ") IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES = 0";
+            conn.createStatement().execute(dataDDL);
+
+            String[] idPrefixes = new String[] {"1", "2", "3", "4"};
+
+            // split the data table, as the tool splits the new table to have the same # of regions
+            // doesn't really matter what the split points are, we just want a target # of regions
+            int numSplits = idPrefixes.length;
+            int targetNumRegions = numSplits + 1;
+            byte[][] splitPoints = new byte[numSplits][];
+            for (String prefix : idPrefixes) {
+                splitPoints[--numSplits] = Bytes.toBytes(prefix);
+            }
+            HTableDescriptor dataTD = admin.getTableDescriptor(dataTN);
+            admin.disableTable(dataTN);
+            admin.deleteTable(dataTN);
+            admin.createTable(dataTD, splitPoints);
+            assertEquals(targetNumRegions, admin.getTableRegions(dataTN).size());
+
+            // insert data
+            int idCounter = 1;
+            try (PreparedStatement ps = conn.prepareStatement("UPSERT INTO " + dataTableFullName
+                    + "(ID,\"info\".CAR_NUM,\"test\".CAR_NUM,CAP_DATE,ORG_ID,ORG_NAME) VALUES(?,?,?,'2021-01-01 00:00:00',11,'orgname1')")){
+                for (String carNum : idPrefixes) {
+                    for (int i = 0; i < 100; i++) {
+                        ps.setString(1, idCounter++ + "");
+                        ps.setString(2, carNum + "_" + i);
+                        ps.setString(3, "test-" + carNum + "_ " + i);
+                        ps.addBatch();
+                    }
+                }
+                ps.executeBatch();
+                conn.commit();
+            }
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            List<String> args = getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, false, false);
+            // split if data table more than 3 regions
+            args.add("--autosplit=3");
+
+            args.add("-op");
+            args.add("/tmp/" + UUID.randomUUID().toString());
+
+            runTransformTool(args.toArray(new String[0]), 0);
+
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, newDataTN.getNameAsString());
+            assertEquals(targetNumRegions, admin.getTableRegions(newDataTN).size());
+            assertEquals(getRowCount(conn, dataTableFullName), getRowCount(conn, dataTableFullName + "_1"));
+        }
+    }
+
+    @Test
+    public void testDataAfterTransformingMultiColFamilyTable() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        final TableName dataTN = TableName.valueOf(dataTableFullName);
+        final TableName newDataTN = TableName.valueOf(dataTableFullName + "_1");
+        try (Connection conn =
+                     DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
+             HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+            conn.setAutoCommit(true);
+            String dataDDL =
+                    "CREATE TABLE " + dataTableFullName + "(\n"
+                            + "ID VARCHAR(5) NOT NULL PRIMARY KEY,\n"
+                            + "\"info\".CAR_NUM VARCHAR(18) NULL,\n"
+                            + "\"test\".CAR_NUM VARCHAR(18) NULL,\n"
+                            + "\"info\".CAP_DATE VARCHAR NULL,\n" + "\"info\".ORG_ID BIGINT NULL,\n"
+                            + "\"test\".ORG_NAME VARCHAR(255) NULL\n" + ") IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES = 0";
+            conn.createStatement().execute(dataDDL);
+
+            // insert data
+            int idCounter = 1;
+            try (PreparedStatement ps = conn.prepareStatement("UPSERT INTO " + dataTableFullName
+                    + "(ID,\"info\".CAR_NUM,\"test\".CAR_NUM,CAP_DATE,ORG_ID,ORG_NAME) VALUES(?,?,?,'2021-01-01 00:00:00',11,'orgname1')")) {
+                for (int i = 0; i < 5; i++) {
+                    ps.setString(1, idCounter++ + "");
+                    ps.setString(2, "info-" + i);
+                    ps.setString(3, "test-" + i);
+                    ps.addBatch();
+                }
+                ps.executeBatch();
+                conn.commit();
+            }
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            List<String> args = getArgList(schemaName, dataTableName, null,
+                    null, null, null, false, false, false, false);
+            runTransformTool(args.toArray(new String[0]), 0);
+
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, newDataTN.getNameAsString());
+            assertEquals(getRowCount(conn, dataTableFullName), getRowCount(conn, dataTableFullName + "_1"));
+
+            String select = "SELECT ID,\"info\".CAR_NUM,\"test\".CAR_NUM,CAP_DATE,ORG_ID,ORG_NAME FROM ";
+            ResultSet resultSetNew = conn.createStatement().executeQuery(select + newDataTN.getNameAsString());
+            ResultSet resultSetOld = conn.createStatement().executeQuery(select + dataTableFullName);
+            for (int i=0; i < idCounter-1; i++) {
+                assertTrue(resultSetNew.next());
+                assertTrue(resultSetOld.next());
+                assertEquals(resultSetOld.getString(1), resultSetNew.getString(1));
+                assertEquals(resultSetOld.getString(2), resultSetNew.getString(2));
+                assertEquals(resultSetOld.getString(3), resultSetNew.getString(3));
+                assertEquals(resultSetOld.getString(4), resultSetNew.getString(4));
+                assertEquals(resultSetOld.getString(5), resultSetNew.getString(5));
+                assertEquals(resultSetOld.getString(6), resultSetNew.getString(6));
+            }
+            assertFalse(resultSetNew.next());
+            assertFalse(resultSetOld.next());
+        }
+    }
+
+    @Test
+    public void testTransformIndex() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = "I_" + generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 2;
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON " + dataTableFullName + " (NAME) INCLUDE (ZIP)");
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexTableFullName);
+            conn.createStatement().execute("ALTER INDEX " + indexTableName + " ON " + dataTableFullName +
+                    " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+
+            List<String> args = getArgList(schemaName, dataTableName, indexTableName,
+                    null, null, null, false, false, false, false);
+
+            runTransformTool(args.toArray(new String[0]), 0);
+            record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
+            assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
+            assertEquals(getRowCount(conn, indexTableFullName), getRowCount(conn, indexTableFullName + "_1"));
+
+            // Test that the PhysicalTableName is updated.
+            PTable oldTable = PhoenixRuntime.getTable(conn, indexTableFullName);
+            assertEquals(indexTableName+"_1", oldTable.getPhysicalName(true).getString());
+
+            String sql = "SELECT \":ID\", \"0:NAME\", \"0:ZIP\" FROM %s ORDER BY \":ID\"";
+            ResultSet rs1 = conn.createStatement().executeQuery(String.format(sql, indexTableFullName));
+            ResultSet rs2 = conn.createStatement().executeQuery(String.format(sql, indexTableFullName + "_1"));
+            for (int i=0; i < numOfRows; i++) {
+                assertTrue(rs1.next());
+                assertTrue(rs2.next());
+                assertEquals(rs1.getString(1), rs2.getString(1));
+                assertEquals(rs1.getString(2), rs2.getString(2));
+                assertEquals(rs1.getInt(3), rs2.getInt(3));
+            }
+            assertFalse(rs1.next());
+            assertFalse(rs2.next());
+        }
+    }
+
+    public static List<String> getArgList(String schemaName, String dataTable, String indxTable, String tenantId,
+                                          Long startTime, Long endTime,
+                                          boolean shouldAbort, boolean shouldPause, boolean shouldResume, boolean isPartial) {
+        List<String> args = Lists.newArrayList();
+        if (schemaName != null) {
+            args.add("--schema=" + schemaName);
+        }
+        // Work around CLI-254. The long-form arg parsing doesn't strip off double-quotes
+        args.add("--data-table=" + dataTable);
+        if (indxTable != null) {
+            args.add("--index-table=" + indxTable);
+        }
+
+        args.add("-op");
+        args.add("/tmp/" + UUID.randomUUID().toString());
+        // Need to run this job in foreground for the test to be deterministic
+        args.add("-runfg");
+
+        if (tenantId != null) {
+            args.add("-tenant");
+            args.add(tenantId);
+        }
+        if(startTime != null) {
+            args.add("-st");
+            args.add(String.valueOf(startTime));
+        }
+        if(endTime != null) {
+            args.add("-et");
+            args.add(String.valueOf(endTime));
+        }
+        if (shouldAbort) {
+            args.add("-abort");
+        }
+        if (shouldPause) {
+            args.add("-pause");
+        }
+        if (shouldResume) {
+            args.add("-resume");
+        }
+        if (isPartial) {
+            args.add("-pt");
+        }
+        return args;
+    }
+
+
+    public static String [] getArgValues(String schemaName,
+                                         String dataTable, String indexTable, String tenantId,
+                                         Long startTime, Long endTime) {
+        List<String> args = getArgList(schemaName, dataTable, indexTable,
+                tenantId, startTime, endTime, false, false, false, false);
+        args.add("-op");
+        args.add("/tmp/" + UUID.randomUUID().toString());
+        return args.toArray(new String[0]);
+    }
+
+    public static TransformTool runTransformTool(int expectedStatus, String schemaName, String dataTableName, String indexTableName, String tenantId,
+                                                 String... additionalArgs) throws Exception {
+        final String[] cmdArgs = getArgValues(schemaName, dataTableName,
+                indexTableName, tenantId, 0L, 0L);
+        List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
+        cmdArgList.add("-op");
+        cmdArgList.add("/tmp/" + UUID.randomUUID().toString());
+
+        cmdArgList.addAll(Arrays.asList(additionalArgs));
+        return runTransformTool(cmdArgList.toArray(new String[cmdArgList.size()]), expectedStatus);
+    }
+
+    public static TransformTool runTransformTool(String[] cmdArgs, int expectedStatus) throws Exception {
+        TransformTool tt = new TransformTool();
+        Configuration conf = new Configuration(getUtility().getConfiguration());
+        tt.setConf(conf);
+
+        LOGGER.info("Running TransformTool with {}", Arrays.toString(cmdArgs), new Exception("Stack Trace"));
+        int status = tt.run(cmdArgs);
+
+        assertEquals(expectedStatus, status);
+        return tt;
+    }
+
+}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
index 3eb378a..9e9dd76 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
@@ -461,10 +461,6 @@ public class SingleCellIndexIT extends ParallelStatsDisabledIT {
             for (Result result = scanner.next(); result != null; result = scanner.next()) {
                 for (Cell cell : result.rawCells()) {
                     String cellString = cell.toString();
-                    for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap()
-                            .entrySet()) {
-                        byte[] family = entryF.getKey();
-                    }
                     LOGGER.info(cellString + " ****** value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
                 }
             }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
index 8c51e80..5fc8b7a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
@@ -45,6 +45,7 @@ public class TransformIT extends ParallelStatsDisabledIT {
     private Properties testProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
 
     public TransformIT() {
+        testProps.put(QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB, "ONE_CELL_PER_COLUMN");
         testProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, "0");
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
index 355377b..ec6fcf3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
@@ -32,6 +32,8 @@ import org.apache.phoenix.util.StringUtil;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 
+import static org.apache.phoenix.util.IndexUtil.INDEX_COLUMN_NAME_SEP;
+
 
 /**
  * Class that compiles plan to generate initial data values after a DDL command for
@@ -43,7 +45,8 @@ public class PostIndexDDLCompiler {
     private List<String> indexColumnNames;
     private List<String> dataColumnNames;
     private String selectQuery;
-    
+    private boolean forTransform = false;
+
     public PostIndexDDLCompiler(PhoenixConnection connection, TableRef dataTableRef) {
         this.connection = connection;
         this.dataTableRef = dataTableRef;
@@ -51,6 +54,11 @@ public class PostIndexDDLCompiler {
         dataColumnNames = Lists.newArrayList();
     }
 
+    public PostIndexDDLCompiler(PhoenixConnection connection, TableRef dataTableRef, boolean forTransform) {
+        this(connection, dataTableRef);
+        this.forTransform = forTransform;
+    }
+
     public MutationPlan compile(final PTable indexTable) throws SQLException {
         /*
          * Compiles an UPSERT SELECT command to read from the data table and populate the index table
@@ -69,7 +77,8 @@ public class PostIndexDDLCompiler {
             PColumn col = indexPKColumns.get(i);
             String indexColName = col.getName().getString();
             // need to escape backslash as this used in the SELECT statement
-            String dataColName = StringUtil.escapeBackslash(col.getExpressionStr());
+            String dataColName = col.getExpressionStr() == null ? col.getName().getString()
+                    : StringUtil.escapeBackslash(col.getExpressionStr());
             dataColumns.append(dataColName).append(",");
             indexColumns.append('"').append(indexColName).append("\",");
             indexColumnNames.add(indexColName);
@@ -81,10 +90,16 @@ public class PostIndexDDLCompiler {
             for (PColumn col : family.getColumns()) {
                 if (col.getViewConstant() == null) {
                     String indexColName = col.getName().getString();
-                    String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColName);
+                    // Transforming tables also behave like indexes but they don't have index_col_name_sep. So we use family name directly.
+                    String dataFamilyName = indexColName.indexOf(INDEX_COLUMN_NAME_SEP)!=-1 ? IndexUtil.getDataColumnFamilyName(indexColName) :
+                            col.getFamilyName().getString();
                     String dataColumnName = IndexUtil.getDataColumnName(indexColName);
                     if (!dataFamilyName.equals("")) {
                         dataColumns.append('"').append(dataFamilyName).append("\".");
+                        if (forTransform) {
+                            // transforming table columns have the same family name
+                            indexColumns.append('"').append(dataFamilyName).append("\".");
+                        }
                     }
                     dataColumns.append('"').append(dataColumnName).append("\",");
                     indexColumns.append('"').append(indexColName).append("\",");
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 27e3585..fd658f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -51,13 +51,13 @@ import static org.apache.phoenix.util.ScanUtil.addEmptyColumnToScan;
  * index table.
  */
 public class ServerBuildIndexCompiler {
-    private final PhoenixConnection connection;
-    private final String tableName;
-    private PTable dataTable;
-    private QueryPlan plan;
+    protected final PhoenixConnection connection;
+    protected final String tableName;
+    protected PTable dataTable;
+    protected QueryPlan plan;
 
-    private class RowCountMutationPlan extends BaseMutationPlan {
-        private RowCountMutationPlan(StatementContext context, PhoenixStatement.Operation operation) {
+    protected class RowCountMutationPlan extends BaseMutationPlan {
+        protected RowCountMutationPlan(StatementContext context, PhoenixStatement.Operation operation) {
             super(context, operation);
         }
         @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java
new file mode 100644
index 0000000..0690cd8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.execute.BaseQueryPlan;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.sql.SQLException;
+
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.util.ScanUtil.addEmptyColumnToScan;
+
+
+/**
+ * Class that compiles plan to generate initial data values after a DDL command for
+ * transforming table (new table).
+ */
+public class ServerBuildTransformingTableCompiler extends ServerBuildIndexCompiler {
+
+    public ServerBuildTransformingTableCompiler(PhoenixConnection connection, String tableName) {
+        super(connection, tableName);
+    }
+
+    public MutationPlan compile(PTable newTable) throws SQLException {
+        try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
+            String query = "SELECT /*+ NO_INDEX */ count(*) FROM " + tableName;
+            this.plan = statement.compileQuery(query);
+            TableRef tableRef = plan.getTableRef();
+            Scan scan = plan.getContext().getScan();
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            dataTable = tableRef.getTable();
+
+            // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+            // However, in this case, we need to project all of the data columns
+            for (PColumnFamily family : dataTable.getColumnFamilies()) {
+                scan.addFamily(family.getName().getBytes());
+            }
+
+            scan.setAttribute(BaseScannerRegionObserver.DO_TRANSFORMING, TRUE_BYTES);
+            TransformMaintainer.serialize(dataTable, ptr, newTable, plan.getContext().getConnection());
+
+            scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+            scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+            ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+            scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
+            // Serialize page row size only if we're overriding, else use server side value
+            String rebuildPageRowSize =
+                    connection.getQueryServices().getProps()
+                            .get(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+            if (rebuildPageRowSize != null) {
+                scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGE_ROWS,
+                        Bytes.toBytes(Long.valueOf(rebuildPageRowSize)));
+            }
+            BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
+            PTable.QualifierEncodingScheme encodingScheme = newTable.getEncodingScheme();
+            addEmptyColumnToScan(scan, SchemaUtil.getEmptyColumnFamily(newTable), EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst());
+
+            if (dataTable.isTransactional()) {
+                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction());
+            }
+
+            // Go through MutationPlan abstraction so that we can create local indexes
+            // with a connectionless connection (which makes testing easier).
+            return new RowCountMutationPlan(plan.getContext(), PhoenixStatement.Operation.UPSERT);
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b4c204b..1217d23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -114,6 +114,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String REVERSE_SCAN = "_ReverseScan";
     public static final String ANALYZE_TABLE = "_ANALYZETABLE";
     public static final String REBUILD_INDEXES = "_RebuildIndexes";
+    public static final String DO_TRANSFORMING = "_DoTransforming";
     public static final String TX_STATE = "_TxState";
     public static final String GUIDEPOST_WIDTH_BYTES = "_GUIDEPOST_WIDTH_BYTES";
     public static final String GUIDEPOST_PER_REGION = "_GUIDEPOST_PER_REGION";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index c1cc0dd..78cf256 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -65,6 +65,7 @@ import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
 import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
@@ -191,7 +192,13 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
         if (indexMetaData == null) {
             indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
         }
-        List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+        byte[] transforming = scan.getAttribute(BaseScannerRegionObserver.DO_TRANSFORMING);
+        List<IndexMaintainer> maintainers = null;
+        if (transforming == null) {
+            maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+        } else {
+            maintainers = TransformMaintainer.deserialize(indexMetaData);
+        }
         indexMaintainer = maintainers.get(0);
         this.scan = scan;
         this.innerScanner = innerScanner;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index d4e68a0..1ba7651 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -400,7 +400,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     //**** START: New member variables added in 4.16 ****/
     private String logicalIndexName;
 
-    private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
+    protected IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
         this.dataRowKeySchema = dataRowKeySchema;
         this.isDataTableSalted = isDataTableSalted;
     }
@@ -1038,35 +1038,47 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
         return indexRowKeySchema;
     }
-    
+
     public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
         byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey, ts);
+        return buildUpdateMutation(kvBuilder, valueGetter, dataRowKeyPtr, ts, regionStartKey, regionEndKey,
+                indexRowKey, this.getEmptyKeyValueFamily(), coveredColumnsMap,
+                indexEmptyKeyValueRef, indexWALDisabled, dataImmutableStorageScheme, immutableStorageScheme, encodingScheme);
+    }
+
+    public static Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts,
+                                   byte[] regionStartKey, byte[] regionEndKey, byte[] destRowKey, ImmutableBytesPtr emptyKeyValueCFPtr,
+                                          Map<ColumnReference, ColumnReference> coveredColumnsMap,
+                                          ColumnReference destEmptyKeyValueRef, boolean destWALDisabled,
+                                          ImmutableStorageScheme srcImmutableStroageScheme, ImmutableStorageScheme destImmutableStorageScheme,
+                                          QualifierEncodingScheme destEncodingScheme) throws IOException {
+        Set<ColumnReference> coveredColumns = coveredColumnsMap.keySet();
         Put put = null;
         // New row being inserted: add the empty key value
         ImmutableBytesWritable latestValue = null;
         if (valueGetter==null ||
-                this.getCoveredColumns().isEmpty() ||
-                (latestValue = valueGetter.getLatestValue(indexEmptyKeyValueRef, ts)) == null ||
+                coveredColumns.isEmpty() ||
+                (latestValue = valueGetter.getLatestValue(destEmptyKeyValueRef, ts)) == null ||
                 latestValue == ValueGetter.HIDDEN_BY_DELETE) {
             // We need to track whether or not our empty key value is hidden by a Delete Family marker at the same timestamp.
             // If it is, these Puts will be masked so should not be emitted.
             if (latestValue == ValueGetter.HIDDEN_BY_DELETE) {
                 return null;
             }
-            put = new Put(indexRowKey);
+            put = new Put(destRowKey);
             // add the keyvalue for the empty row
-            put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
-                    this.getEmptyKeyValueFamily(), indexEmptyKeyValueRef.getQualifierWritable(), ts,
+            put.add(kvBuilder.buildPut(new ImmutableBytesPtr(destRowKey),
+                    emptyKeyValueCFPtr, destEmptyKeyValueRef.getQualifierWritable(), ts,
                     QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR));
-            put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+            put.setDurability(!destWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
         }
 
-        ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
-        if (immutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+        ImmutableBytesPtr rowKey = new ImmutableBytesPtr(destRowKey);
+        if (destImmutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
             // map from index column family to list of pair of index column and data column (for covered columns)
             Map<ImmutableBytesPtr, List<Pair<ColumnReference, ColumnReference>>> familyToColListMap = Maps.newHashMap();
-            for (ColumnReference ref : this.getCoveredColumns()) {
-                ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
+            for (ColumnReference ref : coveredColumns) {
+                ColumnReference indexColRef = coveredColumnsMap.get(ref);
                 ImmutableBytesPtr cf = new ImmutableBytesPtr(indexColRef.getFamily());
                 if (!familyToColListMap.containsKey(cf)) {
                     familyToColListMap.put(cf, Lists.<Pair<ColumnReference, ColumnReference>>newArrayList());
@@ -1080,7 +1092,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 int maxEncodedColumnQualifier = Integer.MIN_VALUE;
                 // find the max col qualifier
                 for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) {
-                    maxEncodedColumnQualifier = Math.max(maxEncodedColumnQualifier, encodingScheme.decode(colRefPair.getFirst().getQualifier()));
+                    maxEncodedColumnQualifier = Math.max(maxEncodedColumnQualifier, destEncodingScheme.decode(colRefPair.getFirst().getQualifier()));
                 }
                 Expression[] colValues = EncodedColumnsUtil.createColumnExpressionArray(maxEncodedColumnQualifier);
                 // set the values of the columns
@@ -1088,7 +1100,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                     ColumnReference indexColRef = colRefPair.getFirst();
                     ColumnReference dataColRef = colRefPair.getSecond();
                     byte[] value = null;
-                    if (this.dataImmutableStorageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                    if (srcImmutableStroageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
                         Expression expression = new SingleCellColumnExpression(new PDatum() {
                             @Override public boolean isNullable() {
                                 return false;
@@ -1109,8 +1121,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                             @Override public PDataType getDataType() {
                                 return null;
                             }
-                        }, dataColRef.getFamily(), dataColRef.getQualifier(), encodingScheme,
-                                immutableStorageScheme);
+                        }, dataColRef.getFamily(), dataColRef.getQualifier(), destEncodingScheme,
+                                destImmutableStorageScheme);
                         ImmutableBytesPtr ptr = new ImmutableBytesPtr();
                         expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr);
                         value = ptr.copyBytesIfNecessary();
@@ -1122,34 +1134,34 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                         }
                     }
                     if (value != null) {
-                        int indexArrayPos = encodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1;
+                        int indexArrayPos = destEncodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1;
                         colValues[indexArrayPos] = new LiteralExpression(value);
                     }
                 }
                 
                 List<Expression> children = Arrays.asList(colValues);
                 // we use SingleCellConstructorExpression to serialize multiple columns into a single byte[]
-                SingleCellConstructorExpression singleCellConstructorExpression = new SingleCellConstructorExpression(immutableStorageScheme, children);
+                SingleCellConstructorExpression singleCellConstructorExpression = new SingleCellConstructorExpression(destImmutableStorageScheme, children);
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 singleCellConstructorExpression.evaluate(new BaseTuple() {}, ptr);
                 if (put == null) {
-                    put = new Put(indexRowKey);
-                    put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+                    put = new Put(destRowKey);
+                    put.setDurability(!destWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
                 }
                 ImmutableBytesPtr colFamilyPtr = new ImmutableBytesPtr(columnFamily);
                 //this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC
                 put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr));
             }
         } else {
-            for (ColumnReference ref : this.getCoveredColumns()) {
-                ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
+            for (ColumnReference ref : coveredColumns) {
+                ColumnReference indexColRef = coveredColumnsMap.get(ref);
                 ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
                 ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
                 ImmutableBytesWritable value = valueGetter.getLatestValue(ref, ts);
                 if (value != null && value != ValueGetter.HIDDEN_BY_DELETE) {
                     if (put == null) {
-                        put = new Put(indexRowKey);
-                        put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+                        put = new Put(destRowKey);
+                        put.setDurability(!destWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
                     }
                     put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value));
                 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 1052b30..1114689 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.ServerBuildIndexCompiler;
+import org.apache.phoenix.compile.ServerBuildTransformingTableCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
@@ -63,6 +64,7 @@ import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getInde
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolSourceTable;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolStartTime;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIsTransforming;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.setCurrentScnValue;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
@@ -87,6 +89,17 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
             String indexTableFullName) throws SQLException;
     }
 
+    private class TransformingDataTableQueryPlanBuilder implements QueryPlanBuilder {
+        @Override
+        public QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String oldTableFullName,
+                                      String newTableFullName) throws SQLException {
+            PTable newTable = PhoenixRuntime.getTableNoCache(phoenixConnection, newTableFullName);
+            ServerBuildTransformingTableCompiler compiler = new ServerBuildTransformingTableCompiler(phoenixConnection, oldTableFullName);
+            MutationPlan plan = compiler.compile(newTable);
+            return plan.getQueryPlan();
+        }
+    }
+
     private class DataTableQueryPlanBuilder implements QueryPlanBuilder {
         @Override
         public QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName,
@@ -145,8 +158,13 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
         String dataTableFullName = getIndexToolDataTableName(configuration);
         String indexTableFullName = getIndexToolIndexTableName(configuration);
         SourceTable sourceTable = getIndexToolSourceTable(configuration);
-        queryPlanBuilder = sourceTable.equals(SourceTable.DATA_TABLE_SOURCE) ?
-            new DataTableQueryPlanBuilder() : new IndexTableQueryPlanBuilder();
+        if (getIsTransforming(configuration) &&
+                PhoenixConfigurationUtil.getTransformingTableType(configuration) == SourceTable.DATA_TABLE_SOURCE) {
+            queryPlanBuilder = new TransformingDataTableQueryPlanBuilder();
+        } else {
+            queryPlanBuilder = sourceTable.equals(SourceTable.DATA_TABLE_SOURCE) ?
+                    new DataTableQueryPlanBuilder() : new IndexTableQueryPlanBuilder();
+        }
 
         try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) {
             PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 08e3f4b..36522dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -883,7 +883,7 @@ public class IndexTool extends Configured implements Tool {
             validateLastVerifyTime();
         }
         if(isTimeRangeSet(startTime, endTime)) {
-            validateTimeRange();
+            validateTimeRange(startTime, endTime);
         }
         if (verify) {
             String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
@@ -939,10 +939,10 @@ public class IndexTool extends Configured implements Tool {
         }
     }
 
-    private void validateTimeRange() {
+    public static void validateTimeRange(Long sTime, Long eTime) {
         Long currentTime = EnvironmentEdgeManager.currentTimeMillis();
-        Long st = (startTime == null) ? 0 : startTime;
-        Long et = (endTime == null) ? currentTime : endTime;
+        Long st = (sTime == null) ? 0 : sTime;
+        Long et = (eTime == null) ? currentTime : eTime;
         if (st.compareTo(currentTime) > 0 || et.compareTo(currentTime) > 0 || st.compareTo(et) >= 0) {
             throw new RuntimeException(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
         }
@@ -975,7 +975,7 @@ public class IndexTool extends Configured implements Tool {
         changeDisabledIndexStateToBuiding(connection);
     }
 
-    private static boolean isTimeRangeSet(Long startTime, Long endTime) {
+    public static boolean isTimeRangeSet(Long startTime, Long endTime) {
         return startTime != null || endTime != null;
     }
 
@@ -1089,7 +1089,7 @@ public class IndexTool extends Configured implements Tool {
     }
 
     // setup a ValueGetter to get index values from the ResultSet
-    private ValueGetter getIndexValueGetter(final PhoenixResultSet rs, List<String> dataColNames) {
+    public static ValueGetter getIndexValueGetter(final PhoenixResultSet rs, List<String> dataColNames) {
         // map from data col name to index in ResultSet
         final Map<String, Integer> rsIndex = new HashMap<>(dataColNames.size());
         int i = 1;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 2724990..a341d2b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -33,11 +33,13 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.transform.TransformTool;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.Transform;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -159,6 +161,16 @@ public class PhoenixIndexImportDirectReducer extends
                 throw new RuntimeException(e.getMessage());
             }
         }
+
+        if (PhoenixConfigurationUtil.getIsTransforming(context.getConfiguration())) {
+            try {
+                Transform.completeTransform(ConnectionUtil
+                        .getInputConnection(context.getConfiguration()), context.getConfiguration());
+            } catch (Exception e) {
+                LOGGER.error(" Failed to complete transform", e);
+                throw new RuntimeException(e.getMessage());
+            }
+        }
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
new file mode 100644
index 0000000..4a1c4fe
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.transform;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.schema.transform.Transform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Reducer class that does only one task and that is to complete transform.
+ */
+public class PhoenixTransformReducer extends
+        Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable> {
+    private AtomicBoolean calledOnce = new AtomicBoolean(false);
+
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(PhoenixTransformReducer.class);
+
+    @Override
+    protected void setup(Context context) throws IOException {
+    }
+
+    @Override
+    protected void reduce(ImmutableBytesWritable arg0, Iterable<IntWritable> arg1,
+                          Context context)
+            throws IOException, InterruptedException {
+        if (!calledOnce.compareAndSet(false, true)) {
+            return;
+        }
+
+        try (final Connection
+                     connection = ConnectionUtil.getInputConnection(context.getConfiguration())){
+            // Complete full Transform and add a partial transform
+            Transform.completeTransform(connection, context.getConfiguration());
+        } catch (Exception e) {
+            LOGGER.error(" Failed to complete transform", e);
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException{
+
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
new file mode 100644
index 0000000..5e5cdcf
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
@@ -0,0 +1,762 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexDBWritable;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.isTimeRangeSet;
+import static org.apache.phoenix.mapreduce.index.IndexTool.validateTimeRange;
+import static org.apache.phoenix.util.QueryUtil.getConnection;
+
+public class TransformTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TransformTool.class);
+
+    public enum MR_COUNTER_METRICS {
+        TRANSFORM_FAILED,
+        TRANSFORM_SUCCEED
+    }
+
+    private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true,
+            "Output path where the files are written");
+    private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true,
+            "Phoenix schema name (optional)");
+    private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true,
+            "Data table name (mandatory)");
+    private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true,
+            "Index table name(not required in case of partial rebuilding)");
+
+    private static final Option PARTIAL_TRANSFORM_OPTION = new Option("pt", "partial-transform", false,
+            "To transform a data table from a start timestamp");
+
+    private static final Option ABORT_TRANSFORM_OPTION = new Option("abort", "abort", false,
+            "Aborts the ongoing transform");
+
+    private static final Option PAUSE_TRANSFORM_OPTION = new Option("pause", "pause", false,
+            "Pauses the ongoing transform. If the ongoing transform fails, it will not be retried");
+
+    private static final Option RESUME_TRANSFORM_OPTION = new Option("resume", "resume", false,
+            "Resumes the ongoing transform");
+
+    private static final Option JOB_PRIORITY_OPTION = new Option("p", "job-priority", true,
+            "Define job priority from 0(highest) to 4. Default is 2(normal)");
+
+    private static final int DEFAULT_AUTOSPLIT_NUM_REGIONS = 20;
+
+    private static final Option AUTO_SPLIT_OPTION =
+            new Option("spa", "autosplit", true,
+                    "Automatically split the new table if the # of data table regions is greater than N. "
+                            + "Takes an optional argument specifying N, otherwise defaults to " + DEFAULT_AUTOSPLIT_NUM_REGIONS
+            );
+
+    private static final Option RUN_FOREGROUND_OPTION =
+            new Option(
+                    "runfg",
+                    "run-foreground",
+                    false,
+                    "If specified, runs transform in Foreground. Default - Runs the transform in background.");
+
+    private static final Option TENANT_ID_OPTION = new Option("tenant", "tenant-id", true,
+            "If specified, uses Tenant connection for tenant index transform (optional)");
+
+    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+    private static final Option START_TIME_OPTION = new Option("st", "start-time",
+            true, "Start time for transform");
+
+    private static final Option END_TIME_OPTION = new Option("et", "end-time",
+            true, "End time for transform");
+
+    public static final String TRANSFORM_JOB_NAME_TEMPLATE = "PHOENIX_TRANS_%s.%s";
+
+    public static final String PARTIAL_TRANSFORM_NOT_APPLICABLE = "Partial transform accepts "
+            + "non-zero ts set in the past as start-time(st) option and that ts must be present in SYSTEM.TRANSFORM table";
+
+    public static final String TRANSFORM_NOT_APPLICABLE = "Transform is not applicable for local indexes or views or transactional tables";
+
+    public static final String PARTIAL_TRANSFORM_NOT_COMPATIBLE = "Can't abort/pause/resume/split during partial transform";
+
+    private Configuration configuration;
+    private Connection connection;
+    private String tenantId;
+    private String dataTable;
+    private String logicalParentName;
+    private String basePath;
+    // logicalTableName is index table and logicalParentName is the data table if this is an index transform
+    // If this is a data table transform, logicalParentName is null and logicalTableName is dataTable
+    private String logicalTableName;
+    private String schemaName;
+    private String indexTable;
+    private String qDataTable; //normalized with schema
+    private PTable pIndexTable = null;
+    private PTable pDataTable;
+    private PTable pOldTable;
+    private PTable pNewTable;
+
+    private String oldTableWithSchema;
+    private String newTableWithSchema;
+    private JobPriority jobPriority;
+    private String jobName;
+    private boolean isForeground;
+    private Long startTime, endTime, lastTransformTime;
+    private boolean isPartialTransform;
+    private Job job;
+
+    public Long getStartTime() {
+        return startTime;
+    }
+
+    public Long getEndTime() { return endTime; }
+
+    public CommandLine parseOptions(String[] args) {
+        final Options options = getOptions();
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("Error parsing command line options: " + e.getMessage(),
+                    options);
+        }
+
+        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        this.jobPriority = getJobPriority(cmdLine);
+
+        boolean dataTableProvided = (cmdLine.hasOption(DATA_TABLE_OPTION.getOpt()));
+        if (!dataTableProvided) {
+            throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + " is a mandatory parameter");
+        }
+
+        return cmdLine;
+    }
+
+    private Options getOptions() {
+        final Options options = new Options();
+        options.addOption(OUTPUT_PATH_OPTION);
+        options.addOption(SCHEMA_NAME_OPTION);
+        options.addOption(DATA_TABLE_OPTION);
+        options.addOption(INDEX_TABLE_OPTION);
+        options.addOption(TENANT_ID_OPTION);
+        options.addOption(HELP_OPTION);
+        options.addOption(JOB_PRIORITY_OPTION);
+        options.addOption(RUN_FOREGROUND_OPTION);
+        options.addOption(PARTIAL_TRANSFORM_OPTION);
+        options.addOption(START_TIME_OPTION);
+        options.addOption(END_TIME_OPTION);
+        options.addOption(AUTO_SPLIT_OPTION);
+        options.addOption(ABORT_TRANSFORM_OPTION);
+        options.addOption(PAUSE_TRANSFORM_OPTION);
+        options.addOption(RESUME_TRANSFORM_OPTION);
+        START_TIME_OPTION.setOptionalArg(true);
+        END_TIME_OPTION.setOptionalArg(true);
+        return options;
+    }
+
+    private void printHelpAndExit(String errorMessage, Options options) {
+        System.err.println(errorMessage);
+        LOGGER.error(errorMessage);
+        printHelpAndExit(options, 1);
+    }
+
+    private void printHelpAndExit(Options options, int exitCode) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("help", options);
+        System.exit(exitCode);
+    }
+
+    public CommandLine parseArgs(String[] args) throws Exception {
+        CommandLine cmdLine;
+        try {
+            cmdLine = parseOptions(args);
+        } catch (IllegalStateException e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+            throw e;
+        }
+
+        if (getConf() == null) {
+            setConf(HBaseConfiguration.create());
+        }
+
+        return cmdLine;
+    }
+
+    @VisibleForTesting
+    public int populateTransformToolAttributesAndValidate(CommandLine cmdLine) throws Exception {
+        boolean useStartTime = cmdLine.hasOption(START_TIME_OPTION.getOpt());
+        boolean useEndTime = cmdLine.hasOption(END_TIME_OPTION.getOpt());
+        basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
+        isPartialTransform = cmdLine.hasOption(PARTIAL_TRANSFORM_OPTION.getOpt());
+        if (useStartTime) {
+            startTime = new Long(cmdLine.getOptionValue(START_TIME_OPTION.getOpt()));
+        }
+
+        if (useEndTime) {
+            endTime = new Long(cmdLine.getOptionValue(END_TIME_OPTION.getOpt()));
+        }
+
+        if (isTimeRangeSet(startTime, endTime)) {
+            validateTimeRange(startTime, endTime);
+        }
+
+        if (isPartialTransform &&
+                (cmdLine.hasOption(AUTO_SPLIT_OPTION.getOpt()))) {
+            throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
+        }
+        if (isPartialTransform &&
+                (cmdLine.hasOption(ABORT_TRANSFORM_OPTION.getOpt()) || cmdLine.hasOption(PAUSE_TRANSFORM_OPTION.getOpt())
+                        || cmdLine.hasOption(RESUME_TRANSFORM_OPTION.getOpt()))) {
+            throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
+        }
+
+        if (isPartialTransform) {
+            if (!cmdLine.hasOption(START_TIME_OPTION.getOpt())) {
+                throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_APPLICABLE);
+            }
+            lastTransformTime = new Long(cmdLine.getOptionValue(START_TIME_OPTION.getOpt()));
+            SystemTransformRecord transformRecord = getTransformRecord(null);
+            if (transformRecord == null) {
+                throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_APPLICABLE);
+            }
+            if (lastTransformTime == null) {
+                lastTransformTime = transformRecord.getTransformEndTs().getTime();
+            } else {
+                validateLastTransformTime();
+            }
+        }
+
+        schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+        dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+        indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+        qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+        isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+        logicalTableName = dataTable;
+        logicalParentName = null;
+        if (!Strings.isNullOrEmpty(indexTable)) {
+            logicalTableName = indexTable;
+            logicalParentName = SchemaUtil.getTableName(schemaName, dataTable);
+        }
+
+        pDataTable = PhoenixRuntime.getTable(
+                connection, SchemaUtil.getQualifiedTableName(schemaName, dataTable));
+        if (indexTable != null) {
+            pIndexTable = PhoenixRuntime.getTable(
+                    connection, SchemaUtil.getQualifiedTableName(schemaName, indexTable));
+            pOldTable = pIndexTable;
+        } else {
+            pOldTable = pDataTable;
+        }
+
+        SystemTransformRecord transformRecord = getTransformRecord(connection.unwrap(PhoenixConnection.class));
+
+        validateTransform(pDataTable, pIndexTable, transformRecord);
+        String newTableName = SchemaUtil.getTableNameFromFullName(transformRecord.getNewPhysicalTableName());
+        pNewTable = PhoenixRuntime.getTableNoCache(
+                connection, SchemaUtil.getQualifiedTableName(schemaName, newTableName));
+
+
+        oldTableWithSchema = SchemaUtil.getQualifiedPhoenixTableName(schemaName, SchemaUtil.getTableNameFromFullName(pOldTable.getName().getString()));
+        newTableWithSchema = SchemaUtil.getQualifiedPhoenixTableName(schemaName, SchemaUtil.getTableNameFromFullName(pNewTable.getName().getString()));
+        return 0;
+    }
+
+    public void validateTransform(PTable argPDataTable, PTable argIndexTable, SystemTransformRecord transformRecord) throws Exception {
+
+        if (argPDataTable.getType() != PTableType.TABLE) {
+            throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
+        }
+
+        if (argIndexTable != null && argIndexTable.getType() != PTableType.INDEX) {
+            throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
+        }
+
+        if (argPDataTable.isTransactional()) {
+            throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
+        }
+
+        if (transformRecord == null){
+            throw new IllegalStateException("ALTER statement has not been run and the transform has not been created for this table");
+        }
+
+        if (pDataTable != null && pIndexTable != null) {
+            if (!IndexTool.isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
+                throw new IllegalArgumentException(
+                        String.format(" %s is not an index table for %s for this connection",
+                                indexTable, qDataTable));
+            }
+
+            PTable.IndexType indexType = argIndexTable.getIndexType();
+            if (PTable.IndexType.LOCAL.equals(indexType)) {
+                throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
+            }
+        }
+    }
+
+    public int validateLastTransformTime() throws Exception {
+        Long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+        if (lastTransformTime.compareTo(currentTime) > 0 || lastTransformTime == 0L) {
+            throw new RuntimeException(PARTIAL_TRANSFORM_NOT_APPLICABLE);
+        }
+        return 0;
+    }
+
+    public SystemTransformRecord getTransformRecord(PhoenixConnection connection) throws Exception {
+        if (connection == null) {
+            try (Connection conn = getConnection(configuration)) {
+                SystemTransformRecord transformRecord = Transform.getTransformRecord(schemaName, logicalTableName, logicalParentName, tenantId, conn.unwrap(PhoenixConnection.class));
+                return transformRecord;
+            }
+        } else {
+            return  Transform.getTransformRecord(schemaName, logicalTableName, logicalParentName, tenantId, connection);
+        }
+    }
+
+    public String getJobPriority() {
+        return this.jobPriority.toString();
+    }
+
+    private JobPriority getJobPriority(CommandLine cmdLine) {
+        String jobPriorityOption = cmdLine.getOptionValue(JOB_PRIORITY_OPTION.getOpt());
+        if (jobPriorityOption == null) {
+            return JobPriority.NORMAL;
+        }
+
+        switch (jobPriorityOption) {
+            case "0" : return JobPriority.VERY_HIGH;
+            case "1" : return JobPriority.HIGH;
+            case "2" : return JobPriority.NORMAL;
+            case "3" : return JobPriority.LOW;
+            case "4" : return JobPriority.VERY_LOW;
+            default:
+                return JobPriority.NORMAL;
+        }
+    }
+
+    public Job getJob() {
+        return this.job;
+    }
+
+    public String getTenantId() {
+        return this.tenantId;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public Job configureJob() throws Exception {
+        final String jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
+        if (lastTransformTime != null) {
+            PhoenixConfigurationUtil.setCurrentScnValue(configuration, lastTransformTime);
+        }
+
+        final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
+        final PostIndexDDLCompiler ddlCompiler =
+                new PostIndexDDLCompiler(pConnection, new TableRef(pOldTable), true);
+        ddlCompiler.compile(pNewTable);
+        final List<String> newColumns = ddlCompiler.getDataColumnNames();
+        //final String selectQuery = ddlCompiler.getSelectQuery();
+        final String upsertQuery =
+                QueryUtil.constructUpsertStatement(newTableWithSchema, newColumns, HintNode.Hint.NO_INDEX);
+
+        configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery);
+        //PhoenixConfigurationUtil.setPhysicalTableName(configuration, pNewTable.getPhysicalName().getString());
+
+        PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
+                ddlCompiler.getIndexColumnNames().toArray(new String[ddlCompiler.getIndexColumnNames().size()]));
+        if (tenantId != null) {
+            PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+        }
+
+        long indexRebuildQueryTimeoutMs =
+                configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB,
+                        QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT);
+        long indexRebuildRPCTimeoutMs =
+                configuration.getLong(QueryServices.INDEX_REBUILD_RPC_TIMEOUT_ATTRIB,
+                        QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_TIMEOUT);
+        long indexRebuildClientScannerTimeOutMs =
+                configuration.getLong(QueryServices.INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT_ATTRIB,
+                        QueryServicesOptions.DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT);
+        int indexRebuildRpcRetriesCounter =
+                configuration.getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
+                        QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER);
+        // Set various phoenix and hbase level timeouts and rpc retries
+        configuration.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+                Long.toString(indexRebuildQueryTimeoutMs));
+        configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+                Long.toString(indexRebuildClientScannerTimeOutMs));
+        configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY,
+                Long.toString(indexRebuildRPCTimeoutMs));
+        configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+                Long.toString(indexRebuildRpcRetriesCounter));
+        configuration.set("mapreduce.task.timeout", Long.toString(indexRebuildQueryTimeoutMs));
+
+        PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, oldTableWithSchema);
+        PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, newTableWithSchema);
+        PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
+        if (startTime != null) {
+            PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime);
+        }
+
+        PhoenixConfigurationUtil.setPhysicalTableName(configuration, pNewTable.getPhysicalName().getString());
+        PhoenixConfigurationUtil.setIsTransforming(configuration, true);
+        Path outputPath = null;
+        org.apache.hadoop.fs.FileSystem fs;
+        if (basePath != null) {
+            outputPath =
+                    CsvBulkImportUtil.getOutputPath(new Path(basePath),
+                            pIndexTable == null ?
+                                    pDataTable.getPhysicalName().getString() :
+                                    pIndexTable.getPhysicalName().getString());
+            fs = outputPath.getFileSystem(configuration);
+            fs.delete(outputPath, true);
+        }
+        this.job = Job.getInstance(getConf(), jobName);
+        job.setJarByClass(TransformTool.class);
+        job.setPriority(this.jobPriority);
+        PhoenixMapReduceUtil.setInput(job, PhoenixServerBuildIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class,
+                oldTableWithSchema, "");
+        if (outputPath != null) {
+            FileOutputFormat.setOutputPath(job, outputPath);
+        }
+        job.setReducerClass(PhoenixTransformReducer.class);
+        job.setNumReduceTasks(1);
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+
+        //Set the Output classes
+        job.setMapOutputValueClass(IntWritable.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(NullWritable.class);
+        TableMapReduceUtil.addDependencyJars(job);
+        job.setMapperClass(PhoenixServerBuildIndexMapper.class);
+
+        TableMapReduceUtil.initCredentials(job);
+        LOGGER.info("TransformTool is running for " + job.getJobName());
+
+        return job;
+    }
+
+    public int runJob() throws IOException {
+        try {
+            if (isForeground) {
+                LOGGER.info("Running TransformTool in foreground. " +
+                        "Runs full table scans. This may take a long time!");
+                return (job.waitForCompletion(true)) ? 0 : 1;
+            } else {
+                LOGGER.info("Running TransformTool in Background - Submit async and exit");
+                job.submit();
+                return 0;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Caught exception " + e + " trying to run TransformTool.");
+            return 1;
+        }
+    }
+
+    private void preSplitTable(CommandLine cmdLine, Connection connection,
+                               Configuration configuration, PTable newTable, PTable oldTable)
+            throws SQLException, IOException {
+        boolean autosplit = cmdLine.hasOption(AUTO_SPLIT_OPTION.getOpt());
+
+        if (autosplit) {
+            String nOpt = cmdLine.getOptionValue(AUTO_SPLIT_OPTION.getOpt());
+            int autosplitNumRegions = nOpt == null ? DEFAULT_AUTOSPLIT_NUM_REGIONS : Integer.parseInt(nOpt);
+            LOGGER.info(String.format("Will split table %s , autosplit=%s ," +
+                            " autoSplitNumRegions=%s", newTable.getPhysicalName(),
+                    autosplit, autosplitNumRegions));
+
+            splitTable(connection.unwrap(PhoenixConnection.class), autosplit,
+                    autosplitNumRegions, newTable, oldTable);
+        }
+    }
+
+    private void splitTable(PhoenixConnection pConnection, boolean autosplit,
+                            int autosplitNumRegions, PTable newTable, PTable oldTable)
+            throws SQLException, IOException, IllegalArgumentException {
+        int numRegions;
+        byte[][] oldSplitPoints = null;
+        byte[][] newSplitPoints = null;
+        // TODO : if the rowkey changes via transform, we need to create new split points
+        try (Table hDataTable =
+                     (Table) pConnection.getQueryServices()
+                             .getTable(oldTable.getPhysicalName().getBytes());
+             org.apache.hadoop.hbase.client.Connection connection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(configuration)) {
+            // Avoid duplicate split keys and remove the empty key
+            oldSplitPoints = connection.getRegionLocator(hDataTable.getName()).getStartKeys();
+            Arrays.sort(oldSplitPoints, Bytes.BYTES_COMPARATOR);
+            int numSplits = oldSplitPoints.length;
+            ArrayList<byte[]> splitList = new ArrayList<>();
+            byte[] lastKey = null;
+            for (byte[] keyBytes : oldSplitPoints) {
+                if (Bytes.compareTo(keyBytes, EMPTY_BYTE_ARRAY)!=0) {
+                    if (lastKey != null && !Bytes.equals(keyBytes, lastKey)) {
+                        splitList.add(keyBytes);
+                    }
+                }
+                lastKey = keyBytes;
+            }
+            newSplitPoints = new byte[splitList.size()][];
+            for (int i=0; i < splitList.size(); i++) {
+                newSplitPoints[i] = splitList.get(i);
+            }
+            numRegions = newSplitPoints.length;
+            if (autosplit && (numRegions <= autosplitNumRegions)) {
+                LOGGER.info(String.format(
+                        "Will not split %s because the data table only has %s regions, autoSplitNumRegions=%s",
+                        newTable.getPhysicalName(), numRegions, autosplitNumRegions));
+                return; // do nothing if # of regions is too low
+            }
+        }
+
+        try (HBaseAdmin admin = pConnection.getQueryServices().getAdmin()) {
+            // do the split
+            // drop table and recreate with appropriate splits
+            TableName newTableSplitted = TableName.valueOf(newTable.getPhysicalName().getBytes());
+            HTableDescriptor descriptor = admin.getTableDescriptor(newTableSplitted);
+            admin.disableTable(newTableSplitted);
+            admin.deleteTable(newTableSplitted);
+            admin.createTable(descriptor, newSplitPoints);
+        }
+    }
+
+    public void updateTransformRecord(PhoenixConnection connection, PTable.TransformStatus newStatus) throws Exception {
+        SystemTransformRecord transformRecord = getTransformRecord(connection);
+        updateTransformRecord(connection, transformRecord, newStatus);
+    }
+
+    public static void updateTransformRecord(PhoenixConnection connection, SystemTransformRecord transformRecord, PTable.TransformStatus newStatus) throws Exception {
+        SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
+        builder.setTransformStatus(newStatus.name());
+        if (newStatus == PTable.TransformStatus.COMPLETED || newStatus == PTable.TransformStatus.FAILED) {
+            builder.setEndTs(new Timestamp(EnvironmentEdgeManager.currentTimeMillis()));
+        }
+        Transform.upsertTransform(builder.build(), connection.unwrap(PhoenixConnection.class));
+    }
+
+    protected void updateTransformRecord(Job job) throws Exception {
+        if (job == null) {
+            return;
+        }
+        SystemTransformRecord transformRecord = getTransformRecord(connection.unwrap(PhoenixConnection.class));
+        SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
+        builder.setTransformJobId(job.getJobID().toString());
+        builder.setStartTs(new Timestamp(EnvironmentEdgeManager.currentTimeMillis()));
+        Transform.upsertTransform(builder.build(), connection.unwrap(PhoenixConnection.class));
+    }
+
+    public void killJob(SystemTransformRecord transformRecord) throws Exception{
+        String jobId = transformRecord.getTransformJobId();
+        if (!Strings.isNullOrEmpty(jobId)) {
+            JobClient jobClient = new JobClient();
+            RunningJob runningJob = jobClient.getJob(jobId);
+            if (runningJob != null) {
+                try {
+                    runningJob.killJob();
+                } catch (IOException ex) {
+                    LOGGER.warn("Transform abort could not kill the job. ", ex);
+                }
+            }
+        }
+    }
+
+    public void abortTransform() throws Exception {
+        SystemTransformRecord transformRecord = getTransformRecord(connection.unwrap(PhoenixConnection.class));
+        if (transformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name())) {
+            throw new IllegalStateException("A completed transform cannot be aborted");
+        }
+
+        killJob(transformRecord);
+        Transform.removeTransformRecord(transformRecord, connection.unwrap(PhoenixConnection.class));
+
+        // TODO: disable transform on the old table
+
+        // Cleanup syscat
+        try (Statement stmt = connection.createStatement()) {
+            if (pIndexTable != null) {
+                stmt.execute("DROP INDEX " + transformRecord.getNewPhysicalTableName());
+            } else {
+                stmt.execute("DROP TABLE " + transformRecord.getNewPhysicalTableName());
+            }
+        } catch (SQLException ex) {
+            LOGGER.warn("Transform abort could not drop the table " + transformRecord.getNewPhysicalTableName());
+        }
+    }
+
+    public void pauseTransform() throws Exception {
+        SystemTransformRecord transformRecord = getTransformRecord(connection.unwrap(PhoenixConnection.class));
+        if (transformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name())) {
+            throw new IllegalStateException("A completed transform cannot be paused");
+        }
+
+        updateTransformRecord(connection.unwrap(PhoenixConnection.class), PTable.TransformStatus.PAUSED);
+        killJob(transformRecord);
+    }
+
+    public void resumeTransform(String[] args, CommandLine cmdLine) throws Exception {
+        SystemTransformRecord transformRecord = getTransformRecord(connection.unwrap(PhoenixConnection.class));
+        if (!transformRecord.getTransformStatus().equals(PTable.TransformStatus.PAUSED.name())) {
+            throw new IllegalStateException("Only a paused transform can be resumed");
+        }
+
+        runTransform(args, cmdLine);
+    }
+
+    public int runTransform(String[] args, CommandLine cmdLine) throws Exception {
+        int status = 0;
+        updateTransformRecord(connection.unwrap(PhoenixConnection.class), PTable.TransformStatus.STARTED);
+        PhoenixConfigurationUtil.setIsPartialTransform(configuration, isPartialTransform);
+        PhoenixConfigurationUtil.setIsTransforming(configuration, true);
+
+        if (!Strings.isNullOrEmpty(indexTable)) {
+            PhoenixConfigurationUtil.setTransformingTableType(configuration, IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE);
+            // Index table transform. Build the index
+            IndexTool indexTool = new IndexTool();
+            indexTool.setConf(configuration);
+            status = indexTool.run(args);
+            Job job = indexTool.getJob();
+            updateTransformRecord(job);
+        } else {
+            PhoenixConfigurationUtil.setTransformingTableType(configuration, IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
+            if (!isPartialTransform) {
+                preSplitTable(cmdLine, connection, configuration, pNewTable, pOldTable);
+            }
+            configureJob();
+            status = runJob();
+            updateTransformRecord(this.job);
+        }
+
+        // Record status
+        if (status != 0) {
+            LOGGER.error("TransformTool/IndexTool job failed! Check logs for errors..");
+            updateTransformRecord(connection.unwrap(PhoenixConnection.class), PTable.TransformStatus.FAILED);
+            return -1;
+        }
+
+        return status;
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        connection = null;
+        int ret = 0;
+        CommandLine cmdLine = null;
+        configuration = HBaseConfiguration.addHbaseResources(getConf());
+        try {
+            cmdLine = parseArgs(args);
+            if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+                tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+                if (!Strings.isNullOrEmpty(tenantId)) {
+                    configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+                }
+            }
+            try (Connection conn = getConnection(configuration)) {
+                this.connection = conn;
+                this.connection.setAutoCommit(true);
+                populateTransformToolAttributesAndValidate(cmdLine);
+                if (cmdLine.hasOption(ABORT_TRANSFORM_OPTION.getOpt())) {
+                    abortTransform();
+                } else if (cmdLine.hasOption(PAUSE_TRANSFORM_OPTION.getOpt())) {
+                    pauseTransform();
+                } else if (cmdLine.hasOption(RESUME_TRANSFORM_OPTION.getOpt())) {
+                    resumeTransform(args,  cmdLine);
+                } else {
+                    ret = runTransform(args, cmdLine);
+                }
+                return ret;
+            } catch (Exception ex) {
+                LOGGER.error("An error occurred while transforming " + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
+                return -1;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            printHelpAndExit(e.toString(), getOptions());
+            return -1;
+        }
+    }
+
+    public static void main(final String[] args) throws Exception {
+        int result = ToolRunner.run(new TransformTool(), args);
+        System.exit(result);
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
index 05d10f4..2b93a86 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
@@ -133,7 +133,7 @@ public class IndexColumnNames {
         }
     }
 
-    private String getDataColFullName(PColumn dCol) {
+    public static String getDataColFullName(PColumn dCol) {
         String dColFullName = "";
         if (dCol.getFamilyName() != null) {
             dColFullName += dCol.getFamilyName().getString() + QueryConstants.NAME_SEPARATOR;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 8cb3445..9d7df7e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -196,6 +196,15 @@ public final class PhoenixConfigurationUtil {
     // by default MR snapshot restore is handled internally by phoenix
     public static final boolean DEFAULT_MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE = false;
 
+    // Is the mapreduce used for table/index transform
+    public static final String IS_TRANSFORMING_VALUE = "phoenix.mr.istransforming";
+
+    // Is the mapreduce used for table/index transform
+    public static final String TRANSFORMING_TABLE_TYPE = "phoenix.mr.transform.tabletype";
+
+    public static final String IS_PARTIAL_TRANSFORM = "phoenix.mr.transform.ispartial";
+
+
     /**
      * Determines type of Phoenix Map Reduce job.
      * 1. QUERY allows running arbitrary queries without aggregates
@@ -602,6 +611,40 @@ public final class PhoenixConfigurationUtil {
         return clientPortString==null ? null : Integer.parseInt(clientPortString);
     }
 
+    public static void setIsTransforming(Configuration configuration, Boolean isTransforming) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(isTransforming);
+        configuration.set(IS_TRANSFORMING_VALUE, Boolean.toString(isTransforming));
+    }
+
+    public static Boolean getIsTransforming(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return Boolean.valueOf(configuration.get(IS_TRANSFORMING_VALUE, "false"));
+    }
+
+    public static void setTransformingTableType(Configuration configuration,
+                                                SourceTable sourceTable) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(sourceTable);
+        configuration.set(TRANSFORMING_TABLE_TYPE, sourceTable.name());
+    }
+
+    public static SourceTable getTransformingTableType(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return SourceTable.valueOf(configuration.get(TRANSFORMING_TABLE_TYPE));
+    }
+
+    public static void setIsPartialTransform(final Configuration configuration, Boolean partialTransform) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(partialTransform);
+        configuration.set(IS_PARTIAL_TRANSFORM, String.valueOf(partialTransform));
+    }
+
+    public static boolean getIsPartialTransform(final Configuration configuration)  {
+        Preconditions.checkNotNull(configuration);
+        return configuration.getBoolean(IS_PARTIAL_TRANSFORM, false);
+    }
+
     /**
      * Returns the HBase zookeeper znode parent
      * @param configuration
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java
new file mode 100644
index 0000000..90451af
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnMetaDataOps.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.ColumnDef;
+import org.apache.phoenix.parse.ColumnName;
+import org.apache.phoenix.parse.PrimaryKeyConstraint;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
+
+public class ColumnMetaDataOps {
+    public static final String UPSERT_COLUMN =
+            "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
+                    TENANT_ID + "," +
+                    TABLE_SCHEM + "," +
+                    TABLE_NAME + "," +
+                    COLUMN_NAME + "," +
+                    COLUMN_FAMILY + "," +
+                    DATA_TYPE + "," +
+                    NULLABLE + "," +
+                    COLUMN_SIZE + "," +
+                    DECIMAL_DIGITS + "," +
+                    ORDINAL_POSITION + "," +
+                    SORT_ORDER + "," +
+                    DATA_TABLE_NAME + "," + // write this both in the column and table rows for access by metadata APIs
+                    ARRAY_SIZE + "," +
+                    VIEW_CONSTANT + "," +
+                    IS_VIEW_REFERENCED + "," +
+                    PK_NAME + "," +  // write this both in the column and table rows for access by metadata APIs
+                    KEY_SEQ + "," +
+                    COLUMN_DEF + "," +
+                    COLUMN_QUALIFIER + ", " +
+                    IS_ROW_TIMESTAMP +
+                    ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+    public static void addColumnMutation(PhoenixConnection connection, String schemaName, String tableName, PColumn column, String parentTableName, String pkName, Short keySeq, boolean isSalted) throws SQLException {
+        try (PreparedStatement colUpsert = connection.prepareStatement(UPSERT_COLUMN)) {
+            colUpsert.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
+            colUpsert.setString(2, schemaName);
+            colUpsert.setString(3, tableName);
+            colUpsert.setString(4, column.getName().getString());
+            colUpsert.setString(5, column.getFamilyName() == null ? null : column.getFamilyName().getString());
+            colUpsert.setInt(6, column.getDataType().getSqlType());
+            colUpsert.setInt(7, SchemaUtil.getIsNullableInt(column.isNullable()));
+            if (column.getMaxLength() == null) {
+                colUpsert.setNull(8, Types.INTEGER);
+            } else {
+                colUpsert.setInt(8, column.getMaxLength());
+            }
+            if (column.getScale() == null) {
+                colUpsert.setNull(9, Types.INTEGER);
+            } else {
+                colUpsert.setInt(9, column.getScale());
+            }
+            colUpsert.setInt(10, column.getPosition() + (isSalted ? 0 : 1));
+            colUpsert.setInt(11, column.getSortOrder().getSystemValue());
+            colUpsert.setString(12, parentTableName);
+            if (column.getArraySize() == null) {
+                colUpsert.setNull(13, Types.INTEGER);
+            } else {
+                colUpsert.setInt(13, column.getArraySize());
+            }
+            colUpsert.setBytes(14, column.getViewConstant());
+            colUpsert.setBoolean(15, column.isViewReferenced());
+            colUpsert.setString(16, pkName);
+            if (keySeq == null) {
+                colUpsert.setNull(17, Types.SMALLINT);
+            } else {
+                colUpsert.setShort(17, keySeq);
+            }
+            if (column.getExpressionStr() == null) {
+                colUpsert.setNull(18, Types.VARCHAR);
+            } else {
+                colUpsert.setString(18, column.getExpressionStr());
+            }
+            if (column.getColumnQualifierBytes() == null) {
+                colUpsert.setNull(19, Types.VARBINARY);
+            } else {
+                colUpsert.setBytes(19, column.getColumnQualifierBytes());
+            }
+            colUpsert.setBoolean(20, column.isRowTimestamp());
+            colUpsert.execute();
+        }
+    }
+
+    public static PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily,
+                                    boolean addingToPK, byte[] columnQualifierBytes, boolean isImmutableRows) throws SQLException {
+        try {
+            ColumnName columnDefName = def.getColumnDefName();
+            SortOrder sortOrder = def.getSortOrder();
+            boolean isPK = def.isPK();
+            boolean isRowTimestamp = def.isRowTimestamp();
+            if (pkConstraint != null) {
+                Pair<ColumnName, SortOrder> pkSortOrder = pkConstraint.getColumnWithSortOrder(columnDefName);
+                if (pkSortOrder != null) {
+                    isPK = true;
+                    sortOrder = pkSortOrder.getSecond();
+                    isRowTimestamp = pkConstraint.isColumnRowTimestamp(columnDefName);
+                }
+            }
+            String columnName = columnDefName.getColumnName();
+            if (isPK && sortOrder == SortOrder.DESC && def.getDataType() == PVarbinary.INSTANCE) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.DESC_VARBINARY_NOT_SUPPORTED)
+                        .setColumnName(columnName)
+                        .build().buildException();
+            }
+
+            PName familyName = null;
+            if (def.isPK() && !pkConstraint.getColumnNames().isEmpty() ) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_ALREADY_EXISTS)
+                        .setColumnName(columnName).build().buildException();
+            }
+            boolean isNull = def.isNull();
+            if (def.getColumnDefName().getFamilyName() != null) {
+                String family = def.getColumnDefName().getFamilyName();
+                if (isPK) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_WITH_FAMILY_NAME)
+                            .setColumnName(columnName).setFamilyName(family).build().buildException();
+                } else if (!def.isNull() && !isImmutableRows) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.KEY_VALUE_NOT_NULL)
+                            .setColumnName(columnName).setFamilyName(family).build().buildException();
+                }
+                familyName = PNameFactory.newName(family);
+            } else if (!isPK) {
+                familyName = PNameFactory.newName(defaultColumnFamily == null ? QueryConstants.DEFAULT_COLUMN_FAMILY : defaultColumnFamily);
+            }
+
+            if (isPK && !addingToPK && pkConstraint.getColumnNames().size() <= 1) {
+                if (def.isNull() && def.isNullSet()) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_PK_MAY_NOT_BE_NULL)
+                            .setColumnName(columnName).build().buildException();
+                }
+                isNull = false;
+            }
+            PColumn column = new PColumnImpl(PNameFactory.newName(columnName), familyName, def.getDataType(),
+                    def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(),
+                    null, false, def.getExpression(), isRowTimestamp,
+                    false, columnQualifierBytes, EnvironmentEdgeManager.currentTimeMillis());
+            return column;
+        } catch (IllegalArgumentException e) { // Based on precondition check in constructor
+            throw new SQLException(e);
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 10f8287..292a0ba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.transaction.TransactionFactory;
 
@@ -192,6 +193,11 @@ public class DelegateTable implements PTable {
     }
 
     @Override
+    public TransformMaintainer getTransformMaintainer(PTable oldTable, PhoenixConnection connection) {
+        return delegate.getTransformMaintainer(oldTable, connection);
+    }
+
+    @Override
     public PName getDefaultFamilyName() {
         return delegate.getDefaultFamilyName();
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 50d1904..732c718 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -109,6 +109,8 @@ import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
 import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_UPDATE_STATS_ASYNC;
+import static org.apache.phoenix.schema.ColumnMetaDataOps.addColumnMutation;
+import static org.apache.phoenix.schema.ColumnMetaDataOps.newColumn;
 import static org.apache.phoenix.schema.PTable.EncodedCQCounter.NULL_COUNTER;
 import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
 import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
@@ -122,6 +124,7 @@ import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
 import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -437,29 +440,6 @@ public class MetaDataClient {
                     INDEX_DISABLE_TIMESTAMP +","+
                     ASYNC_REBUILD_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName() +
                     ") VALUES (?, ?, ?, ?, ?, ?)";
-    private static final String INSERT_COLUMN_CREATE_TABLE =
-            "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
-                    TENANT_ID + "," +
-                    TABLE_SCHEM + "," +
-                    TABLE_NAME + "," +
-                    COLUMN_NAME + "," +
-                    COLUMN_FAMILY + "," +
-                    DATA_TYPE + "," +
-                    NULLABLE + "," +
-                    COLUMN_SIZE + "," +
-                    DECIMAL_DIGITS + "," +
-                    ORDINAL_POSITION + "," +
-                    SORT_ORDER + "," +
-                    DATA_TABLE_NAME + "," + // write this both in the column and table rows for access by metadata APIs
-                    ARRAY_SIZE + "," +
-                    VIEW_CONSTANT + "," +
-                    IS_VIEW_REFERENCED + "," +
-                    PK_NAME + "," +  // write this both in the column and table rows for access by metadata APIs
-                    KEY_SEQ + "," +
-                    COLUMN_DEF + "," +
-                    COLUMN_QUALIFIER + ", " +
-                    IS_ROW_TIMESTAMP +
-                    ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
     /*
      * Custom sql to add a column to SYSTEM.CATALOG table during upgrade.
@@ -941,57 +921,6 @@ public class MetaDataClient {
         return false;
     }
 
-    private void addColumnMutation(String schemaName, String tableName, PColumn column, PreparedStatement colUpsert, String parentTableName, String pkName, Short keySeq, boolean isSalted) throws SQLException {
-        colUpsert.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
-        colUpsert.setString(2, schemaName);
-        colUpsert.setString(3, tableName);
-        colUpsert.setString(4, column.getName().getString());
-        colUpsert.setString(5, column.getFamilyName() == null ? null : column.getFamilyName().getString());
-        colUpsert.setInt(6, column.getDataType().getSqlType());
-        colUpsert.setInt(7, SchemaUtil.getIsNullableInt(column.isNullable()));
-        if (column.getMaxLength() == null) {
-            colUpsert.setNull(8, Types.INTEGER);
-        } else {
-            colUpsert.setInt(8, column.getMaxLength());
-        }
-        if (column.getScale() == null) {
-            colUpsert.setNull(9, Types.INTEGER);
-        } else {
-            colUpsert.setInt(9, column.getScale());
-        }
-        colUpsert.setInt(10, column.getPosition() + (isSalted ? 0 : 1));
-        colUpsert.setInt(11, column.getSortOrder().getSystemValue());
-        colUpsert.setString(12, parentTableName);
-        if (column.getArraySize() == null) {
-            colUpsert.setNull(13, Types.INTEGER);
-        } else {
-            colUpsert.setInt(13, column.getArraySize());
-        }
-        colUpsert.setBytes(14, column.getViewConstant());
-        colUpsert.setBoolean(15, column.isViewReferenced());
-        colUpsert.setString(16, pkName);
-        if (keySeq == null) {
-            colUpsert.setNull(17, Types.SMALLINT);
-        } else {
-            colUpsert.setShort(17, keySeq);
-        }
-        if (column.getExpressionStr() == null) {
-            colUpsert.setNull(18, Types.VARCHAR);
-        } else {
-            colUpsert.setString(18, column.getExpressionStr());
-        }
-        //Do not try to set extra columns when using ALTER_SYSCATALOG_TABLE_UPGRADE
-        if (colUpsert.getParameterMetaData().getParameterCount() > 18) {
-            if (column.getColumnQualifierBytes() == null) {
-                colUpsert.setNull(19, Types.VARBINARY);
-            } else {
-                colUpsert.setBytes(19, column.getColumnQualifierBytes());
-            }
-            colUpsert.setBoolean(20, column.isRowTimestamp());
-        }
-        colUpsert.execute();
-    }
-
 	private void addFunctionArgMutation(String functionName, FunctionArgument arg, PreparedStatement argUpsert, int position) throws SQLException {
         argUpsert.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
         argUpsert.setString(2, functionName);
@@ -1006,65 +935,6 @@ public class MetaDataClient {
         argUpsert.execute();
     }
 
-    private PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily,
-            boolean addingToPK, byte[] columnQualifierBytes, boolean isImmutableRows) throws SQLException {
-        try {
-            ColumnName columnDefName = def.getColumnDefName();
-            SortOrder sortOrder = def.getSortOrder();
-            boolean isPK = def.isPK();
-            boolean isRowTimestamp = def.isRowTimestamp();
-            if (pkConstraint != null) {
-                Pair<ColumnName, SortOrder> pkSortOrder = pkConstraint.getColumnWithSortOrder(columnDefName);
-                if (pkSortOrder != null) {
-                    isPK = true;
-                    sortOrder = pkSortOrder.getSecond();
-                    isRowTimestamp = pkConstraint.isColumnRowTimestamp(columnDefName);
-                }
-            }
-            String columnName = columnDefName.getColumnName();
-            if (isPK && sortOrder == SortOrder.DESC && def.getDataType() == PVarbinary.INSTANCE) {
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.DESC_VARBINARY_NOT_SUPPORTED)
-                .setColumnName(columnName)
-                .build().buildException();
-            }
-
-            PName familyName = null;
-            if (def.isPK() && !pkConstraint.getColumnNames().isEmpty() ) {
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_ALREADY_EXISTS)
-                .setColumnName(columnName).build().buildException();
-            }
-            boolean isNull = def.isNull();
-            if (def.getColumnDefName().getFamilyName() != null) {
-                String family = def.getColumnDefName().getFamilyName();
-                if (isPK) {
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_WITH_FAMILY_NAME)
-                    .setColumnName(columnName).setFamilyName(family).build().buildException();
-                } else if (!def.isNull() && !isImmutableRows) {
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.KEY_VALUE_NOT_NULL)
-                    .setColumnName(columnName).setFamilyName(family).build().buildException();
-                }
-                familyName = PNameFactory.newName(family);
-            } else if (!isPK) {
-                familyName = PNameFactory.newName(defaultColumnFamily == null ? QueryConstants.DEFAULT_COLUMN_FAMILY : defaultColumnFamily);
-            }
-
-            if (isPK && !addingToPK && pkConstraint.getColumnNames().size() <= 1) {
-                if (def.isNull() && def.isNullSet()) {
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_PK_MAY_NOT_BE_NULL)
-                    .setColumnName(columnName).build().buildException();
-                }
-                isNull = false;
-            }
-            PColumn column = new PColumnImpl(PNameFactory.newName(columnName), familyName, def.getDataType(),
-                    def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(),
-                    null, false, def.getExpression(), isRowTimestamp,
-                    false, columnQualifierBytes, EnvironmentEdgeManager.currentTimeMillis());
-            return column;
-        } catch (IllegalArgumentException e) { // Based on precondition check in constructor
-            throw new SQLException(e);
-        }
-    }
-    
     public MutationState createTable(CreateTableStatement statement, byte[][] splits, PTable parent, String viewStatement, ViewType viewType, PDataType viewIndexIdType, byte[][] viewColumnConstants, BitSet isViewColumnReferenced) throws SQLException {
         TableName tableName = statement.getTableName();
         Map<String,Object> tableProps = Maps.newHashMapWithExpectedSize(statement.getProps().size());
@@ -2632,46 +2502,49 @@ public class MetaDataClient {
                 } else {
                     encodingScheme = getEncodingScheme(tableProps, schemaName, tableName, transactionProvider);
 
-                    if (isImmutableRows) {
-                        ImmutableStorageScheme immutableStorageSchemeProp =
-                                (ImmutableStorageScheme) TableProperty.IMMUTABLE_STORAGE_SCHEME
-                                .getValue(tableProps);
-                        if (immutableStorageSchemeProp == null) {
-                            // Ignore default if transactional and column encoding is not supported
-                            if (transactionProvider == null || 
-                                    !transactionProvider.getTransactionProvider().isUnsupported(
-                                            PhoenixTransactionProvider.Feature.COLUMN_ENCODING) ) {
-                                if (multiTenant) {
+                    ImmutableStorageScheme immutableStorageSchemeProp =
+                            (ImmutableStorageScheme) TableProperty.IMMUTABLE_STORAGE_SCHEME
+                                    .getValue(tableProps);
+                    if (immutableStorageSchemeProp == null) {
+                        // Ignore default if transactional and column encoding is not supported
+                        if (transactionProvider == null ||
+                                !transactionProvider.getTransactionProvider().isUnsupported(
+                                        PhoenixTransactionProvider.Feature.COLUMN_ENCODING)) {
+                            if (multiTenant) {
+                                immutableStorageScheme =
+                                        ImmutableStorageScheme
+                                                .valueOf(connection
+                                                        .getQueryServices()
+                                                        .getProps()
+                                                        .get(
+                                                                QueryServices.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
+                                                                QueryServicesOptions.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME));
+                            } else {
+                                if (isImmutableRows) {
                                     immutableStorageScheme =
                                             ImmutableStorageScheme
-                                            .valueOf(connection
-                                                    .getQueryServices()
-                                                    .getProps()
-                                                    .get(
-                                                            QueryServices.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
-                                                            QueryServicesOptions.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME));
+                                                    .valueOf(connection
+                                                            .getQueryServices()
+                                                            .getProps()
+                                                            .get(
+                                                                    QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
+                                                                    QueryServicesOptions.DEFAULT_IMMUTABLE_STORAGE_SCHEME));
                                 } else {
-                                    immutableStorageScheme =
-                                            ImmutableStorageScheme
-                                            .valueOf(connection
-                                                    .getQueryServices()
-                                                    .getProps()
-                                                    .get(
-                                                            QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
-                                                            QueryServicesOptions.DEFAULT_IMMUTABLE_STORAGE_SCHEME));
+                                    immutableStorageScheme = ONE_CELL_PER_COLUMN;
                                 }
                             }
-                        } else {
-                            immutableStorageScheme = getImmutableStorageSchemeForIndex(immutableStorageSchemeProp, schemaName, tableName, transactionProvider);
                         }
-                        if (immutableStorageScheme != ONE_CELL_PER_COLUMN
-                                && encodingScheme == NON_ENCODED_QUALIFIERS) {
-                            throw new SQLExceptionInfo.Builder(
-                                    SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_AND_COLUMN_QUALIFIER_BYTES)
-                            .setSchemaName(schemaName).setTableName(tableName).build()
-                            .buildException();
-                        }
-                    } 
+                    } else {
+                        immutableStorageScheme = getImmutableStorageSchemeForIndex(immutableStorageSchemeProp, schemaName, tableName, transactionProvider);
+                    }
+                    if (immutableStorageScheme != ONE_CELL_PER_COLUMN
+                            && encodingScheme == NON_ENCODED_QUALIFIERS) {
+                        getEncodingScheme(tableProps, schemaName, tableName, transactionProvider);
+                        throw new SQLExceptionInfo.Builder(
+                                SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_AND_COLUMN_QUALIFIER_BYTES)
+                                .setSchemaName(schemaName).setTableName(tableName).build()
+                                .buildException();
+                    }
                 }
                 cqCounter = encodingScheme != NON_ENCODED_QUALIFIERS ? new EncodedCQCounter() : NULL_COUNTER;
             }
@@ -2904,88 +2777,87 @@ public class MetaDataClient {
 
             List<Mutation> columnMetadata = Lists.newArrayListWithExpectedSize(columns.size());
             boolean isRegularView = (tableType == PTableType.VIEW && viewType!=ViewType.MAPPED);
-            try (PreparedStatement colUpsert = connection.prepareStatement(INSERT_COLUMN_CREATE_TABLE)) {
-                for (Map.Entry<PColumn, PColumn> entry : columns.entrySet()) {
-                    PColumn column = entry.getValue();
-                    final int columnPosition = column.getPosition();
-                    // For client-side cache, we need to update the column
-                    // set the autoPartition column attributes
-                    if (parent != null && parent.getAutoPartitionSeqName() != null
-                            && parent.getPKColumns().get(MetaDataUtil.getAutoPartitionColIndex(parent)).equals(column)) {
+            for (Map.Entry<PColumn, PColumn> entry : columns.entrySet()) {
+                PColumn column = entry.getValue();
+                final int columnPosition = column.getPosition();
+                // For client-side cache, we need to update the column
+                // set the autoPartition column attributes
+                if (parent != null && parent.getAutoPartitionSeqName() != null
+                        && parent.getPKColumns().get(MetaDataUtil.getAutoPartitionColIndex(parent)).equals(column)) {
+                    entry.setValue(column = new DelegateColumn(column) {
+                        @Override
+                        public byte[] getViewConstant() {
+                            // set to non-null value so that we will generate a Put that
+                            // will be set correctly on the server
+                            return QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
+                        }
+
+                        @Override
+                        public boolean isViewReferenced() {
+                            return true;
+                        }
+                    });
+                } else if (isViewColumnReferenced != null) {
+                    if (viewColumnConstants != null && columnPosition < viewColumnConstants.length) {
                         entry.setValue(column = new DelegateColumn(column) {
                             @Override
                             public byte[] getViewConstant() {
-                                // set to non-null value so that we will generate a Put that
-                                // will be set correctly on the server
-                                return QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
+                                return viewColumnConstants[columnPosition];
                             }
 
                             @Override
                             public boolean isViewReferenced() {
-                                return true;
+                                return isViewColumnReferenced.get(columnPosition);
                             }
                         });
-                    } else if (isViewColumnReferenced != null) {
-                        if (viewColumnConstants != null && columnPosition < viewColumnConstants.length) {
-                            entry.setValue(column = new DelegateColumn(column) {
-                                @Override
-                                public byte[] getViewConstant() {
-                                    return viewColumnConstants[columnPosition];
-                                }
-                                
-                                @Override
-                                public boolean isViewReferenced() {
-                                    return isViewColumnReferenced.get(columnPosition);
-                                }
-                            });
-                        } else {
-                            entry.setValue(column = new DelegateColumn(column) {
-                                @Override
-                                public boolean isViewReferenced() {
-                                    return isViewColumnReferenced.get(columnPosition);
-                                }
-                            });
-                        }
+                    } else {
+                        entry.setValue(column = new DelegateColumn(column) {
+                            @Override
+                            public boolean isViewReferenced() {
+                                return isViewColumnReferenced.get(columnPosition);
+                            }
+                        });
+                    }
 
-                        // if the base table column is referenced in the view
-                        // or if we are adding a new column during view creation
-                        if (isViewColumnReferenced.get(columnPosition) ||
-                                viewNewColumnPositions.contains(
-                                        columnPosition)) {
-                            // acquire the mutex using the global physical table
-                            // name to prevent this column from being dropped
-                            // while the view is being created or to prevent
-                            // a conflicting column from being added to a parent
-                            // in case the view creation adds new columns
-                            boolean acquiredMutex = writeCell(
-                                    null,
+                    // if the base table column is referenced in the view
+                    // or if we are adding a new column during view creation
+                    if (isViewColumnReferenced.get(columnPosition) ||
+                            viewNewColumnPositions.contains(
+                                    columnPosition)) {
+                        // acquire the mutex using the global physical table
+                        // name to prevent this column from being dropped
+                        // while the view is being created or to prevent
+                        // a conflicting column from being added to a parent
+                        // in case the view creation adds new columns
+                        boolean acquiredMutex = writeCell(
+                                null,
+                                parentPhysicalSchemaName,
+                                parentPhysicalTableName,
+                                column.toString());
+                        if (!acquiredMutex) {
+                            throw new ConcurrentTableMutationException(
                                     parentPhysicalSchemaName,
-                                    parentPhysicalTableName,
-                                    column.toString());
-                            if (!acquiredMutex) {
-                                throw new ConcurrentTableMutationException(
-                                        parentPhysicalSchemaName,
-                                        parentPhysicalTableName);
-                            }
-                            acquiredColumnMutexSet.add(column.toString());
+                                    parentPhysicalTableName);
                         }
+                        acquiredColumnMutexSet.add(column.toString());
                     }
-                    Short keySeq = SchemaUtil.isPKColumn(column) ? ++nextKeySeq : null;
-                    // Prior to PHOENIX-3534 we were sending the parent table column metadata while creating a
-                    // child view, now that we combine columns by resolving the parent table hierarchy we
-                    // don't need to include the parent table columns while creating a view
-                    // If QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK is true we continue
-                    // to store the parent table column metadata along with the child view metadata
-                    // so that we can rollback the upgrade if required.
-                    if (allowSystemCatalogRollback || !isRegularView
-                            || columnPosition >= baseTableColumnCount) {
-                        addColumnMutation(schemaName, tableName, column, colUpsert, parentTableName,
+                }
+                Short keySeq = SchemaUtil.isPKColumn(column) ? ++nextKeySeq : null;
+                // Prior to PHOENIX-3534 we were sending the parent table column metadata while creating a
+                // child view, now that we combine columns by resolving the parent table hierarchy we
+                // don't need to include the parent table columns while creating a view
+                // If QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK is true we continue
+                // to store the parent table column metadata along with the child view metadata
+                // so that we can rollback the upgrade if required.
+                if (allowSystemCatalogRollback || !isRegularView
+                        || columnPosition >= baseTableColumnCount) {
+                    addColumnMutation(connection, schemaName, tableName, column, parentTableName,
                             pkName, keySeq, saltBucketNum != null);
-                        columnMetadata.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
-                        connection.rollback();
-                    }
+                    columnMetadata.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
+                    connection.rollback();
                 }
             }
+
             // add the columns in reverse order since we reverse the list later
             Collections.reverse(columnMetadata);
             tableMetaData.addAll(columnMetadata);
@@ -3730,57 +3602,72 @@ public class MetaDataClient {
             tableUpsert.close();
         }
         if (isImmutableRows != null) {
-            mutateBooleanProperty(tenantId, schemaName, tableName, IMMUTABLE_ROWS, isImmutableRows);
+            mutateBooleanProperty(connection, tenantId, schemaName, tableName, IMMUTABLE_ROWS, isImmutableRows);
         }
         if (disableWAL != null) {
-            mutateBooleanProperty(tenantId, schemaName, tableName, DISABLE_WAL, disableWAL);
+            mutateBooleanProperty(connection,tenantId, schemaName, tableName, DISABLE_WAL, disableWAL);
         }
         if (isMultiTenant != null) {
-            mutateBooleanProperty(tenantId, schemaName, tableName, MULTI_TENANT, isMultiTenant);
+            mutateBooleanProperty(connection,tenantId, schemaName, tableName, MULTI_TENANT, isMultiTenant);
         }
         if (storeNulls != null) {
-            mutateBooleanProperty(tenantId, schemaName, tableName, STORE_NULLS, storeNulls);
+            mutateBooleanProperty(connection,tenantId, schemaName, tableName, STORE_NULLS, storeNulls);
         }
         if (isTransactional != null) {
-            mutateBooleanProperty(tenantId, schemaName, tableName, TRANSACTIONAL, isTransactional);
+            mutateBooleanProperty(connection,tenantId, schemaName, tableName, TRANSACTIONAL, isTransactional);
         }
         if (transactionProvider !=null) {
-            mutateByteProperty(tenantId, schemaName, tableName, TRANSACTION_PROVIDER, transactionProvider.getCode());
+            mutateByteProperty(connection, tenantId, schemaName, tableName, TRANSACTION_PROVIDER, transactionProvider.getCode());
         }
         if (updateCacheFrequency != null) {
-            mutateLongProperty(tenantId, schemaName, tableName, UPDATE_CACHE_FREQUENCY, updateCacheFrequency);
+            mutateLongProperty(connection,tenantId, schemaName, tableName, UPDATE_CACHE_FREQUENCY, updateCacheFrequency);
         }
         if (guidePostWidth == null || guidePostWidth >= 0) {
-            mutateLongProperty(tenantId, schemaName, tableName, GUIDE_POSTS_WIDTH, guidePostWidth);
+            mutateLongProperty(connection, tenantId, schemaName, tableName, GUIDE_POSTS_WIDTH, guidePostWidth);
         }
         if (appendOnlySchema !=null) {
-            mutateBooleanProperty(tenantId, schemaName, tableName, APPEND_ONLY_SCHEMA, appendOnlySchema);
+            mutateBooleanProperty(connection, tenantId, schemaName, tableName, APPEND_ONLY_SCHEMA, appendOnlySchema);
         }
         if (columnEncodedBytes !=null) {
-            mutateByteProperty(tenantId, schemaName, tableName, ENCODING_SCHEME, columnEncodedBytes.getSerializedMetadataValue());
+            mutateByteProperty(connection, tenantId, schemaName, tableName, ENCODING_SCHEME, columnEncodedBytes.getSerializedMetadataValue());
         }
         if (immutableStorageScheme !=null) {
-            mutateStringProperty(tenantId, schemaName, tableName, IMMUTABLE_STORAGE_SCHEME, immutableStorageScheme.name());
+            mutateStringProperty(connection, tenantId, schemaName, tableName, IMMUTABLE_STORAGE_SCHEME, immutableStorageScheme.name());
         }
         if (useStatsForParallelization != null) {
-            mutateBooleanProperty(tenantId, schemaName, tableName, USE_STATS_FOR_PARALLELIZATION, useStatsForParallelization);
+            mutateBooleanProperty(connection, tenantId, schemaName, tableName, USE_STATS_FOR_PARALLELIZATION, useStatsForParallelization);
         }
         if (phoenixTTL != null) {
-            mutateLongProperty(tenantId, schemaName, tableName, PHOENIX_TTL, phoenixTTL);
+            mutateLongProperty(connection, tenantId, schemaName, tableName, PHOENIX_TTL, phoenixTTL);
         }
         if (isChangeDetectionEnabled != null) {
-            mutateBooleanProperty(tenantId, schemaName, tableName, CHANGE_DETECTION_ENABLED, isChangeDetectionEnabled);
+            mutateBooleanProperty(connection, tenantId, schemaName, tableName, CHANGE_DETECTION_ENABLED, isChangeDetectionEnabled);
         }
         if (!Strings.isNullOrEmpty(physicalTableName)) {
-            mutateStringProperty(tenantId, schemaName, tableName, PHYSICAL_TABLE_NAME, physicalTableName);
+            mutateStringProperty(connection, tenantId, schemaName, tableName, PHYSICAL_TABLE_NAME, physicalTableName);
         }
         if (!Strings.isNullOrEmpty(schemaVersion)) {
-            mutateStringProperty(tenantId, schemaName, tableName, SCHEMA_VERSION, schemaVersion);
+            mutateStringProperty(connection, tenantId, schemaName, tableName, SCHEMA_VERSION, schemaVersion);
         }
         return seqNum;
     }
 
-    private void mutateBooleanProperty(String tenantId, String schemaName, String tableName,
+    public static void mutateTransformProperties(Connection connection, String tenantId, String schemaName, String tableName,
+                                                 String physicalTableName,
+                                                 ImmutableStorageScheme immutableStorageScheme,
+                                                 QualifierEncodingScheme columnEncodedBytes) throws SQLException {
+        if (columnEncodedBytes !=null) {
+            mutateByteProperty(connection, tenantId, schemaName, tableName, ENCODING_SCHEME, columnEncodedBytes.getSerializedMetadataValue());
+        }
+        if (immutableStorageScheme !=null) {
+            mutateByteProperty(connection, tenantId, schemaName, tableName, IMMUTABLE_STORAGE_SCHEME, immutableStorageScheme.getSerializedMetadataValue());
+        }
+        if (!Strings.isNullOrEmpty(physicalTableName)) {
+            mutateStringProperty(connection, tenantId, schemaName, tableName, PHYSICAL_TABLE_NAME, physicalTableName);
+        }
+    }
+
+    private static void mutateBooleanProperty(Connection connection, String tenantId, String schemaName, String tableName,
             String propertyName, boolean propertyValue) throws SQLException {
         String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
                 TENANT_ID + "," +
@@ -3797,7 +3684,7 @@ public class MetaDataClient {
         }
     }
 
-    private void mutateLongProperty(String tenantId, String schemaName, String tableName,
+    private static void mutateLongProperty(Connection connection, String tenantId, String schemaName, String tableName,
             String propertyName, Long propertyValue) throws SQLException {
         String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
                 TENANT_ID + "," +
@@ -3818,7 +3705,7 @@ public class MetaDataClient {
         }
     }
     
-    private void mutateByteProperty(String tenantId, String schemaName, String tableName,
+    private static void mutateByteProperty(Connection connection, String tenantId, String schemaName, String tableName,
             String propertyName, Byte propertyValue) throws SQLException {
         String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
                 TENANT_ID + "," +
@@ -3839,7 +3726,7 @@ public class MetaDataClient {
         }
     }
     
-    private void mutateStringProperty(String tenantId, String schemaName, String tableName,
+    private static void mutateStringProperty(Connection connection, String tenantId, String schemaName, String tableName,
             String propertyName, String propertyValue) throws SQLException {
         String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
                 TENANT_ID + "," +
@@ -3996,123 +3883,117 @@ public class MetaDataClient {
                 Map<String, Integer> changedCqCounters = new HashMap<>(numCols);
                 if (numCols > 0 ) {
                     StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver);
-                    String addColumnSqlToUse = connection.isRunningUpgrade()
-                            && tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE)
-                            && schemaName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA) ? ALTER_SYSCATALOG_TABLE_UPGRADE
-                            : INSERT_COLUMN_CREATE_TABLE;
-                    try (PreparedStatement colUpsert = connection.prepareStatement(addColumnSqlToUse)) {
-                        short nextKeySeq = SchemaUtil.getMaxKeySeq(table);
-                        for ( ColumnDef colDef : columnDefs) {
-                            if (colDef != null && !colDef.isNull()) {
-                                if (colDef.isPK()) {
-                                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.NOT_NULLABLE_COLUMN_IN_ROW_KEY)
-                                    .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
-                                } else if (!willBeImmutableRows) {
-                                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.KEY_VALUE_NOT_NULL)
-                                    .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
-                                }
-                            }
-                            if (colDef != null && colDef.isPK() && table.getType() == VIEW && table.getViewType() != MAPPED) {
-                                throwIfLastPKOfParentIsVariableLength(getParentOfView(table), schemaName, tableName, colDef);
-                            }
-                            if (colDef != null && colDef.isRowTimestamp()) {
-                                throw new SQLExceptionInfo.Builder(SQLExceptionCode.ROWTIMESTAMP_CREATE_ONLY)
+                    short nextKeySeq = SchemaUtil.getMaxKeySeq(table);
+                    for ( ColumnDef colDef : columnDefs) {
+                        if (colDef != null && !colDef.isNull()) {
+                            if (colDef.isPK()) {
+                                throw new SQLExceptionInfo.Builder(SQLExceptionCode.NOT_NULLABLE_COLUMN_IN_ROW_KEY)
+                                .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
+                            } else if (!willBeImmutableRows) {
+                                throw new SQLExceptionInfo.Builder(SQLExceptionCode.KEY_VALUE_NOT_NULL)
                                 .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                             }
-                            if (!colDef.validateDefault(context, null)) {
-                                colDef = new ColumnDef(colDef, null); // Remove DEFAULT as it's not necessary
-                            }
-                            String familyName = null;
-                            Integer encodedCQ = null;
-                            if (!colDef.isPK()) {
-                                String colDefFamily = colDef.getColumnDefName().getFamilyName();
-                                ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme();
-                                String defaultColumnFamily = tableForCQCounters.getDefaultFamilyName() != null && !Strings.isNullOrEmpty(tableForCQCounters.getDefaultFamilyName().getString()) ? 
-                                        tableForCQCounters.getDefaultFamilyName().getString() : DEFAULT_COLUMN_FAMILY;
-                                    if (table.getType() == PTableType.INDEX && table.getIndexType() == IndexType.LOCAL) {
-                                        defaultColumnFamily = QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX + defaultColumnFamily;
-                                    }
-                                if (storageScheme == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
-                                    familyName = colDefFamily != null ? colDefFamily : defaultColumnFamily;
-                                } else {
-                                    familyName = defaultColumnFamily;
-                                }
-                                encodedCQ = table.isAppendOnlySchema() ? Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + position) : cqCounterToUse.getNextQualifier(familyName);
-                                if (!table.isAppendOnlySchema() && cqCounterToUse.increment(familyName)) {
-                                    changedCqCounters.put(familyName,
-                                        cqCounterToUse.getNextQualifier(familyName));
+                        }
+                        if (colDef != null && colDef.isPK() && table.getType() == VIEW && table.getViewType() != MAPPED) {
+                            throwIfLastPKOfParentIsVariableLength(getParentOfView(table), schemaName, tableName, colDef);
+                        }
+                        if (colDef != null && colDef.isRowTimestamp()) {
+                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.ROWTIMESTAMP_CREATE_ONLY)
+                            .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
+                        }
+                        if (!colDef.validateDefault(context, null)) {
+                            colDef = new ColumnDef(colDef, null); // Remove DEFAULT as it's not necessary
+                        }
+                        String familyName = null;
+                        Integer encodedCQ = null;
+                        if (!colDef.isPK()) {
+                            String colDefFamily = colDef.getColumnDefName().getFamilyName();
+                            ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme();
+                            String defaultColumnFamily = tableForCQCounters.getDefaultFamilyName() != null && !Strings.isNullOrEmpty(tableForCQCounters.getDefaultFamilyName().getString()) ?
+                                    tableForCQCounters.getDefaultFamilyName().getString() : DEFAULT_COLUMN_FAMILY;
+                                if (table.getType() == PTableType.INDEX && table.getIndexType() == IndexType.LOCAL) {
+                                    defaultColumnFamily = QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX + defaultColumnFamily;
                                 }
+                            if (storageScheme == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                                familyName = colDefFamily != null ? colDefFamily : defaultColumnFamily;
+                            } else {
+                                familyName = defaultColumnFamily;
                             }
-                            byte[] columnQualifierBytes = null;
-                            try {
-                                columnQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(colDef.getColumnDefName().getColumnName(), encodedCQ, table, colDef.isPK());
-                            }
-                            catch (QualifierOutOfRangeException e) {
-                                throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED)
-                                .setSchemaName(schemaName)
-                                .setTableName(tableName).build().buildException();
-                            }
-                            PColumn column = newColumn(position++, colDef, PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : table.getDefaultFamilyName().getString(), true, columnQualifierBytes, willBeImmutableRows);
-                            HashMap<PTable, PColumn> indexToIndexColumnMap = null;
-                            if (cascade) {
-                                indexToIndexColumnMap = getPTablePColumnHashMapForCascade(indexesPTable, willBeImmutableRows,
-                                                colDef, familyName, indexToColumnSizeMap);
+                            encodedCQ = table.isAppendOnlySchema() ? Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + position) : cqCounterToUse.getNextQualifier(familyName);
+                            if (!table.isAppendOnlySchema() && cqCounterToUse.increment(familyName)) {
+                                changedCqCounters.put(familyName,
+                                    cqCounterToUse.getNextQualifier(familyName));
                             }
+                        }
+                        byte[] columnQualifierBytes = null;
+                        try {
+                            columnQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(colDef.getColumnDefName().getColumnName(), encodedCQ, table, colDef.isPK());
+                        }
+                        catch (QualifierOutOfRangeException e) {
+                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED)
+                            .setSchemaName(schemaName)
+                            .setTableName(tableName).build().buildException();
+                        }
+                        PColumn column = newColumn(position++, colDef, PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : table.getDefaultFamilyName().getString(), true, columnQualifierBytes, willBeImmutableRows);
+                        HashMap<PTable, PColumn> indexToIndexColumnMap = null;
+                        if (cascade) {
+                            indexToIndexColumnMap = getPTablePColumnHashMapForCascade(indexesPTable, willBeImmutableRows,
+                                            colDef, familyName, indexToColumnSizeMap);
+                        }
 
-                            columns.add(column);
-                            String pkName = null;
-                            Short keySeq = null;
-                            
-                            // TODO: support setting properties on other families?
-                            if (column.getFamilyName() == null) {
-                                ++numPkColumnsAdded;
-                                pkName = table.getPKName() == null ? null : table.getPKName().getString();
-                                keySeq = ++nextKeySeq;
-                            } else {
-                                families.add(column.getFamilyName().getString());
+                        columns.add(column);
+                        String pkName = null;
+                        Short keySeq = null;
+
+                        // TODO: support setting properties on other families?
+                        if (column.getFamilyName() == null) {
+                            ++numPkColumnsAdded;
+                            pkName = table.getPKName() == null ? null : table.getPKName().getString();
+                            keySeq = ++nextKeySeq;
+                        } else {
+                            families.add(column.getFamilyName().getString());
+                        }
+                        colFamiliesForPColumnsToBeAdded.add(column.getFamilyName() == null ? null : column.getFamilyName().getString());
+                        addColumnMutation(connection, schemaName, tableName, column, null, pkName, keySeq, table.getBucketNum() != null);
+                        // add new columns for given indexes one by one
+                        if (cascade) {
+                            for (PTable index: indexesPTable) {
+                                LOGGER.info("Adding column "+column.getName().getString()+" to "+index.getTableName().toString());
+                                addColumnMutation(connection, schemaName, index.getTableName().getString(), indexToIndexColumnMap.get(index), null, "", keySeq, index.getBucketNum() != null);
                             }
-                            colFamiliesForPColumnsToBeAdded.add(column.getFamilyName() == null ? null : column.getFamilyName().getString());
-                            addColumnMutation(schemaName, tableName, column, colUpsert, null, pkName, keySeq, table.getBucketNum() != null);
-                            // add new columns for given indexes one by one
-                            if (cascade) {
-                                for (PTable index: indexesPTable) {
-                                    LOGGER.info("Adding column "+column.getName().getString()+" to "+index.getTableName().toString());
-                                    addColumnMutation(schemaName, index.getTableName().getString(), indexToIndexColumnMap.get(index), colUpsert, null, "", keySeq, index.getBucketNum() != null);
-                                }
+                        }
+                    }
+
+                    // Add any new PK columns to end of index PK
+                    if (numPkColumnsAdded > 0) {
+                        // create PK column list that includes the newly created columns
+                        List<PColumn> pkColumns = Lists.newArrayListWithExpectedSize(table.getPKColumns().size()+numPkColumnsAdded);
+                        pkColumns.addAll(table.getPKColumns());
+                        for (int i=0; i<numCols; ++i) {
+                            if (columnDefs.get(i).isPK()) {
+                                pkColumns.add(columns.get(i));
                             }
                         }
-                        
-                        // Add any new PK columns to end of index PK
-                        if (numPkColumnsAdded > 0) {
-                            // create PK column list that includes the newly created columns
-                            List<PColumn> pkColumns = Lists.newArrayListWithExpectedSize(table.getPKColumns().size()+numPkColumnsAdded);
-                            pkColumns.addAll(table.getPKColumns());
+                        int pkSlotPosition = table.getPKColumns().size()-1;
+                        for (PTable index : table.getIndexes()) {
+                            short nextIndexKeySeq = SchemaUtil.getMaxKeySeq(index);
+                            int indexPosition = index.getColumns().size();
                             for (int i=0; i<numCols; ++i) {
-                                if (columnDefs.get(i).isPK()) {
-                                    pkColumns.add(columns.get(i));
-                                }
-                            }
-                            int pkSlotPosition = table.getPKColumns().size()-1;
-                            for (PTable index : table.getIndexes()) {
-                                short nextIndexKeySeq = SchemaUtil.getMaxKeySeq(index);
-                                int indexPosition = index.getColumns().size();
-                                for (int i=0; i<numCols; ++i) {
-                                    ColumnDef colDef = columnDefs.get(i);
-                                    if (colDef.isPK()) {
-                                        PDataType indexColDataType = IndexUtil.getIndexColumnDataType(colDef.isNull(), colDef.getDataType());
-                                        ColumnName indexColName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(null, colDef.getColumnDefName().getColumnName()));
-                                        Expression expression = new RowKeyColumnExpression(columns.get(i), new RowKeyValueAccessor(pkColumns, pkSlotPosition));
-                                        ColumnDef indexColDef = FACTORY.columnDef(indexColName, indexColDataType.getSqlTypeName(), colDef.isNull(), colDef.getMaxLength(), colDef.getScale(), true, colDef.getSortOrder(), expression.toString(), colDef.isRowTimestamp());
-                                        PColumn indexColumn = newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true, null, willBeImmutableRows);
-                                        addColumnMutation(schemaName, index.getTableName().getString(), indexColumn, colUpsert, index.getParentTableName().getString(), index.getPKName() == null ? null : index.getPKName().getString(), ++nextIndexKeySeq, index.getBucketNum() != null);
-                                    }
+                                ColumnDef colDef = columnDefs.get(i);
+                                if (colDef.isPK()) {
+                                    PDataType indexColDataType = IndexUtil.getIndexColumnDataType(colDef.isNull(), colDef.getDataType());
+                                    ColumnName indexColName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(null, colDef.getColumnDefName().getColumnName()));
+                                    Expression expression = new RowKeyColumnExpression(columns.get(i), new RowKeyValueAccessor(pkColumns, pkSlotPosition));
+                                    ColumnDef indexColDef = FACTORY.columnDef(indexColName, indexColDataType.getSqlTypeName(), colDef.isNull(), colDef.getMaxLength(), colDef.getScale(), true, colDef.getSortOrder(), expression.toString(), colDef.isRowTimestamp());
+                                    PColumn indexColumn = newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true, null, willBeImmutableRows);
+                                    addColumnMutation(connection, schemaName, index.getTableName().getString(), indexColumn, index.getParentTableName().getString(), index.getPKName() == null ? null : index.getPKName().getString(), ++nextIndexKeySeq, index.getBucketNum() != null);
                                 }
                             }
-                            ++pkSlotPosition;
                         }
-                        columnMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
-                        connection.rollback();
+                        ++pkSlotPosition;
                     }
+                    columnMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
+                    connection.rollback();
                 } else {
                     // Check that HBase configured properly for mutable secondary indexing
                     // if we're changing from an immutable table to a mutable table and we
@@ -5553,10 +5434,9 @@ public class MetaDataClient {
             }
         }
 
-        if (!Strings.isNullOrEmpty(metaProperties.getPhysicalTableName())) {
-            if (!metaProperties.getPhysicalTableName().
-                    equals(table.getPhysicalName(true).getString())) {
-                metaPropertiesEvaluated.setPhysicalTableName(metaProperties.getPhysicalTableName());
+        if (!Strings.isNullOrEmpty(metaProperties.getPhysicalTableNameProp())) {
+            if (!metaProperties.getPhysicalTableNameProp().equals(table.getPhysicalName(true))) {
+                metaPropertiesEvaluated.setPhysicalTableName(metaProperties.getPhysicalTableNameProp());
                 changingPhoenixTableProperty = true;
             }
         }
@@ -5639,6 +5519,10 @@ public class MetaDataClient {
             this.isTransactionalProp = isTransactionalProp;
         }
 
+        public String getPhysicalTableNameProp() {
+            return this.physicalTableNameProp;
+        }
+
         public void setPhysicalTableNameProp(String physicalTableNameProp) {
             this.physicalTableNameProp = physicalTableNameProp;
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index ea272ce..0eb5d26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
 import org.apache.phoenix.schema.types.PArrayDataType;
 import org.apache.phoenix.schema.types.PArrayDataTypeDecoder;
 import org.apache.phoenix.schema.types.PArrayDataTypeEncoder;
@@ -248,7 +249,8 @@ public interface PTable extends PMetaDataEntity {
     }
 
     public enum TransformType {
-        METADATA_TRANSFORM((byte)1);
+        METADATA_TRANSFORM((byte)1),
+        METADATA_TRANSFORM_PARTIAL((byte)2);
 
         private final byte[] byteValue;
         private final int serializedValue;
@@ -297,6 +299,11 @@ public interface PTable extends PMetaDataEntity {
                 return  "FAILED";
             }
         },
+        PAUSED {
+            public String toString() {
+                return  "PAUSED";
+            }
+        },
     }
 
     public enum ImmutableStorageScheme implements ColumnValueEncoderDecoderSupplier {
@@ -818,6 +825,7 @@ public interface PTable extends PMetaDataEntity {
 
     boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection);
     IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection);
+    TransformMaintainer getTransformMaintainer(PTable oldTable, PhoenixConnection connection);
     PName getDefaultFamilyName();
 
     boolean isWALDisabled();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 70efc26..97605d7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -97,6 +97,7 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
@@ -138,6 +139,7 @@ public class PTableImpl implements PTable {
     private static final int VIEW_MODIFIED_PHOENIX_TTL_BIT_SET_POS = 2;
 
     private IndexMaintainer indexMaintainer;
+    private TransformMaintainer transformMaintainer;
     private ImmutableBytesWritable indexMaintainersPtr;
 
     private final PTableKey key;
@@ -1657,6 +1659,14 @@ public class PTableImpl implements PTable {
     }
 
     @Override
+    public synchronized TransformMaintainer getTransformMaintainer(PTable oldTable, PhoenixConnection connection) {
+        if (transformMaintainer == null) {
+            transformMaintainer = TransformMaintainer.create(oldTable, this, connection);
+        }
+        return transformMaintainer;
+    }
+
+    @Override
     public synchronized IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection) {
         if (indexMaintainer == null) {
             indexMaintainer = IndexMaintainer.create(dataTable, this, connection);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
index 656bd31..8e400b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.ArrayList;
 
 
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY;
 import static org.apache.phoenix.util.MetaDataUtil.SYNCED_DATA_TABLE_AND_INDEX_COL_FAM_PROPERTIES;
 
 public class SchemaExtractionProcessor implements SchemaProcessor {
@@ -117,7 +118,8 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
             HTableDescriptor htd = getTableDescriptor(cqsi, table);
             setHTableProperties(htd);
         }
-        String propertiesString = convertPropertiesToString();
+
+        String propertiesString = convertPropertiesToString(true);
         return generateIndexDDLString(quotedBaseTableFullName, indexedColumnsString, coveredColumnsString,
                 indexPTable.getIndexType().equals(PTable.IndexType.LOCAL), quotedIndexTableName, propertiesString);
     }
@@ -275,7 +277,7 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
         setHColumnFamilyProperties(hcds);
 
         String columnInfoString = getColumnInfoStringForTable(table);
-        String propertiesString = convertPropertiesToString();
+        String propertiesString = convertPropertiesToString(false);
 
         return generateTableDDLString(columnInfoString, propertiesString, pSchemaName, pTableName);
     }
@@ -367,7 +369,7 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
         }
     }
 
-    private String convertPropertiesToString() {
+    private String convertPropertiesToString(boolean forIndex) {
         StringBuilder optionBuilder = new StringBuilder();
         for(Map.Entry<String, String> entry : definedProps.entrySet()) {
             String key = entry.getKey();
@@ -381,6 +383,12 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
             }
 
             if(value!=null && (shouldGenerateWithDefaults || (defaultProps.get(key) != null && !value.equals(defaultProps.get(key))))) {
+                if (forIndex) {
+                    // cannot set these for index
+                    if (key.equals(UPDATE_CACHE_FREQUENCY)) {
+                        continue;
+                    }
+                }
                 if (optionBuilder.length() != 0) {
                     optionBuilder.append(", ");
                 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
index c2cbf19..dd678ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
@@ -152,6 +152,27 @@ public class SystemTransformRecord {
         private String newMetadata;
         private String transformFunction;
 
+        public SystemTransformBuilder() {
+
+        }
+
+        public SystemTransformBuilder(SystemTransformRecord systemTransformRecord) {
+            this.setTransformType(systemTransformRecord.getTransformType());
+            this.setTenantId(systemTransformRecord.getTenantId());
+            this.setSchemaName(systemTransformRecord.getSchemaName());
+            this.setLogicalTableName(systemTransformRecord.getLogicalTableName());
+            this.setNewPhysicalTableName(systemTransformRecord.getNewPhysicalTableName());
+            this.setLogicalParentName(systemTransformRecord.getLogicalParentName());
+            this.setTransformStatus(systemTransformRecord.getTransformStatus());
+            this.setTransformJobId(systemTransformRecord.getTransformJobId());
+            this.setTransformRetryCount(systemTransformRecord.getTransformRetryCount());
+            this.setStartTs(systemTransformRecord.getTransformStartTs());
+            this.setEndTs(systemTransformRecord.getTransformEndTs());
+            this.setOldMetadata(systemTransformRecord.getOldMetadata());
+            this.setNewMetadata(systemTransformRecord.getNewMetadata());
+            this.setTransformFunction(systemTransformRecord.getTransformFunction());
+        }
+
         public SystemTransformBuilder setTransformType(PTable.TransformType transformType) {
             this.transformType = transformType;
             return this;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
index d3f016e..56f0545 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
@@ -19,22 +19,39 @@ package org.apache.phoenix.schema.transform;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.mapreduce.transform.TransformTool;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.tool.SchemaExtractionProcessor;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.JacksonUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Timestamp;
 import java.sql.Types;
 
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
+import static org.apache.phoenix.schema.ColumnMetaDataOps.addColumnMutation;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 
 public class Transform {
@@ -46,9 +63,8 @@ public class Transform {
         return newName;
     }
 
-    public static void addTransform(PhoenixConnection connection, String tenantId, PTable table,
-                                    MetaDataClient.MetaProperties changingProperties, long sequenceNum,
-                                    PTable.TransformType transformType) throws SQLException {
+    public static void addTransform(PhoenixConnection connection, String tenantId, PTable table, MetaDataClient.MetaProperties changingProperties,
+                                    long sequenceNum, PTable.TransformType transformType) throws SQLException {
         try {
             String newMetadata = JacksonUtil.getObjectWriter().writeValueAsString(changingProperties);
             String oldMetadata = "";
@@ -71,14 +87,148 @@ public class Transform {
                 newPhysicalTableName = generateNewTableName(schema, logicalTableName, sequenceNum);
             }
             transformBuilder.setNewPhysicalTableName(newPhysicalTableName);
-            Transform.addTransform(transformBuilder.build(), connection);
+            Transform.addTransform(table, changingProperties, transformBuilder.build(), connection);
         } catch (JsonProcessingException ex) {
             LOGGER.error("addTransform failed", ex);
             throw new SQLException("Adding transform failed with JsonProcessingException");
+        } catch (SQLException ex) {
+            throw ex;
+        } catch(Exception ex) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.valueOf("CANNOT_MUTATE_TABLE"))
+                    .setSchemaName((table.getSchemaName() == null? null: table.getSchemaName().getString()))
+                    .setRootCause(ex)
+                    .setTableName(table.getName().getString()).build().buildException();
         }
     }
 
-    public static void addTransform(
+    protected static void addTransform(
+            PTable table, MetaDataClient.MetaProperties changedProps, SystemTransformRecord systemTransformParams, PhoenixConnection connection) throws Exception {
+        PName newTableName = PNameFactory.newName(systemTransformParams.getNewPhysicalTableName());
+        PName newTableNameWithoutSchema = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(systemTransformParams.getNewPhysicalTableName()));
+        PTable newTable = new PTableImpl.Builder()
+                .setTableName(newTableNameWithoutSchema)
+                .setParentTableName(table.getParentTableName())
+                .setBaseTableLogicalName(table.getBaseTableLogicalName())
+                .setPhysicalTableName(newTableNameWithoutSchema)
+                .setAllColumns(table.getColumns())
+                .setAppendOnlySchema(table.isAppendOnlySchema())
+                .setAutoPartitionSeqName(table.getAutoPartitionSeqName())
+                .setBaseColumnCount(table.getBaseColumnCount())
+                .setBucketNum(table.getBucketNum())
+                .setDefaultFamilyName(table.getDefaultFamilyName())
+                .setDisableWAL(table.isWALDisabled())
+                .setEstimatedSize(table.getEstimatedSize())
+                .setFamilies(table.getColumnFamilies())
+                .setImmutableRows(table.isImmutableRows())
+                .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
+                .setIndexType(table.getIndexType())
+                .setName(newTableName)
+                .setMultiTenant(table.isMultiTenant())
+                .setParentName(table.getParentName())
+                .setParentSchemaName(table.getParentSchemaName())
+                .setPhoenixTTL(table.getPhoenixTTL())
+                .setNamespaceMapped(table.isNamespaceMapped())
+                .setSchemaName(table.getSchemaName())
+                .setPkColumns(table.getPKColumns())
+                .setPkName(table.getPKName())
+                .setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark())
+                .setRowKeySchema(table.getRowKeySchema())
+                .setStoreNulls(table.getStoreNulls())
+                .setTenantId(table.getTenantId())
+                .setType(table.getType())
+                // SchemaExtractor uses physical name to get the table descriptor from. So we use the existing table here
+                .setPhysicalNames(ImmutableList.copyOf(table.getPhysicalNames()))
+                .setUpdateCacheFrequency(table.getUpdateCacheFrequency())
+                .setTransactionProvider(table.getTransactionProvider())
+                .setUseStatsForParallelization(table.useStatsForParallelization())
+                .setSchemaVersion(table.getSchemaVersion())
+                .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
+                // Transformables
+                .setImmutableStorageScheme(
+                        (changedProps.getImmutableStorageSchemeProp() != null? changedProps.getImmutableStorageSchemeProp():table.getImmutableStorageScheme()))
+                .setQualifierEncodingScheme(
+                        (changedProps.getColumnEncodedBytesProp() != null? changedProps.getColumnEncodedBytesProp() : table.getEncodingScheme()))
+                .build();
+        SchemaExtractionProcessor schemaExtractionProcessor = new SchemaExtractionProcessor(systemTransformParams.getTenantId(),
+                connection.getQueryServices().getConfiguration(), newTable,  true);
+        String ddl = schemaExtractionProcessor.process();
+        LOGGER.info("Creating transforming table via " + ddl);
+        connection.createStatement().execute(ddl);
+        upsertTransform(systemTransformParams, connection);
+    }
+
+    public static SystemTransformRecord getTransformRecord(
+            String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
+        try (ResultSet resultSet = connection.prepareStatement("SELECT " +
+                PhoenixDatabaseMetaData.TENANT_ID + ", " +
+                PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
+                PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
+                PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
+                PhoenixDatabaseMetaData.OLD_METADATA + " , " +
+                PhoenixDatabaseMetaData.NEW_METADATA + " , " +
+                PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
+                " FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE  " +
+                (Strings.isNullOrEmpty(tenantId) ? "" : (PhoenixDatabaseMetaData.TENANT_ID + " ='" + tenantId + "' AND ")) +
+                (Strings.isNullOrEmpty(schema) ? "" : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + schema + "' AND ")) +
+                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + logicalTableName + "'" +
+                (Strings.isNullOrEmpty(logicalParentName) ? "": (" AND " + PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + "='" + logicalParentName + "'" ))
+        ).executeQuery()) {
+            if (resultSet.next()) {
+                return SystemTransformRecord.SystemTransformBuilder.build(resultSet);
+            }
+            return null;
+        }
+    }
+
+    public static boolean checkIsTransformNeeded(MetaDataClient.MetaProperties metaProperties, String schemaName,
+                                                 PTable table, String logicalTableName, String parentTableName,
+                                                 String tenantId, PhoenixConnection connection) throws SQLException {
+        boolean isTransformNeeded = isTransformNeeded(metaProperties, table);
+        if (isTransformNeeded) {
+            SystemTransformRecord existingTransform = Transform.getTransformRecord(schemaName, logicalTableName, parentTableName, tenantId,connection);
+            if (existingTransform != null && existingTransform.isActive()) {
+                throw new SQLExceptionInfo.Builder(
+                        SQLExceptionCode.CANNOT_TRANSFORM_ALREADY_TRANSFORMING_TABLE)
+                        .setMessage(" Only one transform at a time is allowed ")
+                        .setSchemaName(schemaName).setTableName(logicalTableName).build().buildException();
+            }
+        }
+        return isTransformNeeded;
+    }
+
+    private static boolean isTransformNeeded(MetaDataClient.MetaProperties metaProperties, PTable table){
+        if (metaProperties.getImmutableStorageSchemeProp()!=null
+                && metaProperties.getImmutableStorageSchemeProp() != table.getImmutableStorageScheme()) {
+            // Transform is needed
+            return true;
+        }
+        if (metaProperties.getColumnEncodedBytesProp()!=null
+                && metaProperties.getColumnEncodedBytesProp() != table.getEncodingScheme()) {
+            return true;
+        }
+        return false;
+    }
+
+    public static void removeTransformRecord(
+            SystemTransformRecord transformRecord, PhoenixConnection connection) throws SQLException {
+        connection.prepareStatement("DELETE FROM  "
+                + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE " +
+                (Strings.isNullOrEmpty(transformRecord.getSchemaName()) ? "" :
+                        (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + transformRecord.getSchemaName() + "' AND ")) +
+                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + transformRecord.getLogicalTableName() + "' AND " +
+                PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + " ='" + transformRecord.getNewPhysicalTableName() + "' AND " +
+                PhoenixDatabaseMetaData.TRANSFORM_TYPE + " =" + transformRecord.getTransformType().getSerializedValue()
+        ).execute();
+    }
+
+    public static void upsertTransform(
             SystemTransformRecord systemTransformParams, PhoenixConnection connection) throws SQLException {
         try (PreparedStatement stmt = connection.prepareStatement("UPSERT INTO " +
                 PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " ( " +
@@ -157,64 +307,70 @@ public class Transform {
     }
 
 
-    public static SystemTransformRecord getTransformRecord(
-            String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
-        try (ResultSet resultSet = connection.prepareStatement("SELECT " +
-                PhoenixDatabaseMetaData.TENANT_ID + ", " +
-                PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
-                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
-                PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
-                PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
-                PhoenixDatabaseMetaData.OLD_METADATA + " , " +
-                PhoenixDatabaseMetaData.NEW_METADATA + " , " +
-                PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
-                " FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE  " +
-                (Strings.isNullOrEmpty(tenantId) ? "" : (PhoenixDatabaseMetaData.TENANT_ID + " ='" + tenantId + "' AND ")) +
-                (Strings.isNullOrEmpty(schema) ? "" : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + schema + "' AND ")) +
-                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + logicalTableName + "'" +
-                (Strings.isNullOrEmpty(logicalParentName) ? "": (" AND " + PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + "='" + logicalParentName + "'" ))
-        ).executeQuery()) {
-            if (resultSet.next()) {
-                return SystemTransformRecord.SystemTransformBuilder.build(resultSet);
+    public static void completeTransform(Connection connection, Configuration configuration) throws Exception{
+        // Will be called from Reducer
+        long timestamp= EnvironmentEdgeManager.currentTimeMillis();
+        String tenantId = configuration.get(MAPREDUCE_TENANT_ID, null);
+        String fullOldTableName = PhoenixConfigurationUtil.getInputTableName(configuration);
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(fullOldTableName);
+        String oldTableLogicalName = SchemaUtil.getTableNameFromFullName(fullOldTableName);
+        String indexTableName = SchemaUtil.getTableNameFromFullName(PhoenixConfigurationUtil.getIndexToolIndexTableName(configuration));
+        String logicaTableName = oldTableLogicalName;
+        String logicalParentName = null;
+        if (PhoenixConfigurationUtil.getTransformingTableType(configuration) == IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE)
+            if (!Strings.isNullOrEmpty(indexTableName)) {
+                logicaTableName = indexTableName;
+                logicalParentName = SchemaUtil.getTableName(schemaName, oldTableLogicalName);
             }
-            return null;
-        }
-    }
-
-    public static boolean checkIsTransformNeeded(MetaDataClient.MetaProperties metaProperties, String schemaName,
-                                                 PTable table, String logicalTableName, String parentTableName,
-                                                 String tenantId, PhoenixConnection connection) throws SQLException {
-        boolean isTransformNeeded = isTransformNeeded(metaProperties, table);
-        if (isTransformNeeded) {
-            SystemTransformRecord existingTransform = Transform.getTransformRecord(schemaName, logicalTableName, parentTableName, tenantId,connection);
-            if (existingTransform != null && existingTransform.isActive()) {
-                throw new SQLExceptionInfo.Builder(
-                        SQLExceptionCode.CANNOT_TRANSFORM_ALREADY_TRANSFORMING_TABLE)
-                        .setMessage(" Only one transform at a time is allowed ")
-                        .setSchemaName(schemaName).setTableName(logicalTableName).build().buildException();
+        boolean isPartial = PhoenixConfigurationUtil.getIsPartialTransform(configuration);
+        SystemTransformRecord transformRecord = getTransformRecord(schemaName, logicaTableName, logicalParentName,
+                tenantId, connection.unwrap(PhoenixConnection.class));
+        if (!isPartial) {
+            String newTableName = SchemaUtil.getTableNameFromFullName(transformRecord.getNewPhysicalTableName());
+            PTable pNewTable = PhoenixRuntime.getTable(connection, transformRecord.getNewPhysicalTableName());
+            PTable pOldTable = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(schemaName,logicaTableName));
+            if (pOldTable.getImmutableStorageScheme() != pNewTable.getImmutableStorageScheme() ||
+                    pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+                MetaDataClient.mutateTransformProperties(connection, tenantId, schemaName, logicaTableName, newTableName,
+                        pNewTable.getImmutableStorageScheme(), pNewTable.getEncodingScheme());
+                // We need to update the columns's qualifiers as well
+                if (pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+                    Short nextKeySeq = 0;
+                    for (PColumn newCol : pNewTable.getColumns()) {
+                        boolean isPk = SchemaUtil.isPKColumn(newCol);
+                        Short keySeq = isPk ? ++nextKeySeq : null;
+                        PColumn column = new PColumnImpl(newCol.getName(), newCol.getFamilyName(), newCol.getDataType(),
+                                newCol.getMaxLength(), newCol.getScale(), newCol.isNullable(), newCol.getPosition(), newCol.getSortOrder()
+                                , newCol.getArraySize(),
+                                newCol.getViewConstant(), newCol.isViewReferenced(), newCol.getExpressionStr(), newCol.isRowTimestamp(),
+                                newCol.isDynamic(), newCol.getColumnQualifierBytes(), EnvironmentEdgeManager.currentTimeMillis());
+                        addColumnMutation(connection.unwrap(PhoenixConnection.class), schemaName, logicaTableName, column,
+                                pNewTable.getParentTableName()==null? null:pNewTable.getParentTableName().getString()
+                                , pNewTable.getPKName()==null? null:pNewTable.getPKName().getString(), keySeq , pNewTable.getBucketNum() != null);
+                    }
+                }
             }
-        }
-        return isTransformNeeded;
-    }
+            // Clear cache so that the new table is used for queries
+            connection.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+            TransformTool.updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord, PTable.TransformStatus.COMPLETED);
 
-    private static boolean isTransformNeeded(MetaDataClient.MetaProperties metaProperties, PTable table){
-        if (metaProperties.getImmutableStorageSchemeProp()!=null
-                && metaProperties.getImmutableStorageSchemeProp() != table.getImmutableStorageScheme()) {
-            // Transform is needed
-            return true;
+            // TODO Kick partial transform from the TransformMonitor
+            SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(transformRecord);
+            builder.setTransformStatus(PTable.TransformStatus.CREATED.name());
+            builder.setTransformJobId(null);
+            builder.setStartTs(new Timestamp(timestamp));
+            builder.setTransformRetryCount(0);
+            builder.setTransformType(PTable.TransformType.METADATA_TRANSFORM_PARTIAL);
+            SystemTransformRecord partialStr = builder.build();
+            Transform.upsertTransform(partialStr, connection.unwrap(PhoenixConnection.class));
+            connection.commit();
+        } else {
+            TransformTool.updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord, PTable.TransformStatus.COMPLETED);
+            connection.commit();
+            // TODO: cleanup
         }
-        if (metaProperties.getColumnEncodedBytesProp()!=null
-                && metaProperties.getColumnEncodedBytesProp() != table.getEncodingScheme()) {
-            return true;
-        }
-        return false;
     }
+
 }
 
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
new file mode 100644
index 0000000..72bdb2d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+
+public class TransformMaintainer extends IndexMaintainer {
+    private boolean isMultiTenant;
+    // indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column
+    private List<Expression> newTableExpressions;
+    private Set<ColumnReference> newTableColumns;
+
+    private List<PDataType> newTableColumnTypes;
+    private int newTableColumnCount;
+    private byte[] newTableName;
+    private int nNewTableSaltBuckets;
+    private byte[] oldTableEmptyKeyValueCF;
+    private ImmutableBytesPtr emptyKeyValueCFPtr;
+    private int nOldTableCFs;
+    private boolean newTableWALDisabled;
+    private boolean newTableImmutableRows;
+    // Transient state
+    private final boolean isOldTableSalted;
+    private final RowKeySchema oldTableRowKeySchema;
+
+    private int estimatedNewTableRowKeyBytes;
+    private ColumnReference newTableEmptyKeyValueRef;
+    private ColumnReference oldTableEmptyKeyValueRef;
+    private boolean newTableRowKeyOrderOptimizable;
+
+    private PTable.QualifierEncodingScheme newTableEncodingScheme;
+    private PTable.ImmutableStorageScheme newTableImmutableStorageScheme;
+    private PTable.QualifierEncodingScheme oldTableEncodingScheme;
+    private PTable.ImmutableStorageScheme oldTableImmutableStorageScheme;
+    /*
+     * The first part of the pair is column family name
+     * and second part is the column name. The reason we need to track this state is because for certain storage schemes
+     * like ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column for which we need to generate an new
+     * table put/delete is different from the old columns in the phoenix schema.
+     */
+    private Set<Pair<String, String>> newTableColumnsInfo;
+    /*
+     * Map of covered columns where a key is column reference for a column in the data table
+     * and value is column reference for corresponding column in the new table.
+     */
+    private Map<ColumnReference, ColumnReference> coveredColumnsMap;
+
+    private String logicalNewTableName;
+
+    public static TransformMaintainer create(PTable oldTable, PTable newTable, PhoenixConnection connection) {
+        if (oldTable.getType() == PTableType.INDEX) {
+            throw new IllegalArgumentException();
+        }
+        TransformMaintainer maintainer = new TransformMaintainer(oldTable, newTable, connection);
+        return maintainer;
+    }
+
+    private TransformMaintainer(RowKeySchema oldRowKeySchema, boolean isOldTableSalted) {
+        super(oldRowKeySchema, isOldTableSalted);
+        this.oldTableRowKeySchema = oldRowKeySchema;
+        this.isOldTableSalted = isOldTableSalted;
+    }
+
+    private TransformMaintainer(final PTable oldTable, final PTable newTable, PhoenixConnection connection) {
+        this(oldTable.getRowKeySchema(), oldTable.getBucketNum() != null);
+        this.newTableRowKeyOrderOptimizable = newTable.rowKeyOrderOptimizable();
+        this.isMultiTenant = oldTable.isMultiTenant();
+
+        this.newTableEncodingScheme = newTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : newTable.getEncodingScheme();
+        this.newTableImmutableStorageScheme = newTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : newTable.getImmutableStorageScheme();
+        this.oldTableEncodingScheme = oldTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : oldTable.getEncodingScheme();
+        this.oldTableImmutableStorageScheme = oldTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : oldTable.getImmutableStorageScheme();
+
+        this.newTableName = newTable.getPhysicalName().getBytes();
+        boolean newTableWALDisabled = newTable.isWALDisabled();
+        int nNewTableColumns = newTable.getColumns().size();
+        int nNewTablePKColumns = newTable.getPKColumns().size();
+
+        List<PColumn> oldTablePKColumns = oldTable.getPKColumns();
+
+        this.newTableColumnCount = oldTablePKColumns.size();
+
+        this.newTableColumnTypes = Lists.newArrayListWithExpectedSize(nNewTablePKColumns);
+        this.newTableExpressions = Lists.newArrayListWithExpectedSize(nNewTableColumns);
+        this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+        this.nNewTableSaltBuckets = newTable.getBucketNum() == null ? 0 : newTable.getBucketNum();
+        this.oldTableEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(oldTable);
+        this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(newTable);
+        this.nOldTableCFs = oldTable.getColumnFamilies().size();
+        this.newTableWALDisabled = newTableWALDisabled;
+        this.newTableImmutableRows = newTable.isImmutableRows();
+        this.newTableColumnsInfo = Sets.newHashSetWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+
+        for (int i = 0; i < newTable.getColumnFamilies().size(); i++) {
+            PColumnFamily family = newTable.getColumnFamilies().get(i);
+            for (PColumn newColumn : family.getColumns()) {
+                PColumn oldColumn = getColumnOrNull(oldTable, newColumn.getName().getString(), newColumn.getFamilyName().getString());
+                // This can happen during deletion where we don't need covered columns
+                if (oldColumn != null) {
+                    byte[] oldColumnCq = oldColumn.getColumnQualifierBytes();
+                    byte[] newColumnCq = newColumn.getColumnQualifierBytes();
+                    this.coveredColumnsMap.put(new ColumnReference(oldColumn.getFamilyName().getBytes(), oldColumnCq),
+                            new ColumnReference(newColumn.getFamilyName().getBytes(), newColumnCq));
+                }
+            }
+        }
+        this.logicalNewTableName = newTable.getName().getString();
+        initCachedState();
+    }
+
+    public static PColumn getColumnOrNull(PTable table, String columnName, String familyName) {
+        PColumnFamily family;
+        try {
+            family = table.getColumnFamily(familyName);
+        } catch (ColumnFamilyNotFoundException e) {
+            return null;
+        }
+        try {
+            return family.getPColumnForColumnName(columnName);
+        } catch (ColumnNotFoundException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Init calculated state reading/creating
+     */
+    private void initCachedState() {
+        byte[] newTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(newTableEncodingScheme).getFirst();
+        byte[] oldTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(oldTableEncodingScheme).getFirst();
+        newTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, newTableEmptyKvQualifier);
+        oldTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, oldTableEmptyKvQualifier);
+        this.newTableColumns = Sets.newLinkedHashSetWithExpectedSize(this.newTableColumnCount);
+
+        for (ColumnReference colRef : coveredColumnsMap.keySet()) {
+            if (newTableImmutableStorageScheme == PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+                newTableColumns.add(colRef);
+            } else {
+                newTableColumns.add(new ColumnReference(colRef.getFamily(), QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES));
+            }
+        }
+    }
+
+    /**
+     * For client-side to serialize TransformMaintainer for a given table
+     *
+     * @param oldTable old table
+     * @param ptr      bytes pointer to hold returned serialized value
+     * @param newTable new table to serialize
+     */
+    public static void serialize(PTable oldTable, ImmutableBytesWritable ptr,
+                                 PTable newTable, PhoenixConnection connection) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        DataOutputStream output = new DataOutputStream(stream);
+        try {
+            // Encode data table salting
+            WritableUtils.writeVInt(output, oldTable.getBucketNum() == null ? 1 : -1);
+            // Write out data row key schema once, since it's the same
+            oldTable.getRowKeySchema().write(output);
+            org.apache.phoenix.coprocessor.generated.ServerCachingProtos.TransformMaintainer proto =
+                    TransformMaintainer.toProto(newTable.getTransformMaintainer(oldTable, connection));
+            byte[] protoBytes = proto.toByteArray();
+            WritableUtils.writeVInt(output, protoBytes.length);
+            output.write(protoBytes);
+
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        }
+        ptr.set(stream.toByteArray(), 0, stream.size());
+    }
+
+    public static ServerCachingProtos.TransformMaintainer toProto(TransformMaintainer maintainer) throws IOException {
+        ServerCachingProtos.TransformMaintainer.Builder builder = ServerCachingProtos.TransformMaintainer.newBuilder();
+        builder.setSaltBuckets(maintainer.nNewTableSaltBuckets);
+        builder.setIsMultiTenant(maintainer.isMultiTenant);
+
+        for (ColumnReference colRef : maintainer.newTableColumns) {
+            ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+            cRefBuilder.setFamily(ByteStringer.wrap(colRef.getFamily()));
+            cRefBuilder.setQualifier(ByteStringer.wrap(colRef.getQualifier()));
+            builder.addNewTableColumns(cRefBuilder.build());
+        }
+
+        for (Map.Entry<ColumnReference, ColumnReference> e : maintainer.coveredColumnsMap.entrySet()) {
+            ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+            ColumnReference dataTableColRef = e.getKey();
+            cRefBuilder.setFamily(ByteStringer.wrap(dataTableColRef.getFamily()));
+            cRefBuilder.setQualifier(ByteStringer.wrap(dataTableColRef.getQualifier()));
+            builder.addOldTableColRefForCoveredColumns(cRefBuilder.build());
+            if (maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS) {
+                // We need to serialize the colRefs of new tables only in case of encoded column names.
+                ColumnReference newTableColRef = e.getValue();
+                cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+                cRefBuilder.setFamily(ByteStringer.wrap(newTableColRef.getFamily()));
+                cRefBuilder.setQualifier(ByteStringer.wrap(newTableColRef.getQualifier()));
+                builder.addNewTableColRefForCoveredColumns(cRefBuilder.build());
+            }
+        }
+
+        builder.setNewTableColumnCount(maintainer.newTableColumnCount);
+        builder.setNewTableName(ByteStringer.wrap(maintainer.newTableName));
+        builder.setNewTableRowKeyOrderOptimizable(maintainer.newTableRowKeyOrderOptimizable);
+        builder.setOldTableEmptyKeyValueColFamily(ByteStringer.wrap(maintainer.oldTableEmptyKeyValueCF));
+        ServerCachingProtos.ImmutableBytesWritable.Builder ibwBuilder = ServerCachingProtos.ImmutableBytesWritable.newBuilder();
+        ibwBuilder.setByteArray(ByteStringer.wrap(maintainer.emptyKeyValueCFPtr.get()));
+        ibwBuilder.setLength(maintainer.emptyKeyValueCFPtr.getLength());
+        ibwBuilder.setOffset(maintainer.emptyKeyValueCFPtr.getOffset());
+        builder.setEmptyKeyValueColFamily(ibwBuilder.build());
+        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+            DataOutput output = new DataOutputStream(stream);
+            for (Expression expression : maintainer.newTableExpressions) {
+                WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+                expression.write(output);
+            }
+            builder.setNewTableExpressions(ByteStringer.wrap(stream.toByteArray()));
+        }
+
+        builder.setNumDataTableColFamilies(maintainer.nOldTableCFs);
+        builder.setNewTableWalDisabled(maintainer.newTableWALDisabled);
+        builder.setNewTableRowKeyByteSize(maintainer.estimatedNewTableRowKeyBytes);
+        builder.setNewTableImmutable(maintainer.newTableImmutableRows);
+        for (Pair<String, String> p : maintainer.newTableColumnsInfo) {
+            ServerCachingProtos.ColumnInfo.Builder ciBuilder = ServerCachingProtos.ColumnInfo.newBuilder();
+            if (p.getFirst() != null) {
+                ciBuilder.setFamilyName(p.getFirst());
+            }
+            ciBuilder.setColumnName(p.getSecond());
+            builder.addNewTableColumnInfo(ciBuilder.build());
+        }
+        builder.setNewTableEncodingScheme(maintainer.newTableEncodingScheme.getSerializedMetadataValue());
+        builder.setNewTableImmutableStorageScheme(maintainer.newTableImmutableStorageScheme.getSerializedMetadataValue());
+        builder.setLogicalNewTableName(maintainer.logicalNewTableName);
+        builder.setOldTableEncodingScheme(maintainer.oldTableEncodingScheme.getSerializedMetadataValue());
+        builder.setOldTableImmutableStorageScheme(maintainer.oldTableImmutableStorageScheme.getSerializedMetadataValue());
+        return builder.build();
+    }
+
+    public static TransformMaintainer fromProto(ServerCachingProtos.TransformMaintainer proto, RowKeySchema dataTableRowKeySchema, boolean isDataTableSalted) throws IOException {
+        TransformMaintainer maintainer = new TransformMaintainer(dataTableRowKeySchema, isDataTableSalted);
+        maintainer.nNewTableSaltBuckets = proto.getSaltBuckets();
+        maintainer.isMultiTenant = proto.getIsMultiTenant();
+        List<ServerCachingProtos.ColumnReference> newTableColList = proto.getNewTableColumnsList();
+        maintainer.newTableColumns = new HashSet<ColumnReference>(newTableColList.size());
+        for (ServerCachingProtos.ColumnReference colRefFromProto : newTableColList) {
+            maintainer.newTableColumns.add(new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray()));
+        }
+
+        maintainer.newTableName = proto.getNewTableName().toByteArray();
+        if (proto.getNewTableColumnCount() != -1) {
+            maintainer.newTableColumnCount = proto.getNewTableColumnCount();
+        }
+
+        maintainer.newTableRowKeyOrderOptimizable = proto.getNewTableRowKeyOrderOptimizable();
+        maintainer.oldTableEmptyKeyValueCF = proto.getOldTableEmptyKeyValueColFamily().toByteArray();
+        ServerCachingProtos.ImmutableBytesWritable emptyKeyValueColFamily = proto.getEmptyKeyValueColFamily();
+        maintainer.emptyKeyValueCFPtr = new ImmutableBytesPtr(emptyKeyValueColFamily.getByteArray().toByteArray(), emptyKeyValueColFamily.getOffset(), emptyKeyValueColFamily.getLength());
+
+        maintainer.nOldTableCFs = proto.getNumDataTableColFamilies();
+        maintainer.newTableWALDisabled = proto.getNewTableWalDisabled();
+        maintainer.estimatedNewTableRowKeyBytes = proto.getNewTableRowKeyByteSize();
+        maintainer.newTableImmutableRows = proto.getNewTableImmutable();
+        List<ServerCachingProtos.ColumnInfo> newTblColumnInfoList = proto.getNewTableColumnInfoList();
+        maintainer.newTableColumnsInfo = Sets.newHashSet();
+        for (ServerCachingProtos.ColumnInfo info : newTblColumnInfoList) {
+            maintainer.newTableColumnsInfo.add(new Pair<>(info.getFamilyName(), info.getColumnName()));
+        }
+        // proto doesn't support single byte so need an explicit cast here
+        maintainer.newTableEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte) proto.getNewTableEncodingScheme());
+        maintainer.newTableImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte) proto.getNewTableImmutableStorageScheme());
+        maintainer.oldTableEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte) proto.getOldTableEncodingScheme());
+        maintainer.oldTableImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte) proto.getOldTableImmutableStorageScheme());
+
+        List<ServerCachingProtos.ColumnReference> oldTableColRefsForCoveredColumnsList = proto.getOldTableColRefForCoveredColumnsList();
+        List<ServerCachingProtos.ColumnReference> newTableColRefsForCoveredColumnsList = proto.getNewTableColRefForCoveredColumnsList();
+        maintainer.coveredColumnsMap = Maps.newHashMapWithExpectedSize(oldTableColRefsForCoveredColumnsList.size());
+        boolean encodedColumnNames = maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS;
+        Iterator<ServerCachingProtos.ColumnReference> newTableColRefItr = newTableColRefsForCoveredColumnsList.iterator();
+        for (ServerCachingProtos.ColumnReference colRefFromProto : oldTableColRefsForCoveredColumnsList) {
+            ColumnReference oldTableColRef = new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray());
+            ColumnReference newTableColRef;
+            if (encodedColumnNames) {
+                ServerCachingProtos.ColumnReference fromProto = newTableColRefItr.next();
+                newTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier().toByteArray());
+            } else {
+                byte[] cq = oldTableColRef.getQualifier();
+                byte[] cf = oldTableColRef.getFamily();
+                newTableColRef = new ColumnReference(cf, cq);
+            }
+            maintainer.coveredColumnsMap.put(oldTableColRef, newTableColRef);
+        }
+        maintainer.logicalNewTableName = proto.getLogicalNewTableName();
+        maintainer.initCachedState();
+        return maintainer;
+    }
+
+
+    public static List<IndexMaintainer> deserialize(byte[] buf) {
+        return deserialize(buf, 0, buf.length);
+    }
+
+    private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length) {
+        List<IndexMaintainer> maintainers = Collections.emptyList();
+        if (length > 0) {
+            ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length);
+            DataInput input = new DataInputStream(stream);
+            try {
+                int size = WritableUtils.readVInt(input);
+                boolean isDataTableSalted = size < 0;
+                size = Math.abs(size);
+                RowKeySchema rowKeySchema = new RowKeySchema();
+                rowKeySchema.readFields(input);
+                maintainers = Lists.newArrayListWithExpectedSize(size);
+                for (int i = 0; i < size; i++) {
+                    int protoSize = WritableUtils.readVInt(input);
+                    byte[] b = new byte[protoSize];
+                    input.readFully(b);
+                    ServerCachingProtos.TransformMaintainer proto = ServerCachingProtos.TransformMaintainer.parseFrom(b);
+                    maintainers.add(TransformMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
+            }
+        }
+        return maintainers;
+    }
+
+    // Return new table's name
+    public byte[] getIndexTableName() {
+        return newTableName;
+    }
+
+    // Builds new table's rowkey using the old table's rowkey.
+    // This method will change when we support rowkey related transforms
+    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts)  {
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        boolean isNewTableSalted = nNewTableSaltBuckets > 0;
+
+        try (TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedNewTableRowKeyBytes)){
+            DataOutput output = new DataOutputStream(stream);
+
+            if (isNewTableSalted) {
+                output.write(0); // will be set at end to new table salt byte
+            }
+            // The oldTableRowKeySchema includes the salt byte field,
+            // so we must adjust for that here.
+            int dataPosOffset = isOldTableSalted ? 1 : 0 ;
+            //BitSet viewConstantColumnBitSet = this.rowKeyMetaData.getViewConstantColumnBitSet();
+            // Skip data table salt byte
+            int maxRowKeyOffset = rowKeyPtr.getOffset() + rowKeyPtr.getLength();
+            oldTableRowKeySchema.iterator(rowKeyPtr, ptr, dataPosOffset);
+
+            // Write new table row key
+            while (oldTableRowKeySchema.next(ptr, dataPosOffset, maxRowKeyOffset) != null) {
+                output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
+                if (!oldTableRowKeySchema.getField(dataPosOffset).getDataType().isFixedWidth()) {
+                    output.writeByte(SchemaUtil.getSeparatorByte(newTableRowKeyOrderOptimizable, ptr.getLength()==0
+                            , oldTableRowKeySchema.getField(dataPosOffset)));
+                }
+                dataPosOffset++;
+            }
+
+            byte[] newTableRowKey = stream.getBuffer();
+            // Remove trailing nulls
+            int length = stream.size();
+            if (isNewTableSalted) {
+                // Set salt byte
+                byte saltByte = SaltingUtil.getSaltingByte(newTableRowKey, SaltingUtil.NUM_SALTING_BYTES, length-SaltingUtil.NUM_SALTING_BYTES, nNewTableSaltBuckets);
+                newTableRowKey[0] = saltByte;
+            }
+            return newTableRowKey.length == length ? newTableRowKey : Arrays.copyOf(newTableRowKey, length);
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        }
+    }
+
+    public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable oldRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
+        byte[] newRowKey = this.buildRowKey(valueGetter, oldRowKeyPtr, regionStartKey, regionEndKey, ts);
+        return buildUpdateMutation(kvBuilder, valueGetter, oldRowKeyPtr, ts, regionStartKey, regionEndKey,
+                newRowKey, this.getEmptyKeyValueFamily(), coveredColumnsMap,
+                newTableEmptyKeyValueRef, newTableWALDisabled, oldTableImmutableStorageScheme, newTableImmutableStorageScheme, newTableEncodingScheme);
+    }
+
+    public ImmutableBytesPtr getEmptyKeyValueFamily() {
+        return emptyKeyValueCFPtr;
+    }
+
+    public byte[] getEmptyKeyValueQualifier() {
+        return newTableEmptyKeyValueRef.getQualifier();
+    }
+
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/protobuf/ServerCachingService.proto b/phoenix-core/src/main/protobuf/ServerCachingService.proto
index 9993992..9df9684 100644
--- a/phoenix-core/src/main/protobuf/ServerCachingService.proto
+++ b/phoenix-core/src/main/protobuf/ServerCachingService.proto
@@ -70,6 +70,31 @@ message IndexMaintainer {
   optional int32 dataImmutableStorageScheme = 27;
 }
 
+message TransformMaintainer {
+  required int32 saltBuckets = 1;
+  required bool isMultiTenant = 2;
+  repeated ColumnReference newTableColumns = 3;
+  repeated ColumnReference oldTableColRefForCoveredColumns = 4;
+  repeated ColumnReference newTableColRefForCoveredColumns = 5;
+  required bytes newTableName = 6;
+  required bool newTableRowKeyOrderOptimizable = 7;
+  required bytes oldTableEmptyKeyValueColFamily = 8;
+  required ImmutableBytesWritable emptyKeyValueColFamily = 9;
+  optional bytes newTableExpressions = 10;
+  required int32 numDataTableColFamilies = 11;
+  required bool newTableWalDisabled = 12;
+  required int32 newTableRowKeyByteSize = 13;
+  required bool newTableImmutable = 14;
+  repeated ColumnInfo newTableColumnInfo = 15;
+  required int32 newTableEncodingScheme = 16;
+  required int32 newTableImmutableStorageScheme = 17;
+  optional int32 newTableColumnCount = 18 [default = -1];
+  optional string parentTableType = 19;
+  optional string logicalNewTableName = 20;
+  optional int32 oldTableEncodingScheme = 21;
+  optional int32 oldTableImmutableStorageScheme = 22;
+}
+
 message AddServerCacheRequest {
   optional bytes tenantId = 1;
   required bytes cacheId = 2;
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index b534603..d138d42 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -196,6 +196,7 @@ import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFac
 
 public abstract class BaseTest {
     public static final String DRIVER_CLASS_NAME_ATTRIB = "phoenix.driver.class.name";
+    protected static final String NULL_STRING="NULL";
     private static final double ZERO = 1e-9;
     private static final Map<String,String> tableDDLMap;
     private static final Logger LOGGER = LoggerFactory.getLogger(BaseTest.class);