Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2021/12/14 22:49:18 UTC

[GitHub] [phoenix] gokceni opened a new pull request #1366: PHOENIX-6612 Add TransformTool

gokceni opened a new pull request #1366:
URL: https://github.com/apache/phoenix/pull/1366


   Co-authored-by: Gokcen Iskender <47...@users.noreply.github.com>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [phoenix] gokceni commented on a change in pull request #1366: PHOENIX-6612 Add TransformTool

Posted by GitBox <gi...@apache.org>.
gokceni commented on a change in pull request #1366:
URL: https://github.com/apache/phoenix/pull/1366#discussion_r773376594



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+
+public class TransformMaintainer extends IndexMaintainer {
+    private boolean isMultiTenant;
+    // indexed expressions that are not present in the row key of the data table; an expression can also refer to a regular column
+    private List<Expression> newTableExpressions;
+    private Set<ColumnReference> newTableColumns;
+
+    private List<PDataType> newTableColumnTypes;
+    private int newTableColumnCount;
+    private byte[] newTableName;
+    private int nNewTableSaltBuckets;
+    private byte[] oldTableEmptyKeyValueCF;
+    private ImmutableBytesPtr emptyKeyValueCFPtr;
+    private int nOldTableCFs;
+    private boolean newTableWALDisabled;
+    private boolean newTableImmutableRows;
+    // Transient state
+    private final boolean isOldTableSalted;
+    private final RowKeySchema oldTableRowKeySchema;
+
+    private int estimatedNewTableRowKeyBytes;
+    private ColumnReference newTableEmptyKeyValueRef;
+    private ColumnReference oldTableEmptyKeyValueRef;
+    private boolean newTableRowKeyOrderOptimizable;
+
+    private PTable.QualifierEncodingScheme newTableEncodingScheme;
+    private PTable.ImmutableStorageScheme newTableImmutableStorageScheme;
+    private PTable.QualifierEncodingScheme oldTableEncodingScheme;
+    private PTable.ImmutableStorageScheme oldTableImmutableStorageScheme;
+    /*
+     * The first part of the pair is the column family name and the second part is the column name.
+     * We need to track this state because for certain storage schemes, such as
+     * ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column for which we need to generate a new
+     * table put/delete is different from the old columns in the Phoenix schema.
+     */
+    private Set<Pair<String, String>> newTableColumnsInfo;
+    /*
+     * Map of covered columns where a key is the column reference for a column in the old (data) table
+     * and the value is the column reference for the corresponding column in the new table.
+     */
+    private Map<ColumnReference, ColumnReference> coveredColumnsMap;
+
+    private String logicalNewTableName;
+
+    public static TransformMaintainer create(PTable oldTable, PTable newTable, PhoenixConnection connection) {
+        if (oldTable.getType() == PTableType.INDEX) {
+            throw new IllegalArgumentException();
+        }
+        TransformMaintainer maintainer = new TransformMaintainer(oldTable, newTable, connection);
+        return maintainer;
+    }
+
+    private TransformMaintainer(RowKeySchema oldRowKeySchema, boolean isOldTableSalted) {
+        super(oldRowKeySchema, isOldTableSalted);
+        this.oldTableRowKeySchema = oldRowKeySchema;
+        this.isOldTableSalted = isOldTableSalted;
+    }
+
+    private TransformMaintainer(final PTable oldTable, final PTable newTable, PhoenixConnection connection) {
+        this(oldTable.getRowKeySchema(), oldTable.getBucketNum() != null);
+        this.newTableRowKeyOrderOptimizable = newTable.rowKeyOrderOptimizable();
+        this.isMultiTenant = oldTable.isMultiTenant();
+
+        this.newTableEncodingScheme = newTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : newTable.getEncodingScheme();
+        this.newTableImmutableStorageScheme = newTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : newTable.getImmutableStorageScheme();
+        this.oldTableEncodingScheme = oldTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : oldTable.getEncodingScheme();
+        this.oldTableImmutableStorageScheme = oldTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : oldTable.getImmutableStorageScheme();
+
+        this.newTableName = newTable.getPhysicalName().getBytes();
+        boolean newTableWALDisabled = newTable.isWALDisabled();
+        int nNewTableColumns = newTable.getColumns().size();
+        int nNewTablePKColumns = newTable.getPKColumns().size();
+
+        List<PColumn> oldTablePKColumns = oldTable.getPKColumns();
+
+        this.newTableColumnCount = oldTablePKColumns.size();
+
+        this.newTableColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nNewTablePKColumns);
+        this.newTableExpressions = Lists.newArrayListWithExpectedSize(nNewTableColumns);
+        this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+        this.nNewTableSaltBuckets = newTable.getBucketNum() == null ? 0 : newTable.getBucketNum();
+        this.oldTableEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(oldTable);
+        this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(newTable);
+        this.nOldTableCFs = oldTable.getColumnFamilies().size();
+        this.newTableWALDisabled = newTableWALDisabled;
+        this.newTableImmutableRows = newTable.isImmutableRows();
+        this.newTableColumnsInfo = Sets.newHashSetWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+
+        for (int i = 0; i < newTable.getColumnFamilies().size(); i++) {
+            PColumnFamily family = newTable.getColumnFamilies().get(i);
+            for (PColumn newColumn : family.getColumns()) {
+                PColumn oldColumn = getColumnOrNull(oldTable, newColumn.getName().getString(), newColumn.getFamilyName().getString());
+                // oldColumn can be null during deletion, where we don't need covered columns
+                if (oldColumn != null) {
+                    byte[] oldColumnCq = oldColumn.getColumnQualifierBytes();
+                    byte[] newColumnCq = newColumn.getColumnQualifierBytes();
+                    this.coveredColumnsMap.put(new ColumnReference(oldColumn.getFamilyName().getBytes(), oldColumnCq),
+                            new ColumnReference(newColumn.getFamilyName().getBytes(), newColumnCq));
+                }
+            }
+        }
+        this.logicalNewTableName = newTable.getName().getString();
+        initCachedState();
+    }
+
+    public static PColumn getColumnOrNull(PTable table, String columnName, String familyName) {
+        PColumnFamily family;
+        try {
+            family = table.getColumnFamily(familyName);
+        } catch (ColumnFamilyNotFoundException e) {
+            return null;
+        }
+        try {
+            return family.getPColumnForColumnName(columnName);
+        } catch (ColumnNotFoundException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Initialize the calculated state used when creating or deserializing the maintainer
+     */
+    private void initCachedState() {
+        byte[] newTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(newTableEncodingScheme).getFirst();
+        byte[] oldTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(oldTableEncodingScheme).getFirst();
+        newTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, newTableEmptyKvQualifier);
+        oldTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, oldTableEmptyKvQualifier);

Review comment:
       Yes. We might need it with the new transform types. If you prefer, I can remove it now and add it back when we need it.







[GitHub] [phoenix] gokceni commented on pull request #1366: PHOENIX-6612 Add TransformTool

Posted by GitBox <gi...@apache.org>.
gokceni commented on pull request #1366:
URL: https://github.com/apache/phoenix/pull/1366#issuecomment-994114347


   @gjacoby126 @virajjasani I decided to do 4.x first. Then I will cherry-pick to master





[GitHub] [phoenix] gokceni commented on pull request #1366: PHOENIX-6612 Add TransformTool

Posted by GitBox <gi...@apache.org>.
gokceni commented on pull request #1366:
URL: https://github.com/apache/phoenix/pull/1366#issuecomment-999110611


   Jenkins test this please





[GitHub] [phoenix] gjacoby126 commented on a change in pull request #1366: PHOENIX-6612 Add TransformTool

Posted by GitBox <gi...@apache.org>.
gjacoby126 commented on a change in pull request #1366:
URL: https://github.com/apache/phoenix/pull/1366#discussion_r770051369



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
##########
@@ -456,7 +456,7 @@ public static void dumpTable(String tableName) throws Exception {
                     hTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName.getBytes());
             Scan scan = new Scan();
             scan.setRaw(true);
-            LOGGER.info("***** Table Name : " + tableName);
+            System.out.println("***** Table Name : " + tableName);

Review comment:
       Please remove printlns
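
       For illustration, a minimal sketch of the requested change (assuming the
       test class keeps the SLF4J LOGGER field that the removed line used):

           // route the diagnostic output through the existing logger instead of stdout
           LOGGER.info("***** Table Name : " + tableName);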

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
##########
@@ -529,15 +530,14 @@ protected void validateIndex(Connection connection, String tableName, boolean is
 
     public static void renameAndDropPhysicalTable(Connection conn, String tenantId, String schema, String tableName, String physicalName, boolean isNamespaceEnabled) throws Exception {
         String
-                changeName =
-                String.format(
-                        "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, PHYSICAL_TABLE_NAME) VALUES (%s, '%s', '%s', NULL, NULL, '%s')",
-                        tenantId, schema, tableName, physicalName);
+                changeName = String.format(
+                "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, PHYSICAL_TABLE_NAME) VALUES (%s, %s, '%s', NULL, NULL, '%s')",
+                tenantId, schema==null ? null : ("'" + schema + "'"), tableName, physicalName);
         conn.createStatement().execute(changeName);
         conn.commit();
 
         String fullTableName = SchemaUtil.getTableName(schema, tableName);
-        if (isNamespaceEnabled) {
+        if (isNamespaceEnabled && !(Strings.isNullOrEmpty(schema) || "NULL".equals(schema))) {

Review comment:
       "NULL" should probably be a constant

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.transform;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.schema.transform.Transform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Reducer class that does only one task and that is to update the index state of the table.

Review comment:
       Comment seems incorrect (probably from copy/paste)
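
       For illustration, a possible corrected comment, assuming (from the class
       name and the Transform import) that the reducer's single task is to
       complete the transform rather than to update an index state:

           /**
            * Reducer class that does only one task and that is to update the
            * state of the transform once all mappers have finished.
            */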

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
##########
@@ -465,7 +465,7 @@ public static void dumpTable(String tableName) throws Exception {
                             .entrySet()) {
                         byte[] family = entryF.getKey();
                     }
-                    LOGGER.info(cellString + " ****** value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
+                    System.out.println(cellString + " ****** value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));

Review comment:
       Ditto

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -71,14 +87,150 @@ public static void addTransform(PhoenixConnection connection, String tenantId, P
                 newPhysicalTableName = generateNewTableName(schema, logicalTableName, sequenceNum);
             }
             transformBuilder.setNewPhysicalTableName(newPhysicalTableName);
-            Transform.addTransform(transformBuilder.build(), connection);
+            Transform.addTransform(table, changingProperties, transformBuilder.build(), connection);
         } catch (JsonProcessingException ex) {
             LOGGER.error("addTransform failed", ex);
             throw new SQLException("Adding transform failed with JsonProcessingException");
+        } catch (SQLException ex) {
+            throw ex;
+        } catch(Exception ex) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.valueOf("CANNOT_MUTATE_TABLE"))
+                    .setSchemaName((table.getSchemaName() == null? null: table.getSchemaName().getString()))
+                    .setRootCause(ex)
+                    .setTableName(table.getName().getString()).build().buildException();
         }
     }
 
-    public static void addTransform(
+    protected static void addTransform(
+            PTable table, MetaDataClient.MetaProperties changedProps, SystemTransformRecord systemTransformParams, PhoenixConnection connection) throws Exception {
+        PName newTableName = PNameFactory.newName(systemTransformParams.getNewPhysicalTableName());
+        PName newTableNameWithoutSchema = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(systemTransformParams.getNewPhysicalTableName()));
+        PTable newTable = new PTableImpl.Builder()
+                .setTableName(newTableNameWithoutSchema)
+                .setParentTableName(table.getParentTableName())
+                .setBaseTableLogicalName(table.getBaseTableLogicalName())
+                .setPhysicalTableName(newTableNameWithoutSchema)
+                .setAllColumns(table.getColumns())
+                .setAppendOnlySchema(table.isAppendOnlySchema())
+                .setAutoPartitionSeqName(table.getAutoPartitionSeqName())
+                .setBaseColumnCount(table.getBaseColumnCount())
+                .setBucketNum(table.getBucketNum())
+                .setDefaultFamilyName(table.getDefaultFamilyName())
+                .setDisableWAL(table.isWALDisabled())
+                .setEstimatedSize(table.getEstimatedSize())
+                .setFamilies(table.getColumnFamilies())
+                .setImmutableRows(table.isImmutableRows())
+                .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
+                .setIndexType(table.getIndexType())
+                .setName(newTableName)
+                .setMultiTenant(table.isMultiTenant())
+                .setParentName(table.getParentName())
+                .setParentSchemaName(table.getParentSchemaName())
+                .setPhoenixTTL(table.getPhoenixTTL())
+                .setNamespaceMapped(table.isNamespaceMapped())
+                .setSchemaName(table.getSchemaName())
+                .setPkColumns(table.getPKColumns())
+                .setPkName(table.getPKName())
+                .setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark())
+                .setRowKeySchema(table.getRowKeySchema())
+                .setStoreNulls(table.getStoreNulls())
+                .setTenantId(table.getTenantId())
+                .setType(table.getType())
+                // SchemaExtractor uses the physical name to get the table descriptor, so we use the existing table's physical names here
+                .setPhysicalNames(ImmutableList.copyOf(table.getPhysicalNames()))
+                .setUpdateCacheFrequency(table.getUpdateCacheFrequency())
+                .setTransactionProvider(table.getTransactionProvider())
+                .setUseStatsForParallelization(table.useStatsForParallelization())
+                // TODO SET SCHEMAVERSION

Review comment:
       Need to set schema version and change detection enabled now that they're checked in, right?
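
       A hedged sketch of what that might look like in the builder chain; the
       setSchemaVersion/getSchemaVersion names are assumed here, while
       setIsChangeDetectionEnabled is already chained above:

           // hypothetical addition, mirroring the surrounding builder style
           .setSchemaVersion(table.getSchemaVersion())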

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -71,14 +87,150 @@ public static void addTransform(PhoenixConnection connection, String tenantId, P
    [... lines identical to the Transform.java hunk quoted in the previous review comment omitted; the hunk resumes at the end of the PTableImpl.Builder chain ...]
+                // SchemaExtractor uses the physical name to get the table descriptor, so we use the existing table's physical names here
+                .setPhysicalNames(ImmutableList.copyOf(table.getPhysicalNames()))
+                .setUpdateCacheFrequency(table.getUpdateCacheFrequency())
+                .setTransactionProvider(table.getTransactionProvider())
+                .setUseStatsForParallelization(table.useStatsForParallelization())
+                // TODO SET SCHEMAVERSION
+                // Transformables
+                .setImmutableStorageScheme(
+                        (changedProps.getImmutableStorageSchemeProp() != null? changedProps.getImmutableStorageSchemeProp():table.getImmutableStorageScheme()))
+                .setQualifierEncodingScheme(
+                        (changedProps.getColumnEncodedBytesProp() != null? changedProps.getColumnEncodedBytesProp() : table.getEncodingScheme()))
+                .build();

Review comment:
       One thing to handle in a future JIRA is how this will co-exist with schema registry -- when we transform we probably need to create a new schema entry in the external schema registry because change detection will need to know the column encoding to be able to parse WAL edits. 

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
##########
@@ -0,0 +1,458 @@
    [... the first ~185 lines of this hunk are identical to the TransformMaintainer.java hunk quoted in the earlier review comment and are omitted; the hunk resumes at initCachedState() ...]
+    /**
+     * Initialize the calculated state used when creating or deserializing the maintainer
+     */
+    private void initCachedState() {
+        byte[] newTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(newTableEncodingScheme).getFirst();
+        byte[] oldTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(oldTableEncodingScheme).getFirst();
+        newTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, newTableEmptyKvQualifier);
+        oldTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, oldTableEmptyKvQualifier);
+        this.newTableColumns = Sets.newLinkedHashSetWithExpectedSize(this.newTableColumnCount);
+
+        for (ColumnReference colRef : coveredColumnsMap.keySet()) {
+            if (newTableImmutableStorageScheme == PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+                newTableColumns.add(colRef);
+            } else {
+                newTableColumns.add(new ColumnReference(colRef.getFamily(), QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES));
+            }
+        }
+    }
+
+    /**
+     * Serializes the TransformMaintainer for a given table on the client side
+     *
+     * @param oldTable   old table
+     * @param ptr        bytes pointer to hold the returned serialized value
+     * @param newTable   new table to serialize
+     * @param connection Phoenix connection used to build the maintainer
+     */
+    public static void serialize(PTable oldTable, ImmutableBytesWritable ptr,
+                                 PTable newTable, PhoenixConnection connection) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        DataOutputStream output = new DataOutputStream(stream);
+        try {
+            // Encode data table salting
+            WritableUtils.writeVInt(output, oldTable.getBucketNum() == null ? 1 : -1);
+            // Write out data row key schema once, since it's the same
+            oldTable.getRowKeySchema().write(output);
+            org.apache.phoenix.coprocessor.generated.ServerCachingProtos.TransformMaintainer proto =
+                    TransformMaintainer.toProto(newTable.getTransformMaintainer(oldTable, connection));
+            byte[] protoBytes = proto.toByteArray();
+            WritableUtils.writeVInt(output, protoBytes.length);
+            output.write(protoBytes);
+
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        }
+        ptr.set(stream.toByteArray(), 0, stream.size());
+    }
+
+    public static ServerCachingProtos.TransformMaintainer toProto(TransformMaintainer maintainer) throws IOException {
+        ServerCachingProtos.TransformMaintainer.Builder builder = ServerCachingProtos.TransformMaintainer.newBuilder();
+        builder.setSaltBuckets(maintainer.nNewTableSaltBuckets);
+        builder.setIsMultiTenant(maintainer.isMultiTenant);
+
+        for (ColumnReference colRef : maintainer.newTableColumns) {
+            ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+            cRefBuilder.setFamily(ByteStringer.wrap(colRef.getFamily()));
+            cRefBuilder.setQualifier(ByteStringer.wrap(colRef.getQualifier()));
+            builder.addNewTableColumns(cRefBuilder.build());
+        }
+
+        for (Map.Entry<ColumnReference, ColumnReference> e : maintainer.coveredColumnsMap.entrySet()) {
+            ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+            ColumnReference dataTableColRef = e.getKey();
+            cRefBuilder.setFamily(ByteStringer.wrap(dataTableColRef.getFamily()));
+            cRefBuilder.setQualifier(ByteStringer.wrap(dataTableColRef.getQualifier()));
+            builder.addOldTableColRefForCoveredColumns(cRefBuilder.build());
+            if (maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS) {
+                // We only need to serialize the colRefs of the new table in the case of encoded column names.
+                ColumnReference newTableColRef = e.getValue();
+                cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+                cRefBuilder.setFamily(ByteStringer.wrap(newTableColRef.getFamily()));
+                cRefBuilder.setQualifier(ByteStringer.wrap(newTableColRef.getQualifier()));
+                builder.addNewTableColRefForCoveredColumns(cRefBuilder.build());
+            }
+        }
+
+        builder.setNewTableColumnCount(maintainer.newTableColumnCount);
+        builder.setNewTableName(ByteStringer.wrap(maintainer.newTableName));
+        builder.setNewTableRowKeyOrderOptimizable(maintainer.newTableRowKeyOrderOptimizable);
+        builder.setOldTableEmptyKeyValueColFamily(ByteStringer.wrap(maintainer.oldTableEmptyKeyValueCF));
+        ServerCachingProtos.ImmutableBytesWritable.Builder ibwBuilder = ServerCachingProtos.ImmutableBytesWritable.newBuilder();
+        ibwBuilder.setByteArray(ByteStringer.wrap(maintainer.emptyKeyValueCFPtr.get()));
+        ibwBuilder.setLength(maintainer.emptyKeyValueCFPtr.getLength());
+        ibwBuilder.setOffset(maintainer.emptyKeyValueCFPtr.getOffset());
+        builder.setEmptyKeyValueColFamily(ibwBuilder.build());
+        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+            DataOutput output = new DataOutputStream(stream);
+            for (Expression expression : maintainer.newTableExpressions) {
+                WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+                expression.write(output);
+            }
+            builder.setNewTableExpressions(ByteStringer.wrap(stream.toByteArray()));
+        }
+
+        builder.setNumDataTableColFamilies(maintainer.nOldTableCFs);
+        builder.setNewTableWalDisabled(maintainer.newTableWALDisabled);
+        builder.setNewTableRowKeyByteSize(maintainer.estimatedNewTableRowKeyBytes);
+        builder.setNewTableImmutable(maintainer.newTableImmutableRows);
+        for (Pair<String, String> p : maintainer.newTableColumnsInfo) {
+            ServerCachingProtos.ColumnInfo.Builder ciBuilder = ServerCachingProtos.ColumnInfo.newBuilder();
+            if (p.getFirst() != null) {
+                ciBuilder.setFamilyName(p.getFirst());
+            }
+            ciBuilder.setColumnName(p.getSecond());
+            builder.addNewTableColumnInfo(ciBuilder.build());
+        }
+        builder.setNewTableEncodingScheme(maintainer.newTableEncodingScheme.getSerializedMetadataValue());
+        builder.setNewTableImmutableStorageScheme(maintainer.newTableImmutableStorageScheme.getSerializedMetadataValue());
+        builder.setLogicalNewTableName(maintainer.logicalNewTableName);
+        builder.setOldTableEncodingScheme(maintainer.oldTableEncodingScheme.getSerializedMetadataValue());
+        builder.setOldTableImmutableStorageScheme(maintainer.oldTableImmutableStorageScheme.getSerializedMetadataValue());
+        return builder.build();
+    }
+
+    public static TransformMaintainer fromProto(ServerCachingProtos.TransformMaintainer proto, RowKeySchema dataTableRowKeySchema, boolean isDataTableSalted) throws IOException {
+        TransformMaintainer maintainer = new TransformMaintainer(dataTableRowKeySchema, isDataTableSalted);
+        maintainer.nNewTableSaltBuckets = proto.getSaltBuckets();
+        maintainer.isMultiTenant = proto.getIsMultiTenant();
+        List<ServerCachingProtos.ColumnReference> newTableColList = proto.getNewTableColumnsList();
+        maintainer.newTableColumns = new HashSet<ColumnReference>(newTableColList.size());
+        for (ServerCachingProtos.ColumnReference colRefFromProto : newTableColList) {
+            maintainer.newTableColumns.add(new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray()));
+        }
+
+        maintainer.newTableName = proto.getNewTableName().toByteArray();
+        if (proto.getNewTableColumnCount() != -1) {
+            maintainer.newTableColumnCount = proto.getNewTableColumnCount();
+        }
+
+        maintainer.newTableRowKeyOrderOptimizable = proto.getNewTableRowKeyOrderOptimizable();
+        maintainer.oldTableEmptyKeyValueCF = proto.getOldTableEmptyKeyValueColFamily().toByteArray();
+        ServerCachingProtos.ImmutableBytesWritable emptyKeyValueColFamily = proto.getEmptyKeyValueColFamily();
+        maintainer.emptyKeyValueCFPtr = new ImmutableBytesPtr(emptyKeyValueColFamily.getByteArray().toByteArray(), emptyKeyValueColFamily.getOffset(), emptyKeyValueColFamily.getLength());
+
+        maintainer.nOldTableCFs = proto.getNumDataTableColFamilies();
+        maintainer.newTableWALDisabled = proto.getNewTableWalDisabled();
+        maintainer.estimatedNewTableRowKeyBytes = proto.getNewTableRowKeyByteSize();
+        maintainer.newTableImmutableRows = proto.getNewTableImmutable();
+        List<ServerCachingProtos.ColumnInfo> newTblColumnInfoList = proto.getNewTableColumnInfoList();
+        maintainer.newTableColumnsInfo = Sets.newHashSet();
+        for (ServerCachingProtos.ColumnInfo info : newTblColumnInfoList) {
+            maintainer.newTableColumnsInfo.add(new Pair<>(info.getFamilyName(), info.getColumnName()));
+        }
+        // proto doesn't support a single byte, so we need an explicit cast here
+        maintainer.newTableEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte) proto.getNewTableEncodingScheme());
+        maintainer.newTableImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte) proto.getNewTableImmutableStorageScheme());
+        maintainer.oldTableEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte) proto.getOldTableEncodingScheme());
+        maintainer.oldTableImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte) proto.getOldTableImmutableStorageScheme());
+
+        List<ServerCachingProtos.ColumnReference> oldTableColRefsForCoveredColumnsList = proto.getOldTableColRefForCoveredColumnsList();
+        List<ServerCachingProtos.ColumnReference> newTableColRefsForCoveredColumnsList = proto.getNewTableColRefForCoveredColumnsList();
+        maintainer.coveredColumnsMap = Maps.newHashMapWithExpectedSize(oldTableColRefsForCoveredColumnsList.size());
+        boolean encodedColumnNames = maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS;
+        Iterator<ServerCachingProtos.ColumnReference> newTableColRefItr = newTableColRefsForCoveredColumnsList.iterator();
+        for (ServerCachingProtos.ColumnReference colRefFromProto : oldTableColRefsForCoveredColumnsList) {
+            ColumnReference oldTableColRef = new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray());
+            ColumnReference newTableColRef;
+            if (encodedColumnNames) {
+                ServerCachingProtos.ColumnReference fromProto = newTableColRefItr.next();
+                newTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier().toByteArray());
+            } else {
+                byte[] cq = oldTableColRef.getQualifier();
+                byte[] cf = oldTableColRef.getFamily();
+                newTableColRef = new ColumnReference(cf, cq);
+            }
+            maintainer.coveredColumnsMap.put(oldTableColRef, newTableColRef);
+        }
+        maintainer.logicalNewTableName = proto.getLogicalNewTableName();
+        maintainer.initCachedState();
+        return maintainer;
+    }
+
+
+    public static List<IndexMaintainer> deserialize(byte[] buf) {
+        return deserialize(buf, 0, buf.length);
+    }
+
+    private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length) {
+        List<IndexMaintainer> maintainers = Collections.emptyList();
+        if (length > 0) {
+            ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length);
+            DataInput input = new DataInputStream(stream);
+            try {
+                int size = WritableUtils.readVInt(input);
+                boolean isDataTableSalted = size < 0;
+                size = Math.abs(size);
+                RowKeySchema rowKeySchema = new RowKeySchema();
+                rowKeySchema.readFields(input);
+                maintainers = Lists.newArrayListWithExpectedSize(size);
+                for (int i = 0; i < size; i++) {
+                    int protoSize = WritableUtils.readVInt(input);
+                    byte[] b = new byte[protoSize];
+                    input.readFully(b);
+                    ServerCachingProtos.TransformMaintainer proto = ServerCachingProtos.TransformMaintainer.parseFrom(b);
+                    maintainers.add(TransformMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
+            }
+        }
+        return maintainers;
+    }
+
+    // Return new table's name
+    public byte[] getIndexTableName() {
+        return newTableName;
+    }
+
+    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts)  {

Review comment:
       nit: Javadoc would be really useful here. (Have to read the method to figure out that we're building a new table key from the old table key, rather than vice versa.) 
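
       For illustration, a possible Javadoc along the lines requested, based on
       the reviewer's reading of the method:

           /**
            * Builds the row key for the new (transformed) table from a row key
            * of the old table, not vice versa.
            */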

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
##########
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexDBWritable;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.isTimeRangeSet;
+import static org.apache.phoenix.mapreduce.index.IndexTool.validateTimeRange;
+import static org.apache.phoenix.util.QueryUtil.getConnection;
+
+public class TransformTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TransformTool.class);
+
+    public static enum MR_COUNTER_METRICS {
+        TRANSFORM_FAILED,
+        TRANSFORM_SUCCEED
+    }
+
+    private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true,
+            "Output path where the files are written");
+    private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true,
+            "Phoenix schema name (optional)");
+    private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true,
+            "Data table name (mandatory)");
+    private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true,
+            "Index table name(not required in case of partial rebuilding)");
+
+    private static final Option PARTIAL_TRANSFORM_OPTION = new Option("pt", "partial-transform", false,
+            "To transform a data table from a start timestamp");
+
+    private static final Option ABORT_TRANSFORM_OPTION = new Option("abort", "abort", false,
+            "Aborts the ongoing transform");
+
+    private static final Option PAUSE_TRANSFORM_OPTION = new Option("pause", "pause", false,
+            "Pauses the ongoing transform. If the ongoing transform fails, it will not be retried");
+
+    private static final Option RESUME_TRANSFORM_OPTION = new Option("resume", "resume", false,
+            "Resumes the ongoing transform");
+
+    private static final Option JOB_PRIORITY_OPTION = new Option("p", "job-priority", true,
+            "Define job priority from 0(highest) to 4. Default is 2(normal)");
+
+    private static final int DEFAULT_AUTOSPLIT_NUM_REGIONS = 20;
+
+    private static final Option AUTO_SPLIT_OPTION =
+            new Option("spa", "autosplit", true,
+                    "Automatically split the new table if the # of data table regions is greater than N. "
+                            + "Takes an optional argument specifying N, otherwise defaults to " + DEFAULT_AUTOSPLIT_NUM_REGIONS
+            );
+
+    private static final Option RUN_FOREGROUND_OPTION =
+            new Option(
+                    "runfg",
+                    "run-foreground",
+                    false,
+                    "If specified, runs transform in Foreground. Default - Runs the transform in background.");
+
+    private static final Option TENANT_ID_OPTION = new Option("tenant", "tenant-id", true,
+            "If specified, uses Tenant connection for tenant index transform (optional)");
+
+    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+    private static final Option START_TIME_OPTION = new Option("st", "start-time",
+            true, "Start time for transform");
+
+    private static final Option END_TIME_OPTION = new Option("et", "end-time",
+            true, "End time for transform");
+
+    public static final String TRANSFORM_JOB_NAME_TEMPLATE = "PHOENIX_TRANS_%s.%s";
+
+    public static final String PARTIAL_TRANSFORM_NOT_APPLICABLE = "Partial transform accepts "
+            + "non-zero ts set in the past as start-time(st) option and that ts must be present in SYSTEM.TRANSFORM table";
+
+    public static final String TRANSFORM_NOT_APPLICABLE = "Transform is not applicable for local indexes, views, or transactional tables";
+
+    public static final String PARTIAL_TRANSFORM_NOT_COMPATIBLE = "Can't abort/pause/resume/split during partial transform";
+
+    private Configuration configuration;
+    private Connection connection;
+    private String tenantId;
+    private String dataTable;
+    private String logicalParentName;
+    private String basePath;
+    // logicalTableName is index table and logicalParentName is the data table if this is an index transform
+    // If this is a data table transform, logicalParentName is null and logicalTableName is dataTable
+    private String logicalTableName;
+    private String schemaName;
+    private String indexTable;
+    private String qDataTable; //normalized with schema
+    private PTable pIndexTable = null;
+    private PTable pDataTable;
+    private PTable pOldTable;
+    private PTable pNewTable;
+
+    private String oldTableWithSchema;
+    private String newTableWithSchema;
+    private JobPriority jobPriority;
+    private String jobName;
+    private boolean isForeground;
+    private Long startTime, endTime, lastTransformTime;
+    private boolean isPartialTransform;
+    private Job job;
+
+    public Long getStartTime() {
+        return startTime;
+    }
+
+    public Long getEndTime() { return endTime; }
+
+    public CommandLine parseOptions(String[] args) {
+        final Options options = getOptions();
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("Error parsing command line options: " + e.getMessage(),
+                    options);
+        }
+
+        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        this.jobPriority = getJobPriority(cmdLine);
+
+        boolean dataTableProvided = (cmdLine.hasOption(DATA_TABLE_OPTION.getOpt()));
+        if (!dataTableProvided) {
+            throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + " is a mandatory parameter");
+        }
+
+        return cmdLine;
+    }
+
+    private Options getOptions() {
+        final Options options = new Options();
+        options.addOption(OUTPUT_PATH_OPTION);
+        options.addOption(SCHEMA_NAME_OPTION);
+        options.addOption(DATA_TABLE_OPTION);
+        options.addOption(INDEX_TABLE_OPTION);
+        options.addOption(TENANT_ID_OPTION);
+        options.addOption(HELP_OPTION);
+        options.addOption(JOB_PRIORITY_OPTION);
+        options.addOption(RUN_FOREGROUND_OPTION);
+        options.addOption(PARTIAL_TRANSFORM_OPTION);
+        options.addOption(START_TIME_OPTION);
+        options.addOption(END_TIME_OPTION);
+        options.addOption(AUTO_SPLIT_OPTION);
+        options.addOption(ABORT_TRANSFORM_OPTION);
+        options.addOption(PAUSE_TRANSFORM_OPTION);
+        options.addOption(RESUME_TRANSFORM_OPTION);
+        START_TIME_OPTION.setOptionalArg(true);
+        END_TIME_OPTION.setOptionalArg(true);
+        return options;
+    }
+
+    private void printHelpAndExit(String errorMessage, Options options) {
+        System.err.println(errorMessage);
+        LOGGER.error(errorMessage);
+        printHelpAndExit(options, 1);
+    }
+
+    private void printHelpAndExit(Options options, int exitCode) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("help", options);
+        System.exit(exitCode);
+    }
+
+    public CommandLine parseArgs(String[] args) throws Exception {
+        CommandLine cmdLine;
+        try {
+            cmdLine = parseOptions(args);
+        } catch (IllegalStateException e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+            throw e;
+        }
+
+        if (getConf() == null) {
+            setConf(HBaseConfiguration.create());
+        }
+
+        return cmdLine;
+    }
+
+    @VisibleForTesting
+    public int populateTransformToolAttributesAndValidate(CommandLine cmdLine) throws Exception {
+        boolean useStartTime = cmdLine.hasOption(START_TIME_OPTION.getOpt());
+        boolean useEndTime = cmdLine.hasOption(END_TIME_OPTION.getOpt());
+        basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
+        isPartialTransform = cmdLine.hasOption(PARTIAL_TRANSFORM_OPTION.getOpt());
+        if (useStartTime) {
+            startTime = Long.valueOf(cmdLine.getOptionValue(START_TIME_OPTION.getOpt()));
+        }
+
+        if (useEndTime) {
+            endTime = Long.valueOf(cmdLine.getOptionValue(END_TIME_OPTION.getOpt()));
+        }
+
+        if (isTimeRangeSet(startTime, endTime)) {
+            validateTimeRange(startTime, endTime);
+        }
+
+        if (isPartialTransform &&
+                (cmdLine.hasOption(AUTO_SPLIT_OPTION.getOpt()))) {
+            throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
+        }
+        if (isPartialTransform &&
+                (cmdLine.hasOption(ABORT_TRANSFORM_OPTION.getOpt()) || cmdLine.hasOption(PAUSE_TRANSFORM_OPTION.getOpt())
+                        || cmdLine.hasOption(RESUME_TRANSFORM_OPTION.getOpt()))) {
+            throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
+        }
+
+        if (isPartialTransform) {
+            if (!cmdLine.hasOption(START_TIME_OPTION.getOpt())) {
+                throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_APPLICABLE);
+            }
+            lastTransformTime = Long.valueOf(cmdLine.getOptionValue(START_TIME_OPTION.getOpt()));
+            SystemTransformRecord transformRecord = getTransformRecord(null);
+            if (transformRecord == null) {
+                throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_APPLICABLE);
+            }
+            if (lastTransformTime == null) {
+                lastTransformTime = transformRecord.getTransformEndTs().getTime();
+            } else {
+                validateLastTransformTime();
+            }
+        }
+
+        schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+        dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+        indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+        qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+        isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+        logicalTableName = dataTable;
+        logicalParentName = null;
+        if (!Strings.isNullOrEmpty(indexTable)) {
+            logicalTableName = indexTable;
+            logicalParentName = SchemaUtil.getTableName(schemaName, dataTable);
+        }
+
+        pDataTable = PhoenixRuntime.getTable(
+                connection, SchemaUtil.getQualifiedTableName(schemaName, dataTable));
+        if (indexTable != null) {
+            pIndexTable = PhoenixRuntime.getTable(
+                    connection, SchemaUtil.getQualifiedTableName(schemaName, indexTable));
+            pOldTable = pIndexTable;
+        } else {
+            pOldTable = pDataTable;
+        }
+
+        SystemTransformRecord transformRecord = getTransformRecord(connection.unwrap(PhoenixConnection.class));
+
+        validateTransform(pDataTable, pIndexTable, transformRecord);
+        String newTableName = SchemaUtil.getTableNameFromFullName(transformRecord.getNewPhysicalTableName());
+        pNewTable = PhoenixRuntime.getTable(

Review comment:
       Should this be `getTableNoCache()`? Does it matter if our copy of the PTable is slightly stale?
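
       For context, a minimal sketch of the distinction being asked about, assuming
       `PhoenixRuntime.getTableNoCache` is available on this branch (illustration only,
       not code from this PR):

           import java.sql.Connection;
           import java.sql.SQLException;
           import org.apache.phoenix.schema.PTable;
           import org.apache.phoenix.util.PhoenixRuntime;

           public final class TableLookupSketch {
               private TableLookupSketch() {}

               // Served from the client-side metadata cache; may be slightly stale
               // if another client altered the table after it was cached.
               static PTable cachedLookup(Connection conn, String fullName) throws SQLException {
                   return PhoenixRuntime.getTable(conn, fullName);
               }

               // Bypasses the cache and reads the latest metadata from SYSTEM.CATALOG.
               static PTable freshLookup(Connection conn, String fullName) throws SQLException {
                   return PhoenixRuntime.getTableNoCache(conn, fullName);
               }
           }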






[GitHub] [phoenix] virajjasani commented on a change in pull request #1366: PHOENIX-6612 Add TransformTool

Posted by GitBox <gi...@apache.org>.
virajjasani commented on a change in pull request #1366:
URL: https://github.com/apache/phoenix/pull/1366#discussion_r773012137



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -157,64 +307,70 @@ public static void addTransform(
     }
 
 
-    public static SystemTransformRecord getTransformRecord(
-            String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
-        try (ResultSet resultSet = connection.prepareStatement("SELECT " +
-                PhoenixDatabaseMetaData.TENANT_ID + ", " +
-                PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
-                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
-                PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
-                PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
-                PhoenixDatabaseMetaData.OLD_METADATA + " , " +
-                PhoenixDatabaseMetaData.NEW_METADATA + " , " +
-                PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
-                " FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE  " +
-                (Strings.isNullOrEmpty(tenantId) ? "" : (PhoenixDatabaseMetaData.TENANT_ID + " ='" + tenantId + "' AND ")) +
-                (Strings.isNullOrEmpty(schema) ? "" : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + schema + "' AND ")) +
-                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + logicalTableName + "'" +
-                (Strings.isNullOrEmpty(logicalParentName) ? "": (" AND " + PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + "='" + logicalParentName + "'" ))
-        ).executeQuery()) {
-            if (resultSet.next()) {
-                return SystemTransformRecord.SystemTransformBuilder.build(resultSet);
+    public static void completeTransform(Connection connection, Configuration configuration) throws Exception{
+        // Will be called from Reducer
+        long timestmp= EnvironmentEdgeManager.currentTimeMillis();
+        String tenantId = configuration.get(MAPREDUCE_TENANT_ID, null);
+        String fullOldTableName = PhoenixConfigurationUtil.getInputTableName(configuration);
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(fullOldTableName);
+        String oldTableLogicalName = SchemaUtil.getTableNameFromFullName(fullOldTableName);
+        String indexTableName = SchemaUtil.getTableNameFromFullName(PhoenixConfigurationUtil.getIndexToolIndexTableName(configuration));
+        String logicalTableName = oldTableLogicalName;
+        String logicalParentName = null;
+        if (PhoenixConfigurationUtil.getTransformingTableType(configuration) == IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE)
+            if (!Strings.isNullOrEmpty(indexTableName)) {
+                logicalTableName = indexTableName;
+                logicalParentName = SchemaUtil.getTableName(schemaName, oldTableLogicalName);
             }
-            return null;
-        }
-    }
-
-    public static boolean checkIsTransformNeeded(MetaDataClient.MetaProperties metaProperties, String schemaName,
-                                                 PTable table, String logicalTableName, String parentTableName,
-                                                 String tenantId, PhoenixConnection connection) throws SQLException {
-        boolean isTransformNeeded = isTransformNeeded(metaProperties, table);
-        if (isTransformNeeded) {
-            SystemTransformRecord existingTransform = Transform.getTransformRecord(schemaName, logicalTableName, parentTableName, tenantId,connection);
-            if (existingTransform != null && existingTransform.isActive()) {
-                throw new SQLExceptionInfo.Builder(
-                        SQLExceptionCode.CANNOT_TRANSFORM_ALREADY_TRANSFORMING_TABLE)
-                        .setMessage(" Only one transform at a time is allowed ")
-                        .setSchemaName(schemaName).setTableName(logicalTableName).build().buildException();
+        boolean isPartial = PhoenixConfigurationUtil.getIsPartialTransform(configuration);
+        SystemTransformRecord transformRecord = getTransformRecord(schemaName, logicalTableName, logicalParentName,
+                tenantId, connection.unwrap(PhoenixConnection.class));
+        if (!isPartial) {
+            String newTableName = SchemaUtil.getTableNameFromFullName(transformRecord.getNewPhysicalTableName());
+            PTable pNewTable = PhoenixRuntime.getTable(connection, transformRecord.getNewPhysicalTableName());
+            PTable pOldTable = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(schemaName, logicalTableName));
+            if (pOldTable.getImmutableStorageScheme() != pNewTable.getImmutableStorageScheme() ||
+                    pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+                MetaDataClient.mutateTransformProperties(connection, tenantId, schemaName, logicalTableName, newTableName,
+                        pNewTable.getImmutableStorageScheme(), pNewTable.getEncodingScheme());
+                // We need to update the columns' qualifiers as well
+                if (pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+                    Short nextKeySeq = 0;

Review comment:
       nit: primitive `short`?
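
       For reference, the boxing behind this nit (illustration only, not code from this PR):

           public class ShortBoxingDemo {
               public static void main(String[] args) {
                   Short boxed = 0;             // constant is narrowed and boxed into a Short object
                   boxed = (short) (boxed + 1); // unbox, widen to int, add, narrow, re-box
                   short primitive = 0;         // plain primitive, no allocation
                   primitive++;                 // plain arithmetic
                   System.out.println(boxed + " " + primitive); // prints "1 1"
               }
           }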

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
##########
@@ -0,0 +1,762 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexDBWritable;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.isTimeRangeSet;
+import static org.apache.phoenix.mapreduce.index.IndexTool.validateTimeRange;
+import static org.apache.phoenix.util.QueryUtil.getConnection;
+
+public class TransformTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TransformTool.class);
+
+    public static enum MR_COUNTER_METRICS {

Review comment:
       nit: `static` is redundant
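
       For reference: a nested enum is implicitly static, so the modifier is a no-op.
       A small illustration (not from the PR):

           public class EnumNestingDemo {
               enum Metrics { TRANSFORM_FAILED, TRANSFORM_SUCCEED } // implicitly static

               public static void main(String[] args) {
                   // Accessible without an instance of the enclosing class, exactly
                   // as it would be with an explicit "static" modifier.
                   System.out.println(Metrics.TRANSFORM_SUCCEED);
               }
           }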

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+
+public class TransformMaintainer extends IndexMaintainer {
+    private boolean isMultiTenant;
+    // indexed expressions that are not present in the row key of the data table; an expression can also refer to a regular column
+    private List<Expression> newTableExpressions;
+    private Set<ColumnReference> newTableColumns;
+
+    private List<PDataType> newTableColumnTypes;
+    private int newTableColumnCount;
+    private byte[] newTableName;
+    private int nNewTableSaltBuckets;
+    private byte[] oldTableEmptyKeyValueCF;
+    private ImmutableBytesPtr emptyKeyValueCFPtr;
+    private int nOldTableCFs;
+    private boolean newTableWALDisabled;
+    private boolean newTableImmutableRows;
+    // Transient state
+    private final boolean isOldTableSalted;
+    private final RowKeySchema oldTableRowKeySchema;
+
+    private int estimatedNewTableRowKeyBytes;
+    private ColumnReference newTableEmptyKeyValueRef;
+    private ColumnReference oldTableEmptyKeyValueRef;
+    private boolean newTableRowKeyOrderOptimizable;
+
+    private PTable.QualifierEncodingScheme newTableEncodingScheme;
+    private PTable.ImmutableStorageScheme newTableImmutableStorageScheme;
+    private PTable.QualifierEncodingScheme oldTableEncodingScheme;
+    private PTable.ImmutableStorageScheme oldTableImmutableStorageScheme;
+    /*
+     * The first part of the pair is the column family name
+     * and the second part is the column name. We need to track this state because, for certain storage schemes
+     * like ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column for which we need to generate a new
+     * table put/delete is different from the old columns in the phoenix schema.
+     */
+    private Set<Pair<String, String>> newTableColumnsInfo;
+    /*
+     * Map of covered columns where a key is the column reference for a column in the data table
+     * and the value is the column reference for the corresponding column in the new table.
+     */
+    private Map<ColumnReference, ColumnReference> coveredColumnsMap;
+
+    private String logicalNewTableName;
+
+    public static TransformMaintainer create(PTable oldTable, PTable newTable, PhoenixConnection connection) {
+        if (oldTable.getType() == PTableType.INDEX) {
+            throw new IllegalArgumentException();
+        }
+        TransformMaintainer maintainer = new TransformMaintainer(oldTable, newTable, connection);
+        return maintainer;
+    }
+
+    private TransformMaintainer(RowKeySchema oldRowKeySchema, boolean isOldTableSalted) {
+        super(oldRowKeySchema, isOldTableSalted);
+        this.oldTableRowKeySchema = oldRowKeySchema;
+        this.isOldTableSalted = isOldTableSalted;
+    }
+
+    private TransformMaintainer(final PTable oldTable, final PTable newTable, PhoenixConnection connection) {
+        this(oldTable.getRowKeySchema(), oldTable.getBucketNum() != null);
+        this.newTableRowKeyOrderOptimizable = newTable.rowKeyOrderOptimizable();
+        this.isMultiTenant = oldTable.isMultiTenant();
+
+        this.newTableEncodingScheme = newTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : newTable.getEncodingScheme();
+        this.newTableImmutableStorageScheme = newTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : newTable.getImmutableStorageScheme();
+        this.oldTableEncodingScheme = oldTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : oldTable.getEncodingScheme();
+        this.oldTableImmutableStorageScheme = oldTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : oldTable.getImmutableStorageScheme();
+
+        this.newTableName = newTable.getPhysicalName().getBytes();
+        boolean newTableWALDisabled = newTable.isWALDisabled();
+        int nNewTableColumns = newTable.getColumns().size();
+        int nNewTablePKColumns = newTable.getPKColumns().size();
+
+        List<PColumn> oldTablePKColumns = oldTable.getPKColumns();
+
+        this.newTableColumnCount = oldTablePKColumns.size();
+
+        this.newTableColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nNewTablePKColumns);
+        this.newTableExpressions = Lists.newArrayListWithExpectedSize(nNewTableColumns);
+        this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+        this.nNewTableSaltBuckets = newTable.getBucketNum() == null ? 0 : newTable.getBucketNum();
+        this.oldTableEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(oldTable);
+        this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(newTable);
+        this.nOldTableCFs = oldTable.getColumnFamilies().size();
+        this.newTableWALDisabled = newTableWALDisabled;
+        this.newTableImmutableRows = newTable.isImmutableRows();
+        this.newTableColumnsInfo = Sets.newHashSetWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+
+        for (int i = 0; i < newTable.getColumnFamilies().size(); i++) {
+            PColumnFamily family = newTable.getColumnFamilies().get(i);
+            for (PColumn newColumn : family.getColumns()) {
+                PColumn oldColumn = getColumnOrNull(oldTable, newColumn.getName().getString(), newColumn.getFamilyName().getString());
+                // This can happen during deletion where we don't need covered columns
+                if (oldColumn != null) {
+                    byte[] oldColumnCq = oldColumn.getColumnQualifierBytes();
+                    byte[] newColumnCq = newColumn.getColumnQualifierBytes();
+                    this.coveredColumnsMap.put(new ColumnReference(oldColumn.getFamilyName().getBytes(), oldColumnCq),
+                            new ColumnReference(newColumn.getFamilyName().getBytes(), newColumnCq));
+                }
+            }
+        }
+        this.logicalNewTableName = newTable.getName().getString();
+        initCachedState();
+    }
+
+    public static PColumn getColumnOrNull(PTable table, String columnName, String familyName) {
+        PColumnFamily family;
+        try {
+            family = table.getColumnFamily(familyName);
+        } catch (ColumnFamilyNotFoundException e) {
+            return null;
+        }
+        try {
+            return family.getPColumnForColumnName(columnName);
+        } catch (ColumnNotFoundException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Initialize the calculated state that is cached when reading/creating the maintainer
+     */
+    private void initCachedState() {
+        byte[] newTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(newTableEncodingScheme).getFirst();
+        byte[] oldTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(oldTableEncodingScheme).getFirst();
+        newTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, newTableEmptyKvQualifier);
+        oldTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, oldTableEmptyKvQualifier);

Review comment:
       Are you planning to use the cached value of `oldTableEmptyKeyValueRef` in the future?
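
       In case it helps the discussion, this is the kind of use such a cached reference
       typically gets: locating the empty-column marker cell in a scanned row. A
       hypothetical sketch, not code from this PR:

           import org.apache.hadoop.hbase.client.Result;
           import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

           public final class EmptyKvSketch {
               private EmptyKvSketch() {}

               // Phoenix writes an "empty" key value into every row; checking for it
               // through the cached reference avoids rebuilding the family/qualifier pair.
               static boolean hasEmptyKeyValue(Result row, ColumnReference emptyKvRef) {
                   return row.containsColumn(emptyKvRef.getFamily(), emptyKvRef.getQualifier());
               }
           }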

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -157,64 +307,70 @@ public static void addTransform(
     }
 
 
-    public static SystemTransformRecord getTransformRecord(
-            String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
-        try (ResultSet resultSet = connection.prepareStatement("SELECT " +
-                PhoenixDatabaseMetaData.TENANT_ID + ", " +
-                PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
-                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
-                PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
-                PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
-                PhoenixDatabaseMetaData.OLD_METADATA + " , " +
-                PhoenixDatabaseMetaData.NEW_METADATA + " , " +
-                PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
-                " FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE  " +
-                (Strings.isNullOrEmpty(tenantId) ? "" : (PhoenixDatabaseMetaData.TENANT_ID + " ='" + tenantId + "' AND ")) +
-                (Strings.isNullOrEmpty(schema) ? "" : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + schema + "' AND ")) +
-                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + logicalTableName + "'" +
-                (Strings.isNullOrEmpty(logicalParentName) ? "": (" AND " + PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + "='" + logicalParentName + "'" ))
-        ).executeQuery()) {
-            if (resultSet.next()) {
-                return SystemTransformRecord.SystemTransformBuilder.build(resultSet);
+    public static void completeTransform(Connection connection, Configuration configuration) throws Exception{
+        // Will be called from Reducer
+        long timestmp= EnvironmentEdgeManager.currentTimeMillis();

Review comment:
       nit: `:s/timestmp/timestamp/`

##########
File path: phoenix-core/src/main/protobuf/ServerCachingService.proto
##########
@@ -70,6 +70,31 @@ message IndexMaintainer {
   optional int32 dataImmutableStorageScheme = 27;
 }
 
+message TransformMaintainer {
+  required int32 saltBuckets = 1;
+  required bool isMultiTenant = 2;
+  repeated ColumnReference newTableColumns = 3;
+  repeated ColumnReference oldTableColRefForCoveredColumns = 4;
+  repeated ColumnReference newTableColRefForCoveredColumns = 5;
+  required bytes newTableName = 6;
+  required bool newTableRowKeyOrderOptimizable = 8;

Review comment:
       Field number `7` seems to be skipped?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+
+public class TransformMaintainer extends IndexMaintainer {
+    private boolean isMultiTenant;
+    // indexed expressions that are not present in the row key of the data table; an expression can also refer to a regular column
+    private List<Expression> newTableExpressions;
+    private Set<ColumnReference> newTableColumns;
+
+    private List<PDataType> newTableColumnTypes;
+    private int newTableColumnCount;
+    private byte[] newTableName;
+    private int nNewTableSaltBuckets;
+    private byte[] oldTableEmptyKeyValueCF;
+    private ImmutableBytesPtr emptyKeyValueCFPtr;
+    private int nOldTableCFs;
+    private boolean newTableWALDisabled;
+    private boolean newTableImmutableRows;
+    // Transient state
+    private final boolean isOldTableSalted;
+    private final RowKeySchema oldTableRowKeySchema;
+
+    private int estimatedNewTableRowKeyBytes;
+    private ColumnReference newTableEmptyKeyValueRef;
+    private ColumnReference oldTableEmptyKeyValueRef;
+    private boolean newTableRowKeyOrderOptimizable;
+
+    private PTable.QualifierEncodingScheme newTableEncodingScheme;
+    private PTable.ImmutableStorageScheme newTableImmutableStorageScheme;
+    private PTable.QualifierEncodingScheme oldTableEncodingScheme;
+    private PTable.ImmutableStorageScheme oldTableImmutableStorageScheme;
+    /*
+     * The first part of the pair is the column family name
+     * and the second part is the column name. We need to track this state because, for certain storage schemes
+     * like ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column for which we need to generate a new
+     * table put/delete is different from the old columns in the phoenix schema.
+     */
+    private Set<Pair<String, String>> newTableColumnsInfo;
+    /*
+     * Map of covered columns where a key is the column reference for a column in the data table
+     * and the value is the column reference for the corresponding column in the new table.
+     */
+    private Map<ColumnReference, ColumnReference> coveredColumnsMap;
+
+    private String logicalNewTableName;
+
+    public static TransformMaintainer create(PTable oldTable, PTable newTable, PhoenixConnection connection) {
+        if (oldTable.getType() == PTableType.INDEX) {
+            throw new IllegalArgumentException();
+        }
+        TransformMaintainer maintainer = new TransformMaintainer(oldTable, newTable, connection);
+        return maintainer;
+    }
+
+    private TransformMaintainer(RowKeySchema oldRowKeySchema, boolean isOldTableSalted) {
+        super(oldRowKeySchema, isOldTableSalted);
+        this.oldTableRowKeySchema = oldRowKeySchema;
+        this.isOldTableSalted = isOldTableSalted;
+    }
+
+    private TransformMaintainer(final PTable oldTable, final PTable newTable, PhoenixConnection connection) {
+        this(oldTable.getRowKeySchema(), oldTable.getBucketNum() != null);
+        this.newTableRowKeyOrderOptimizable = newTable.rowKeyOrderOptimizable();
+        this.isMultiTenant = oldTable.isMultiTenant();
+
+        this.newTableEncodingScheme = newTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : newTable.getEncodingScheme();
+        this.newTableImmutableStorageScheme = newTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : newTable.getImmutableStorageScheme();
+        this.oldTableEncodingScheme = oldTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : oldTable.getEncodingScheme();
+        this.oldTableImmutableStorageScheme = oldTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : oldTable.getImmutableStorageScheme();
+
+        this.newTableName = newTable.getPhysicalName().getBytes();
+        boolean newTableWALDisabled = newTable.isWALDisabled();
+        int nNewTableColumns = newTable.getColumns().size();
+        int nNewTablePKColumns = newTable.getPKColumns().size();
+
+        List<PColumn> oldTablePKColumns = oldTable.getPKColumns();
+
+        this.newTableColumnCount = oldTablePKColumns.size();
+
+        this.newTableColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nNewTablePKColumns);
+        this.newTableExpressions = Lists.newArrayListWithExpectedSize(nNewTableColumns);

Review comment:
       nit: remove the explicit type argument from `newTableColumnTypes` as well (similar to `newTableExpressions`)?
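
       The suggested cleanup relies on target-typed inference, which these Guava factory
       methods have supported since Java 7. A small illustration, assuming Phoenix's
       relocated Guava is on the classpath:

           import java.util.List;
           import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;

           public final class InferenceSketch {
               public static void main(String[] args) {
                   // Explicit type witness, as currently written in the PR:
                   List<String> explicit = Lists.<String>newArrayListWithExpectedSize(4);
                   // Inferred from the assignment target, as the nit suggests:
                   List<String> inferred = Lists.newArrayListWithExpectedSize(4);
                   System.out.println(explicit.size() + " " + inferred.size());
               }
           }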

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+
+public class TransformMaintainer extends IndexMaintainer {
+    private boolean isMultiTenant;
+    // indexed expressions that are not present in the row key of the data table; an expression can also refer to a regular column
+    private List<Expression> newTableExpressions;
+    private Set<ColumnReference> newTableColumns;
+
+    private List<PDataType> newTableColumnTypes;
+    private int newTableColumnCount;
+    private byte[] newTableName;
+    private int nNewTableSaltBuckets;
+    private byte[] oldTableEmptyKeyValueCF;
+    private ImmutableBytesPtr emptyKeyValueCFPtr;
+    private int nOldTableCFs;
+    private boolean newTableWALDisabled;
+    private boolean newTableImmutableRows;
+    // Transient state
+    private final boolean isOldTableSalted;
+    private final RowKeySchema oldTableRowKeySchema;
+
+    private int estimatedNewTableRowKeyBytes;
+    private ColumnReference newTableEmptyKeyValueRef;
+    private ColumnReference oldTableEmptyKeyValueRef;
+    private boolean newTableRowKeyOrderOptimizable;
+
+    private PTable.QualifierEncodingScheme newTableEncodingScheme;
+    private PTable.ImmutableStorageScheme newTableImmutableStorageScheme;
+    private PTable.QualifierEncodingScheme oldTableEncodingScheme;
+    private PTable.ImmutableStorageScheme oldTableImmutableStorageScheme;
+    /*
+     * The first part of the pair is the column family name
+     * and the second part is the column name. We need to track this state because, for certain storage schemes
+     * like ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column for which we need to generate a new
+     * table put/delete is different from the old columns in the phoenix schema.
+     */
+    private Set<Pair<String, String>> newTableColumnsInfo;
+    /*
+     * Map of covered columns where a key is the column reference for a column in the data table
+     * and the value is the column reference for the corresponding column in the new table.
+     */
+    private Map<ColumnReference, ColumnReference> coveredColumnsMap;
+
+    private String logicalNewTableName;
+
+    public static TransformMaintainer create(PTable oldTable, PTable newTable, PhoenixConnection connection) {
+        if (oldTable.getType() == PTableType.INDEX) {
+            throw new IllegalArgumentException();
+        }
+        TransformMaintainer maintainer = new TransformMaintainer(oldTable, newTable, connection);
+        return maintainer;
+    }
+
+    private TransformMaintainer(RowKeySchema oldRowKeySchema, boolean isOldTableSalted) {
+        super(oldRowKeySchema, isOldTableSalted);
+        this.oldTableRowKeySchema = oldRowKeySchema;
+        this.isOldTableSalted = isOldTableSalted;
+    }
+
+    private TransformMaintainer(final PTable oldTable, final PTable newTable, PhoenixConnection connection) {
+        this(oldTable.getRowKeySchema(), oldTable.getBucketNum() != null);
+        this.newTableRowKeyOrderOptimizable = newTable.rowKeyOrderOptimizable();
+        this.isMultiTenant = oldTable.isMultiTenant();
+
+        this.newTableEncodingScheme = newTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : newTable.getEncodingScheme();
+        this.newTableImmutableStorageScheme = newTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : newTable.getImmutableStorageScheme();
+        this.oldTableEncodingScheme = oldTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : oldTable.getEncodingScheme();
+        this.oldTableImmutableStorageScheme = oldTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : oldTable.getImmutableStorageScheme();
+
+        this.newTableName = newTable.getPhysicalName().getBytes();
+        boolean newTableWALDisabled = newTable.isWALDisabled();
+        int nNewTableColumns = newTable.getColumns().size();
+        int nNewTablePKColumns = newTable.getPKColumns().size();
+
+        List<PColumn> oldTablePKColumns = oldTable.getPKColumns();
+
+        this.newTableColumnCount = oldTablePKColumns.size();
+
+        this.newTableColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nNewTablePKColumns);
+        this.newTableExpressions = Lists.newArrayListWithExpectedSize(nNewTableColumns);
+        this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+        this.nNewTableSaltBuckets = newTable.getBucketNum() == null ? 0 : newTable.getBucketNum();
+        this.oldTableEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(oldTable);
+        this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(newTable);
+        this.nOldTableCFs = oldTable.getColumnFamilies().size();
+        this.newTableWALDisabled = newTableWALDisabled;
+        this.newTableImmutableRows = newTable.isImmutableRows();
+        this.newTableColumnsInfo = Sets.newHashSetWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+
+        for (int i = 0; i < newTable.getColumnFamilies().size(); i++) {
+            PColumnFamily family = newTable.getColumnFamilies().get(i);
+            for (PColumn newColumn : family.getColumns()) {
+                PColumn oldColumn = getColumnOrNull(oldTable, newColumn.getName().getString(), newColumn.getFamilyName().getString());
+                // This can happen during deletion where we don't need covered columns
+                if (oldColumn != null) {
+                    byte[] oldColumnCq = oldColumn.getColumnQualifierBytes();
+                    byte[] newColumnCq = newColumn.getColumnQualifierBytes();
+                    this.coveredColumnsMap.put(new ColumnReference(oldColumn.getFamilyName().getBytes(), oldColumnCq),
+                            new ColumnReference(newColumn.getFamilyName().getBytes(), newColumnCq));
+                }
+            }
+        }
+        this.logicalNewTableName = newTable.getName().getString();
+        initCachedState();
+    }
+
+    public static PColumn getColumnOrNull(PTable table, String columnName, String familyName) {
+        PColumnFamily family;
+        try {
+            family = table.getColumnFamily(familyName);
+        } catch (ColumnFamilyNotFoundException e) {
+            return null;
+        }
+        try {
+            return family.getPColumnForColumnName(columnName);
+        } catch (ColumnNotFoundException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Initialize the calculated state that is cached when reading/creating the maintainer
+     */
+    private void initCachedState() {
+        byte[] newTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(newTableEncodingScheme).getFirst();
+        byte[] oldTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(oldTableEncodingScheme).getFirst();
+        newTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, newTableEmptyKvQualifier);
+        oldTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, oldTableEmptyKvQualifier);
+        this.newTableColumns = Sets.newLinkedHashSetWithExpectedSize(this.newTableColumnCount);
+
+        for (ColumnReference colRef : coveredColumnsMap.keySet()) {
+            if (newTableImmutableStorageScheme == PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+                newTableColumns.add(colRef);
+            } else {
+                newTableColumns.add(new ColumnReference(colRef.getFamily(), QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES));
+            }
+        }
+    }
+
+    /**
+     * For the client side to serialize a TransformMaintainer for a given table
+     *
+     * @param oldTable old table
+     * @param ptr      bytes pointer to hold returned serialized value
+     * @param newTable new table to serialize
+     */
+    public static void serialize(PTable oldTable, ImmutableBytesWritable ptr,
+                                 PTable newTable, PhoenixConnection connection) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        DataOutputStream output = new DataOutputStream(stream);
+        try {
+            // Encode data table salting
+            WritableUtils.writeVInt(output, oldTable.getBucketNum() == null ? 1 : -1);
+            // Write out data row key schema once, since it's the same
+            oldTable.getRowKeySchema().write(output);
+            org.apache.phoenix.coprocessor.generated.ServerCachingProtos.TransformMaintainer proto =
+                    TransformMaintainer.toProto(newTable.getTransformMaintainer(oldTable, connection));
+            byte[] protoBytes = proto.toByteArray();
+            WritableUtils.writeVInt(output, protoBytes.length);
+            output.write(protoBytes);
+
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        }
+        ptr.set(stream.toByteArray(), 0, stream.size());
+    }
+
+    public static ServerCachingProtos.TransformMaintainer toProto(TransformMaintainer maintainer) throws IOException {
+        ServerCachingProtos.TransformMaintainer.Builder builder = ServerCachingProtos.TransformMaintainer.newBuilder();
+        builder.setSaltBuckets(maintainer.nNewTableSaltBuckets);
+        builder.setIsMultiTenant(maintainer.isMultiTenant);
+
+        for (ColumnReference colRef : maintainer.newTableColumns) {
+            ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+            cRefBuilder.setFamily(ByteStringer.wrap(colRef.getFamily()));
+            cRefBuilder.setQualifier(ByteStringer.wrap(colRef.getQualifier()));
+            builder.addNewTableColumns(cRefBuilder.build());
+        }
+
+        for (Map.Entry<ColumnReference, ColumnReference> e : maintainer.coveredColumnsMap.entrySet()) {
+            ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+            ColumnReference dataTableColRef = e.getKey();
+            cRefBuilder.setFamily(ByteStringer.wrap(dataTableColRef.getFamily()));
+            cRefBuilder.setQualifier(ByteStringer.wrap(dataTableColRef.getQualifier()));
+            builder.addOldTableColRefForCoveredColumns(cRefBuilder.build());
+            if (maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS) {
+                // We need to serialize the colRefs of new tables only in case of encoded column names.
+                ColumnReference newTableColRef = e.getValue();
+                cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+                cRefBuilder.setFamily(ByteStringer.wrap(newTableColRef.getFamily()));
+                cRefBuilder.setQualifier(ByteStringer.wrap(newTableColRef.getQualifier()));
+                builder.addNewTableColRefForCoveredColumns(cRefBuilder.build());
+            }
+        }
+
+        builder.setNewTableColumnCount(maintainer.newTableColumnCount);
+        builder.setNewTableName(ByteStringer.wrap(maintainer.newTableName));
+        builder.setNewTableRowKeyOrderOptimizable(maintainer.newTableRowKeyOrderOptimizable);
+        builder.setOldTableEmptyKeyValueColFamily(ByteStringer.wrap(maintainer.oldTableEmptyKeyValueCF));
+        ServerCachingProtos.ImmutableBytesWritable.Builder ibwBuilder = ServerCachingProtos.ImmutableBytesWritable.newBuilder();
+        ibwBuilder.setByteArray(ByteStringer.wrap(maintainer.emptyKeyValueCFPtr.get()));
+        ibwBuilder.setLength(maintainer.emptyKeyValueCFPtr.getLength());
+        ibwBuilder.setOffset(maintainer.emptyKeyValueCFPtr.getOffset());
+        builder.setEmptyKeyValueColFamily(ibwBuilder.build());
+        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+            DataOutput output = new DataOutputStream(stream);
+            for (Expression expression : maintainer.newTableExpressions) {
+                WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+                expression.write(output);
+            }
+            builder.setNewTableExpressions(ByteStringer.wrap(stream.toByteArray()));
+        }
+
+        builder.setNumDataTableColFamilies(maintainer.nOldTableCFs);
+        builder.setNewTableWalDisabled(maintainer.newTableWALDisabled);
+        builder.setNewTableRowKeyByteSize(maintainer.estimatedNewTableRowKeyBytes);
+        builder.setNewTableImmutable(maintainer.newTableImmutableRows);
+        for (Pair<String, String> p : maintainer.newTableColumnsInfo) {
+            ServerCachingProtos.ColumnInfo.Builder ciBuilder = ServerCachingProtos.ColumnInfo.newBuilder();
+            if (p.getFirst() != null) {
+                ciBuilder.setFamilyName(p.getFirst());
+            }
+            ciBuilder.setColumnName(p.getSecond());
+            builder.addNewTableColumnInfo(ciBuilder.build());
+        }
+        builder.setNewTableEncodingScheme(maintainer.newTableEncodingScheme.getSerializedMetadataValue());
+        builder.setNewTableImmutableStorageScheme(maintainer.newTableImmutableStorageScheme.getSerializedMetadataValue());
+        builder.setLogicalNewTableName(maintainer.logicalNewTableName);
+        builder.setOldTableEncodingScheme(maintainer.oldTableEncodingScheme.getSerializedMetadataValue());
+        builder.setOldTableImmutableStorageScheme(maintainer.oldTableImmutableStorageScheme.getSerializedMetadataValue());
+        return builder.build();
+    }
+
+    public static TransformMaintainer fromProto(ServerCachingProtos.TransformMaintainer proto, RowKeySchema dataTableRowKeySchema, boolean isDataTableSalted) throws IOException {
+        TransformMaintainer maintainer = new TransformMaintainer(dataTableRowKeySchema, isDataTableSalted);
+        maintainer.nNewTableSaltBuckets = proto.getSaltBuckets();
+        maintainer.isMultiTenant = proto.getIsMultiTenant();
+        List<ServerCachingProtos.ColumnReference> newTableColList = proto.getNewTableColumnsList();
+        maintainer.newTableColumns = new HashSet<ColumnReference>(newTableColList.size());
+        for (ServerCachingProtos.ColumnReference colRefFromProto : newTableColList) {
+            maintainer.newTableColumns.add(new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray()));
+        }
+
+        maintainer.newTableName = proto.getNewTableName().toByteArray();
+        if (proto.getNewTableColumnCount() != -1) {
+            maintainer.newTableColumnCount = proto.getNewTableColumnCount();
+        }
+
+        maintainer.newTableRowKeyOrderOptimizable = proto.getNewTableRowKeyOrderOptimizable();
+        maintainer.oldTableEmptyKeyValueCF = proto.getOldTableEmptyKeyValueColFamily().toByteArray();
+        ServerCachingProtos.ImmutableBytesWritable emptyKeyValueColFamily = proto.getEmptyKeyValueColFamily();
+        maintainer.emptyKeyValueCFPtr = new ImmutableBytesPtr(emptyKeyValueColFamily.getByteArray().toByteArray(), emptyKeyValueColFamily.getOffset(), emptyKeyValueColFamily.getLength());
+
+        maintainer.nOldTableCFs = proto.getNumDataTableColFamilies();
+        maintainer.newTableWALDisabled = proto.getNewTableWalDisabled();
+        maintainer.estimatedNewTableRowKeyBytes = proto.getNewTableRowKeyByteSize();
+        maintainer.newTableImmutableRows = proto.getNewTableImmutable();
+        List<ServerCachingProtos.ColumnInfo> newTblColumnInfoList = proto.getNewTableColumnInfoList();
+        maintainer.newTableColumnsInfo = Sets.newHashSet();
+        for (ServerCachingProtos.ColumnInfo info : newTblColumnInfoList) {
+            maintainer.newTableColumnsInfo.add(new Pair<>(info.getFamilyName(), info.getColumnName()));
+        }
+        // protobuf has no single-byte type, so an explicit cast is needed here
+        maintainer.newTableEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte) proto.getNewTableEncodingScheme());
+        maintainer.newTableImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte) proto.getNewTableImmutableStorageScheme());
+        maintainer.oldTableEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte) proto.getOldTableEncodingScheme());
+        maintainer.oldTableImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte) proto.getOldTableImmutableStorageScheme());
+
+        List<ServerCachingProtos.ColumnReference> oldTableColRefsForCoveredColumnsList = proto.getOldTableColRefForCoveredColumnsList();
+        List<ServerCachingProtos.ColumnReference> newTableColRefsForCoveredColumnsList = proto.getNewTableColRefForCoveredColumnsList();
+        maintainer.coveredColumnsMap = Maps.newHashMapWithExpectedSize(oldTableColRefsForCoveredColumnsList.size());
+        boolean encodedColumnNames = maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS;
+        Iterator<ServerCachingProtos.ColumnReference> newTableColRefItr = newTableColRefsForCoveredColumnsList.iterator();
+        for (ServerCachingProtos.ColumnReference colRefFromProto : oldTableColRefsForCoveredColumnsList) {
+            ColumnReference oldTableColRef = new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray());
+            ColumnReference newTableColRef;
+            if (encodedColumnNames) {
+                ServerCachingProtos.ColumnReference fromProto = newTableColRefItr.next();
+                newTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier().toByteArray());
+            } else {
+                byte[] cq = oldTableColRef.getQualifier();
+                byte[] cf = oldTableColRef.getFamily();
+                newTableColRef = new ColumnReference(cf, cq);
+            }
+            maintainer.coveredColumnsMap.put(oldTableColRef, newTableColRef);
+        }
+        maintainer.logicalNewTableName = proto.getLogicalNewTableName();
+        maintainer.initCachedState();
+        return maintainer;
+    }
+
+
+    public static List<IndexMaintainer> deserialize(byte[] buf) {
+        return deserialize(buf, 0, buf.length);
+    }
+
+    private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length) {
+        List<IndexMaintainer> maintainers = Collections.emptyList();
+        if (length > 0) {
+            ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length);
+            DataInput input = new DataInputStream(stream);
+            try {
+                int size = WritableUtils.readVInt(input);
+                boolean isDataTableSalted = size < 0;
+                size = Math.abs(size);
+                RowKeySchema rowKeySchema = new RowKeySchema();
+                rowKeySchema.readFields(input);
+                maintainers = Lists.newArrayListWithExpectedSize(size);
+                for (int i = 0; i < size; i++) {
+                    int protoSize = WritableUtils.readVInt(input);
+                    byte[] b = new byte[protoSize];
+                    input.readFully(b);
+                    ServerCachingProtos.TransformMaintainer proto = ServerCachingProtos.TransformMaintainer.parseFrom(b);
+                    maintainers.add(TransformMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
+            }
+        }
+        return maintainers;
+    }
+
+    // Returns the new table's name
+    public byte[] getIndexTableName() {
+        return newTableName;
+    }
+
+    // Builds the new table's row key from the old table's row key.
+    // This method will change when row key related transforms are supported.
+    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts)  {
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        boolean isNewTableSalted = nNewTableSaltBuckets > 0;
+        TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedNewTableRowKeyBytes);
+        DataOutput output = new DataOutputStream(stream);
+
+        try {
+            if (isNewTableSalted) {
+                output.write(0); // will be set at end to new table salt byte
+            }
+            // The oldTableRowKeySchema includes the salt byte field,
+            // so we must adjust for that here.
+            int dataPosOffset = isOldTableSalted ? 1 : 0;
+            //BitSet viewConstantColumnBitSet = this.rowKeyMetaData.getViewConstantColumnBitSet();
+            // Skip data table salt byte
+            int maxRowKeyOffset = rowKeyPtr.getOffset() + rowKeyPtr.getLength();
+            oldTableRowKeySchema.iterator(rowKeyPtr, ptr, dataPosOffset);
+
+            // Write new table row key
+            while (oldTableRowKeySchema.next(ptr, dataPosOffset, maxRowKeyOffset) != null) {
+                output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
+                if (!oldTableRowKeySchema.getField(dataPosOffset).getDataType().isFixedWidth()) {
+                    output.writeByte(SchemaUtil.getSeparatorByte(newTableRowKeyOrderOptimizable, ptr.getLength()==0
+                            , oldTableRowKeySchema.getField(dataPosOffset)));
+                }
+                dataPosOffset++;
+            }
+
+            byte[] newTableRowKey = stream.getBuffer();
+            // Remove trailing nulls
+            int length = stream.size();
+            if (isNewTableSalted) {
+                // Set salt byte
+                byte saltByte = SaltingUtil.getSaltingByte(newTableRowKey, SaltingUtil.NUM_SALTING_BYTES, length-SaltingUtil.NUM_SALTING_BYTES, nNewTableSaltBuckets);
+                newTableRowKey[0] = saltByte;
+            }
+            return newTableRowKey.length == length ? newTableRowKey : Arrays.copyOf(newTableRowKey, length);
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible

Review comment:
       It would be good to wrap `stream` in a try-with-resources block, so these "impossible" IOExceptions don't have to be handled in a finally clause here.
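       For example, a minimal sketch of that pattern (untested; assumes `TrustedByteArrayOutputStream` is `Closeable`, which it is as a `ByteArrayOutputStream` subclass):

       ```java
       // try-with-resources closes the stream automatically, so the finally
       // block and its "impossible" IOException handling can be dropped.
       try (TrustedByteArrayOutputStream stream =
               new TrustedByteArrayOutputStream(estimatedNewTableRowKeyBytes)) {
           DataOutput output = new DataOutputStream(stream);
           // ... write the salt byte and row key fields exactly as above ...
           byte[] newTableRowKey = stream.getBuffer();
           int length = stream.size();
           return newTableRowKey.length == length
                   ? newTableRowKey : Arrays.copyOf(newTableRowKey, length);
       } catch (IOException e) {
           throw new RuntimeException(e); // still impossible for an in-memory stream
       }
       ```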






[GitHub] [phoenix] gokceni closed pull request #1366: PHOENIX-6612 Add TransformTool

Posted by GitBox <gi...@apache.org>.
gokceni closed pull request #1366:
URL: https://github.com/apache/phoenix/pull/1366


   





[GitHub] [phoenix] gokceni commented on a change in pull request #1366: PHOENIX-6612 Add TransformTool

Posted by GitBox <gi...@apache.org>.
gokceni commented on a change in pull request #1366:
URL: https://github.com/apache/phoenix/pull/1366#discussion_r770897352



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -71,14 +87,150 @@ public static void addTransform(PhoenixConnection connection, String tenantId, P
                 newPhysicalTableName = generateNewTableName(schema, logicalTableName, sequenceNum);
             }
             transformBuilder.setNewPhysicalTableName(newPhysicalTableName);
-            Transform.addTransform(transformBuilder.build(), connection);
+            Transform.addTransform(table, changingProperties, transformBuilder.build(), connection);
         } catch (JsonProcessingException ex) {
             LOGGER.error("addTransform failed", ex);
             throw new SQLException("Adding transform failed with JsonProcessingException");
+        } catch (SQLException ex) {
+            throw ex;
+        } catch(Exception ex) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.valueOf("CANNOT_MUTATE_TABLE"))
+                    .setSchemaName((table.getSchemaName() == null? null: table.getSchemaName().getString()))
+                    .setRootCause(ex)
+                    .setTableName(table.getName().getString()).build().buildException();
         }
     }
 
-    public static void addTransform(
+    protected static void addTransform(
+            PTable table, MetaDataClient.MetaProperties changedProps, SystemTransformRecord systemTransformParams, PhoenixConnection connection) throws Exception {
+        PName newTableName = PNameFactory.newName(systemTransformParams.getNewPhysicalTableName());
+        PName newTableNameWithoutSchema = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(systemTransformParams.getNewPhysicalTableName()));
+        PTable newTable = new PTableImpl.Builder()
+                .setTableName(newTableNameWithoutSchema)
+                .setParentTableName(table.getParentTableName())
+                .setBaseTableLogicalName(table.getBaseTableLogicalName())
+                .setPhysicalTableName(newTableNameWithoutSchema)
+                .setAllColumns(table.getColumns())
+                .setAppendOnlySchema(table.isAppendOnlySchema())
+                .setAutoPartitionSeqName(table.getAutoPartitionSeqName())
+                .setBaseColumnCount(table.getBaseColumnCount())
+                .setBucketNum(table.getBucketNum())
+                .setDefaultFamilyName(table.getDefaultFamilyName())
+                .setDisableWAL(table.isWALDisabled())
+                .setEstimatedSize(table.getEstimatedSize())
+                .setFamilies(table.getColumnFamilies())
+                .setImmutableRows(table.isImmutableRows())
+                .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
+                .setIndexType(table.getIndexType())
+                .setName(newTableName)
+                .setMultiTenant(table.isMultiTenant())
+                .setParentName(table.getParentName())
+                .setParentSchemaName(table.getParentSchemaName())
+                .setPhoenixTTL(table.getPhoenixTTL())
+                .setNamespaceMapped(table.isNamespaceMapped())
+                .setSchemaName(table.getSchemaName())
+                .setPkColumns(table.getPKColumns())
+                .setPkName(table.getPKName())
+                .setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark())
+                .setRowKeySchema(table.getRowKeySchema())
+                .setStoreNulls(table.getStoreNulls())
+                .setTenantId(table.getTenantId())
+                .setType(table.getType())
+                // SchemaExtractor looks up the table descriptor by physical name, so we keep the existing table's physical names here
+                .setPhysicalNames(ImmutableList.copyOf(table.getPhysicalNames()))
+                .setUpdateCacheFrequency(table.getUpdateCacheFrequency())
+                .setTransactionProvider(table.getTransactionProvider())
+                .setUseStatsForParallelization(table.useStatsForParallelization())
+                // TODO SET SCHEMAVERSION
+                // Transformables
+                .setImmutableStorageScheme(
+                        (changedProps.getImmutableStorageSchemeProp() != null? changedProps.getImmutableStorageSchemeProp():table.getImmutableStorageScheme()))
+                .setQualifierEncodingScheme(
+                        (changedProps.getColumnEncodedBytesProp() != null? changedProps.getColumnEncodedBytesProp() : table.getEncodingScheme()))
+                .build();

Review comment:
       Noted







[GitHub] [phoenix] gokceni commented on pull request #1366: PHOENIX-6612 Add TransformTool

Posted by GitBox <gi...@apache.org>.
gokceni commented on pull request #1366:
URL: https://github.com/apache/phoenix/pull/1366#issuecomment-999143621


   merged





[GitHub] [phoenix] gokceni commented on a change in pull request #1366: PHOENIX-6612 Add TransformTool

Posted by GitBox <gi...@apache.org>.
gokceni commented on a change in pull request #1366:
URL: https://github.com/apache/phoenix/pull/1366#discussion_r773370346



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -157,64 +307,70 @@ public static void addTransform(
     }
 
 
-    public static SystemTransformRecord getTransformRecord(
-            String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
-        try (ResultSet resultSet = connection.prepareStatement("SELECT " +
-                PhoenixDatabaseMetaData.TENANT_ID + ", " +
-                PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
-                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
-                PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
-                PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
-                PhoenixDatabaseMetaData.OLD_METADATA + " , " +
-                PhoenixDatabaseMetaData.NEW_METADATA + " , " +
-                PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
-                " FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE  " +
-                (Strings.isNullOrEmpty(tenantId) ? "" : (PhoenixDatabaseMetaData.TENANT_ID + " ='" + tenantId + "' AND ")) +
-                (Strings.isNullOrEmpty(schema) ? "" : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + schema + "' AND ")) +
-                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + logicalTableName + "'" +
-                (Strings.isNullOrEmpty(logicalParentName) ? "": (" AND " + PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + "='" + logicalParentName + "'" ))
-        ).executeQuery()) {
-            if (resultSet.next()) {
-                return SystemTransformRecord.SystemTransformBuilder.build(resultSet);
+    public static void completeTransform(Connection connection, Configuration configuration) throws Exception {
+        // Will be called from the Reducer
+        long timestamp = EnvironmentEdgeManager.currentTimeMillis();
+        String tenantId = configuration.get(MAPREDUCE_TENANT_ID, null);
+        String fullOldTableName = PhoenixConfigurationUtil.getInputTableName(configuration);
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(fullOldTableName);
+        String oldTableLogicalName = SchemaUtil.getTableNameFromFullName(fullOldTableName);
+        String indexTableName = SchemaUtil.getTableNameFromFullName(PhoenixConfigurationUtil.getIndexToolIndexTableName(configuration));
+        String logicalTableName = oldTableLogicalName;
+        String logicalParentName = null;
+        if (PhoenixConfigurationUtil.getTransformingTableType(configuration) == IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE)
+            if (!Strings.isNullOrEmpty(indexTableName)) {
+                logicalTableName = indexTableName;
+                logicalParentName = SchemaUtil.getTableName(schemaName, oldTableLogicalName);
             }
-            return null;
-        }
-    }
-
-    public static boolean checkIsTransformNeeded(MetaDataClient.MetaProperties metaProperties, String schemaName,
-                                                 PTable table, String logicalTableName, String parentTableName,
-                                                 String tenantId, PhoenixConnection connection) throws SQLException {
-        boolean isTransformNeeded = isTransformNeeded(metaProperties, table);
-        if (isTransformNeeded) {
-            SystemTransformRecord existingTransform = Transform.getTransformRecord(schemaName, logicalTableName, parentTableName, tenantId,connection);
-            if (existingTransform != null && existingTransform.isActive()) {
-                throw new SQLExceptionInfo.Builder(
-                        SQLExceptionCode.CANNOT_TRANSFORM_ALREADY_TRANSFORMING_TABLE)
-                        .setMessage(" Only one transform at a time is allowed ")
-                        .setSchemaName(schemaName).setTableName(logicalTableName).build().buildException();
+        boolean isPartial = PhoenixConfigurationUtil.getIsPartialTransform(configuration);
+        SystemTransformRecord transformRecord = getTransformRecord(schemaName, logicalTableName, logicalParentName,
+                tenantId, connection.unwrap(PhoenixConnection.class));
+        if (!isPartial) {
+            String newTableName = SchemaUtil.getTableNameFromFullName(transformRecord.getNewPhysicalTableName());
+            PTable pNewTable = PhoenixRuntime.getTable(connection, transformRecord.getNewPhysicalTableName());
+            PTable pOldTable = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(schemaName, logicalTableName));
+            if (pOldTable.getImmutableStorageScheme() != pNewTable.getImmutableStorageScheme() ||
+                    pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+                MetaDataClient.mutateTransformProperties(connection, tenantId, schemaName, logicalTableName, newTableName,
+                        pNewTable.getImmutableStorageScheme(), pNewTable.getEncodingScheme());
+                // We need to update the columns' qualifiers as well
+                if (pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+                    Short nextKeySeq = 0;

Review comment:
       addColumnMutation checks for a null keySeq value.
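       For reference, the guard there looks roughly like this (paraphrased sketch, not the exact source; `colUpsert` and `ordinal` are placeholder names):

       ```java
       // KEY_SEQ is a SMALLINT column in SYSTEM.CATALOG, so a null keySeq
       // is written as SQL NULL rather than being defaulted to 0.
       if (keySeq == null) {
           colUpsert.setNull(ordinal, java.sql.Types.SMALLINT);
       } else {
           colUpsert.setShort(ordinal, keySeq);
       }
       ```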







[GitHub] [phoenix] gokceni commented on a change in pull request #1366: PHOENIX-6612 Add TransformTool

Posted by GitBox <gi...@apache.org>.
gokceni commented on a change in pull request #1366:
URL: https://github.com/apache/phoenix/pull/1366#discussion_r770881888



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -71,14 +87,150 @@ public static void addTransform(PhoenixConnection connection, String tenantId, P
                 newPhysicalTableName = generateNewTableName(schema, logicalTableName, sequenceNum);
             }
             transformBuilder.setNewPhysicalTableName(newPhysicalTableName);
-            Transform.addTransform(transformBuilder.build(), connection);
+            Transform.addTransform(table, changingProperties, transformBuilder.build(), connection);
         } catch (JsonProcessingException ex) {
             LOGGER.error("addTransform failed", ex);
             throw new SQLException("Adding transform failed with JsonProcessingException");
+        } catch (SQLException ex) {
+            throw ex;
+        } catch(Exception ex) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.valueOf("CANNOT_MUTATE_TABLE"))
+                    .setSchemaName((table.getSchemaName() == null? null: table.getSchemaName().getString()))
+                    .setRootCause(ex)
+                    .setTableName(table.getName().getString()).build().buildException();
         }
     }
 
-    public static void addTransform(
+    protected static void addTransform(
+            PTable table, MetaDataClient.MetaProperties changedProps, SystemTransformRecord systemTransformParams, PhoenixConnection connection) throws Exception {
+        PName newTableName = PNameFactory.newName(systemTransformParams.getNewPhysicalTableName());
+        PName newTableNameWithoutSchema = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(systemTransformParams.getNewPhysicalTableName()));
+        PTable newTable = new PTableImpl.Builder()
+                .setTableName(newTableNameWithoutSchema)
+                .setParentTableName(table.getParentTableName())
+                .setBaseTableLogicalName(table.getBaseTableLogicalName())
+                .setPhysicalTableName(newTableNameWithoutSchema)
+                .setAllColumns(table.getColumns())
+                .setAppendOnlySchema(table.isAppendOnlySchema())
+                .setAutoPartitionSeqName(table.getAutoPartitionSeqName())
+                .setBaseColumnCount(table.getBaseColumnCount())
+                .setBucketNum(table.getBucketNum())
+                .setDefaultFamilyName(table.getDefaultFamilyName())
+                .setDisableWAL(table.isWALDisabled())
+                .setEstimatedSize(table.getEstimatedSize())
+                .setFamilies(table.getColumnFamilies())
+                .setImmutableRows(table.isImmutableRows())
+                .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
+                .setIndexType(table.getIndexType())
+                .setName(newTableName)
+                .setMultiTenant(table.isMultiTenant())
+                .setParentName(table.getParentName())
+                .setParentSchemaName(table.getParentSchemaName())
+                .setPhoenixTTL(table.getPhoenixTTL())
+                .setNamespaceMapped(table.isNamespaceMapped())
+                .setSchemaName(table.getSchemaName())
+                .setPkColumns(table.getPKColumns())
+                .setPkName(table.getPKName())
+                .setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark())
+                .setRowKeySchema(table.getRowKeySchema())
+                .setStoreNulls(table.getStoreNulls())
+                .setTenantId(table.getTenantId())
+                .setType(table.getType())
+                // SchemaExtractor looks up the table descriptor by physical name, so we keep the existing table's physical names here
+                .setPhysicalNames(ImmutableList.copyOf(table.getPhysicalNames()))
+                .setUpdateCacheFrequency(table.getUpdateCacheFrequency())
+                .setTransactionProvider(table.getTransactionProvider())
+                .setUseStatsForParallelization(table.useStatsForParallelization())
+                // TODO SET SCHEMAVERSION

Review comment:
       Thanks for the reminder. Yes, I can implement this now.
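       Presumably it is just one more pass-through setter on the builder, something like this sketch (assuming `PTableImpl.Builder` exposes a `setSchemaVersion` mirroring the other copied properties):

       ```java
       PTable newTable = new PTableImpl.Builder()
               // ... existing setters as above ...
               // carry the old table's schema version over to the transformed table
               .setSchemaVersion(table.getSchemaVersion())
               .build();
       ```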



