Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2021/12/09 15:10:57 UTC

[GitHub] [phoenix] virajjasani commented on a change in pull request #1363: PHOENIX-6603: Add SYSTEM.TRANSFORM table (#656)

virajjasani commented on a change in pull request #1363:
URL: https://github.com/apache/phoenix/pull/1363#discussion_r765825430



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
##########
@@ -425,7 +425,7 @@ private Connection getTenantConnection(String tenantId) throws Exception {
         return DriverManager.getConnection(getUrl(), tenantProps);
     }
 
-    private void assertMetadata(Connection conn, PTable.ImmutableStorageScheme expectedStorageScheme, PTable.QualifierEncodingScheme
+    public static void assertMetadata(Connection conn, PTable.ImmutableStorageScheme expectedStorageScheme, PTable.QualifierEncodingScheme

Review comment:
       We can move this to `BaseTest`

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
##########
@@ -476,7 +476,7 @@ public static void dumpTable(String tableName) throws Exception {
                             .entrySet()) {
                         byte[] family = entryF.getKey();

Review comment:
       Not related to this change, but it looks like we are not using `family` anywhere. Unless we want to log the family, we can remove the for loop. No strong opinion, though; it's up to you :)

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
##########
@@ -408,6 +422,12 @@
     public static final String SYSTEM_TASK_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_TASK_TABLE);
     public static final byte[] SYSTEM_TASK_NAME_BYTES = Bytes.toBytes(SYSTEM_TASK_NAME);
     public static final TableName SYSTEM_TASK_HBASE_TABLE_NAME = TableName.valueOf(SYSTEM_TASK_NAME);
+
+    public static final String SYSTEM_TRANSFORM_TABLE = "TRANSFORM";
+    public static final String SYSTEM_TRANSFORM_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_TRANSFORM_TABLE);
+    public static final byte[] SYSTEM_TRANSFORM_NAME_BYTES = Bytes.toBytes(SYSTEM_TRANSFORM_NAME);
+    public static final TableName SYSTEM_TRANSFORM_HBASE_TABLE_NAME = TableName.valueOf(SYSTEM_TRANSFORM_NAME);

Review comment:
       Looks like they are not being used?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.transform;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.JacksonUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import static org.apache.phoenix.schema.PTableType.INDEX;
+
+public class Transform {
+    public static final Logger LOGGER = LoggerFactory.getLogger(Transform.class);

Review comment:
       nit: private?

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import static org.apache.phoenix.end2end.index.SingleCellIndexIT.assertMetadata;
+import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TransformIT extends ParallelStatsDisabledIT {
+    private Properties testProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);

Review comment:
       nit: can be final?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
##########
@@ -4354,6 +4360,18 @@ private PhoenixConnection upgradeSystemTask(
         return metaConnection;
     }
 
+    private PhoenixConnection upgradeSystemTransform(
+            PhoenixConnection metaConnection,
+            Map<String, String> systemTableToSnapshotMap)
+            throws SQLException, IOException {

Review comment:
       nit: IOException can be removed

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
##########
@@ -247,6 +247,58 @@ public String toString() {
         },
     }
 
+    public enum TransformType {
+        METADATA_TRANSFORM((byte)1);
+
+        private final byte[] byteValue;
+        private final int serializedValue;
+
+        TransformType(int serializedValue) {
+            this.serializedValue = serializedValue;
+            this.byteValue = Bytes.toBytes(this.name());
+        }
+
+        public byte[] getBytes() {
+            return byteValue;
+        }
+
+        public int getSerializedValue() {
+            return this.serializedValue;
+        }
+        public static TransformType getDefault() {
+            return METADATA_TRANSFORM;
+        }
+        public static TransformType fromSerializedValue(int serializedValue) {
+            if (serializedValue < 1 || serializedValue > TaskType.values().length) {

Review comment:
       Should this be `serializedValue > TransformType.values().length` instead?
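
A minimal sketch of the bound check being suggested, using a hypothetical stand-in enum (`TransformTypeSketch`) rather than the real `PTable.TransformType`. It assumes serialized values are 1-based and follow declaration order, and that an out-of-range value should raise `IllegalArgumentException`; the diff above is cut off before showing what the PR actually does in that branch.

```java
// Hypothetical stand-in for PTable.TransformType; not the code from the PR.
enum TransformTypeSketch {
    METADATA_TRANSFORM(1);

    private final int serializedValue;

    TransformTypeSketch(int serializedValue) {
        this.serializedValue = serializedValue;
    }

    int getSerializedValue() {
        return serializedValue;
    }

    static TransformTypeSketch fromSerializedValue(int serializedValue) {
        // Bound the check by this enum's own values(), not by another enum's,
        // so the valid range tracks the constants declared here.
        if (serializedValue < 1 || serializedValue > TransformTypeSketch.values().length) {
            // Assumption: reject out-of-range values with an exception.
            throw new IllegalArgumentException("Invalid TransformType " + serializedValue);
        }
        // Assumption: serialized values are 1-based and match declaration order.
        return TransformTypeSketch.values()[serializedValue - 1];
    }
}
```

With the bound taken from another enum that has more constants, a value such as 2 would pass the range check even though this enum declares only one constant.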

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.transform;
+
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+/**
+ * Task params to be used while upserting records in SYSTEM.TRANSFORM table.
+ * This POJO is mainly used while upserting(and committing) or generating
+ * upsert mutations plan in {@link Transform} class
+ */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(
+    value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"},
+    justification = "endTs and startTs are not used for mutation")
+public class SystemTransformRecord {
+    private final PTable.TransformType transformType;
+    private final String schemaName;
+    private final String logicalTableName;
+    private final String tenantId;
+    private final String logicalParentName;
+    private final String newPhysicalTableName;
+    private final String transformStatus;
+    private final String transformJobId;
+    private final Integer transformRetryCount;
+    private final Timestamp startTs;
+    private final Timestamp endTs;
+    private final String oldMetadata;
+    private final String newMetadata;
+    private final String transformFunction;
+
+    public SystemTransformRecord(PTable.TransformType transformType,
+                                 String schemaName, String logicalTableName, String tenantId, String newPhysicalTableName, String logicalParentName,
+                                 String transformStatus, String transformJobId, Integer transformRetryCount, Timestamp startTs,
+                                 Timestamp endTs, String oldMetadata, String newMetadata, String transformFunction) {
+        this.transformType = transformType;
+        this.schemaName = schemaName;
+        this.tenantId = tenantId;
+        this.logicalTableName = logicalTableName;
+        this.newPhysicalTableName = newPhysicalTableName;
+        this.logicalParentName = logicalParentName;
+        this.transformStatus = transformStatus;
+        this.transformJobId = transformJobId;
+        this.transformRetryCount = transformRetryCount;
+        this.startTs = startTs;
+        this.endTs = endTs;
+        this.oldMetadata = oldMetadata;
+        this.newMetadata = newMetadata;
+        this.transformFunction = transformFunction;
+    }
+
+    public String getString() {
+        return String.format("transformType: %s, schameName: %s, logicalTableName: %s, newPhysicalTableName: %s, logicalParentName: %s "
+                , String.valueOf(transformType), String.valueOf(schemaName), String.valueOf(logicalTableName), String.valueOf(newPhysicalTableName),
+                String.valueOf(logicalParentName));
+    }
+
+    public PTable.TransformType getTransformType() {
+        return transformType;
+    }
+
+    public String getSchemaName() {
+        return schemaName;
+    }
+
+    public String getTenantId() {
+        return tenantId;
+    }
+
+    public String getLogicalTableName() {
+        return logicalTableName;
+    }
+
+    public String getLogicalParentName() {
+        return logicalParentName;
+    }
+
+    public String getNewPhysicalTableName() {
+        return newPhysicalTableName;
+    }
+
+    public String getTransformStatus() {
+        return transformStatus;
+    }
+
+    public String getTransformJobId() {
+        return transformJobId;
+    }
+
+    public int getTransformRetryCount() {
+        return transformRetryCount;
+    }
+
+    public Timestamp getTransformStartTs() {
+        return startTs;
+    }
+
+    public Timestamp getTransformEndTs() {
+        return endTs;
+    }
+
+    public String getOldMetadata() {
+        return oldMetadata;
+    }
+    public String getNewMetadata() {
+        return newMetadata;
+    }
+    public String getTransformFunction() { return transformFunction; }
+
+    public boolean isActive() {
+        return (transformStatus.equals(PTable.TransformStatus.STARTED.name())
+                || transformStatus.equals(PTable.TransformStatus.CREATED.name()));
+    }
+
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+        value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"},
+        justification = "endTs and startTs are not used for mutation")
+    public static class SystemTransformBuilder {
+
+        private PTable.TransformType transformType;
+        private String schemaName;
+        private String tenantId;
+        private String logicalTableName;
+        private String logicalParentName;
+        private String newPhysicalTableName;
+        private String transformStatus = PTable.TransformStatus.CREATED.name();
+        private String transformJobId;
+        private int transformRetryCount =0;
+        private Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+        private Timestamp endTs;
+        private String oldMetadata;
+        private String newMetadata;
+        private String transformFunction;
+
+        public SystemTransformBuilder setTransformType(PTable.TransformType transformType) {
+            this.transformType = transformType;
+            return this;
+        }
+
+        public SystemTransformBuilder setSchemaName(String schemaName) {
+            this.schemaName = schemaName;
+            return this;
+        }
+
+        public SystemTransformBuilder setLogicalTableName(String tableName) {
+            this.logicalTableName = tableName;
+            return this;
+        }
+
+        public SystemTransformBuilder setTenantId(String tenant) {
+            this.tenantId = tenant;
+            return this;
+        }
+
+        public SystemTransformBuilder setLogicalParentName(String name) {
+            this.logicalParentName = name;
+            return this;
+        }
+
+        public SystemTransformBuilder setNewPhysicalTableName(String tableName) {
+            this.newPhysicalTableName = tableName;
+            return this;
+        }
+
+        public SystemTransformBuilder setTransformStatus(String transformStatus) {
+            this.transformStatus = transformStatus;
+            return this;
+        }
+
+        public SystemTransformBuilder setTransformJobId(String transformJobId) {
+            this.transformJobId = transformJobId;
+            return this;
+        }
+
+        public SystemTransformBuilder setOldMetadata(String oldMetadata) {
+            this.oldMetadata = oldMetadata;
+            return this;
+        }
+
+        public SystemTransformBuilder setNewMetadata(String newMetadata) {
+            this.newMetadata = newMetadata;
+            return this;
+        }
+
+        public SystemTransformBuilder setTransformRetryCount(int transformRetryCount) {
+            this.transformRetryCount = transformRetryCount;
+            return this;
+        }
+
+        public SystemTransformBuilder setStartTs(Timestamp startTs) {
+            this.startTs = startTs;
+            return this;
+        }
+
+        public SystemTransformBuilder setEndTs(Timestamp endTs) {
+            this.endTs = endTs;
+            return this;
+        }
+
+        public SystemTransformBuilder setTransformFunction(String transformFunction) {
+            this.transformFunction = transformFunction;
+            return this;
+        }
+
+        public SystemTransformRecord build() {
+            Timestamp end = endTs;
+            if (end == null && transformStatus != null && transformStatus.equals(PTable.TaskStatus.COMPLETED.toString())) {
+                end = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+            }
+            return new SystemTransformRecord(transformType, schemaName,
+                    logicalTableName, tenantId, newPhysicalTableName, logicalParentName, transformStatus, transformJobId, transformRetryCount, startTs, end,
+                    oldMetadata, newMetadata, transformFunction);
+        }
+
+        public static SystemTransformRecord build(ResultSet resultSet) throws SQLException {
+            int col = 1;
+            SystemTransformBuilder builder = new SystemTransformBuilder();
+            builder.setTenantId(resultSet.getString(col++));
+            builder.setSchemaName(resultSet.getString(col++));
+            builder.setLogicalTableName(resultSet.getString(col++));
+            builder.setNewPhysicalTableName(resultSet.getString(col++));
+            builder.setTransformType(PTable.TransformType.fromSerializedValue(resultSet.getByte(col++)));
+            builder.setLogicalParentName(resultSet.getString(col++));
+            builder.setTransformStatus(resultSet.getString(col++));
+            builder.setTransformJobId(resultSet.getString(col++));
+            builder.setTransformRetryCount(resultSet.getInt(col++));
+            builder.setStartTs(resultSet.getTimestamp(col++));
+            builder.setEndTs(resultSet.getTimestamp(col++));
+            builder.setOldMetadata(resultSet.getString(col++));
+            builder.setNewMetadata(resultSet.getString(col++));
+            builder.setTransformFunction(resultSet.getString(col++));

Review comment:
       nit: plain `col` is fine here instead of `col++`, since this is the last column read

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.transform;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.JacksonUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import static org.apache.phoenix.schema.PTableType.INDEX;
+
+public class Transform {
+    public static final Logger LOGGER = LoggerFactory.getLogger(Transform.class);
+
+    private static String generateNewTableName(String schema, String logicalTableName, long seqNum) {
+        // TODO: Support schema versioning as well.
+        String newName = String.format("%s_%d", SchemaUtil.getTableName(schema, logicalTableName), seqNum);
+        return newName;
+    }
+
+    public static void addTransform(PhoenixConnection connection, String tenantId, PTable table,
+                                    MetaDataClient.MetaProperties changingProperties, long sequenceNum,
+                                    PTable.TransformType transformType) throws SQLException {
+        try {
+            String newMetadata = JacksonUtil.getObjectWriter().writeValueAsString(changingProperties);
+            String oldMetadata = "";
+            String newPhysicalTableName = "";
+            SystemTransformRecord.SystemTransformBuilder transformBuilder = new SystemTransformRecord.SystemTransformBuilder();
+            String schema = table.getSchemaName()!=null ? table.getSchemaName().getString() : null;
+            String logicalTableName = table.getTableName().getString();
+            transformBuilder.setSchemaName(schema);
+            transformBuilder.setLogicalTableName(logicalTableName);
+            transformBuilder.setTenantId(tenantId);
+            if (table.getType() == INDEX) {
+                transformBuilder.setLogicalParentName(table.getParentName().getString());
+            }
+            // TODO: add more ways of finding out what transform type this is
+            transformBuilder.setTransformType(transformType);
+            // TODO: calculate old and new metadata
+            transformBuilder.setNewMetadata(newMetadata);
+            transformBuilder.setOldMetadata(oldMetadata);
+            if (Strings.isNullOrEmpty(newPhysicalTableName)) {

Review comment:
       Is this condition always true? `newPhysicalTableName` is initialized to `""` above and is not reassigned before this check.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.transform;
+
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+/**
+ * Task params to be used while upserting records in SYSTEM.TRANSFORM table.
+ * This POJO is mainly used while upserting(and committing) or generating
+ * upsert mutations plan in {@link Transform} class
+ */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(
+    value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"},
+    justification = "endTs and startTs are not used for mutation")
+public class SystemTransformRecord {
+    private final PTable.TransformType transformType;
+    private final String schemaName;
+    private final String logicalTableName;
+    private final String tenantId;
+    private final String logicalParentName;
+    private final String newPhysicalTableName;
+    private final String transformStatus;
+    private final String transformJobId;
+    private final Integer transformRetryCount;
+    private final Timestamp startTs;
+    private final Timestamp endTs;
+    private final String oldMetadata;
+    private final String newMetadata;
+    private final String transformFunction;
+
+    public SystemTransformRecord(PTable.TransformType transformType,
+                                 String schemaName, String logicalTableName, String tenantId, String newPhysicalTableName, String logicalParentName,
+                                 String transformStatus, String transformJobId, Integer transformRetryCount, Timestamp startTs,
+                                 Timestamp endTs, String oldMetadata, String newMetadata, String transformFunction) {
+        this.transformType = transformType;
+        this.schemaName = schemaName;
+        this.tenantId = tenantId;
+        this.logicalTableName = logicalTableName;
+        this.newPhysicalTableName = newPhysicalTableName;
+        this.logicalParentName = logicalParentName;
+        this.transformStatus = transformStatus;
+        this.transformJobId = transformJobId;
+        this.transformRetryCount = transformRetryCount;
+        this.startTs = startTs;
+        this.endTs = endTs;
+        this.oldMetadata = oldMetadata;
+        this.newMetadata = newMetadata;
+        this.transformFunction = transformFunction;
+    }
+
+    public String getString() {
+        return String.format("transformType: %s, schameName: %s, logicalTableName: %s, newPhysicalTableName: %s, logicalParentName: %s "
+                , String.valueOf(transformType), String.valueOf(schemaName), String.valueOf(logicalTableName), String.valueOf(newPhysicalTableName),
+                String.valueOf(logicalParentName));
+    }
+
+    public PTable.TransformType getTransformType() {
+        return transformType;
+    }
+
+    public String getSchemaName() {
+        return schemaName;
+    }
+
+    public String getTenantId() {
+        return tenantId;
+    }
+
+    public String getLogicalTableName() {
+        return logicalTableName;
+    }
+
+    public String getLogicalParentName() {
+        return logicalParentName;
+    }
+
+    public String getNewPhysicalTableName() {
+        return newPhysicalTableName;
+    }
+
+    public String getTransformStatus() {
+        return transformStatus;
+    }
+
+    public String getTransformJobId() {
+        return transformJobId;
+    }
+
+    public int getTransformRetryCount() {
+        return transformRetryCount;
+    }
+
+    public Timestamp getTransformStartTs() {
+        return startTs;
+    }
+
+    public Timestamp getTransformEndTs() {
+        return endTs;
+    }
+
+    public String getOldMetadata() {
+        return oldMetadata;
+    }
+    public String getNewMetadata() {
+        return newMetadata;
+    }
+    public String getTransformFunction() { return transformFunction; }
+
+    public boolean isActive() {
+        return (transformStatus.equals(PTable.TransformStatus.STARTED.name())
+                || transformStatus.equals(PTable.TransformStatus.CREATED.name()));
+    }
+
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+        value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"},
+        justification = "endTs and startTs are not used for mutation")
+    public static class SystemTransformBuilder {
+
+        private PTable.TransformType transformType;
+        private String schemaName;
+        private String tenantId;
+        private String logicalTableName;
+        private String logicalParentName;
+        private String newPhysicalTableName;
+        private String transformStatus = PTable.TransformStatus.CREATED.name();
+        private String transformJobId;
+        private int transformRetryCount =0;
+        private Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+        private Timestamp endTs;
+        private String oldMetadata;
+        private String newMetadata;
+        private String transformFunction;
+
+        public SystemTransformBuilder setTransformType(PTable.TransformType transformType) {
+            this.transformType = transformType;
+            return this;
+        }
+
+        public SystemTransformBuilder setSchemaName(String schemaName) {
+            this.schemaName = schemaName;
+            return this;
+        }
+
+        public SystemTransformBuilder setLogicalTableName(String tableName) {
+            this.logicalTableName = tableName;
+            return this;
+        }
+
+        public SystemTransformBuilder setTenantId(String tenant) {
+            this.tenantId = tenant;
+            return this;
+        }
+
+        public SystemTransformBuilder setLogicalParentName(String name) {
+            this.logicalParentName = name;
+            return this;
+        }
+
+        public SystemTransformBuilder setNewPhysicalTableName(String tableName) {
+            this.newPhysicalTableName = tableName;
+            return this;
+        }
+
+        public SystemTransformBuilder setTransformStatus(String transformStatus) {
+            this.transformStatus = transformStatus;
+            return this;
+        }
+
+        public SystemTransformBuilder setTransformJobId(String transformJobId) {
+            this.transformJobId = transformJobId;
+            return this;
+        }
+
+        public SystemTransformBuilder setOldMetadata(String oldMetadata) {
+            this.oldMetadata = oldMetadata;
+            return this;
+        }
+
+        public SystemTransformBuilder setNewMetadata(String newMetadata) {
+            this.newMetadata = newMetadata;
+            return this;
+        }
+
+        public SystemTransformBuilder setTransformRetryCount(int transformRetryCount) {
+            this.transformRetryCount = transformRetryCount;
+            return this;
+        }
+
+        public SystemTransformBuilder setStartTs(Timestamp startTs) {
+            this.startTs = startTs;
+            return this;
+        }
+
+        public SystemTransformBuilder setEndTs(Timestamp endTs) {
+            this.endTs = endTs;
+            return this;
+        }
+
+        public SystemTransformBuilder setTransformFunction(String transformFunction) {
+            this.transformFunction = transformFunction;
+            return this;
+        }
+
+        public SystemTransformRecord build() {
+            Timestamp end = endTs;
+            if (end == null && transformStatus != null && transformStatus.equals(PTable.TaskStatus.COMPLETED.toString())) {
+                end = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());

Review comment:
       Shall we ignore the `end == null` case and always set `end` to the current time?
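
To make the two options concrete, here is a small sketch that compares them. The class `EndTsSketch`, its method names, and the plain `"COMPLETED"` string are hypothetical stand-ins for illustration; the current time is passed in as a parameter instead of coming from `EnvironmentEdgeManager`, so the example stays self-contained.

```java
import java.sql.Timestamp;

// Hypothetical helper for illustration; not part of the PR.
final class EndTsSketch {
    private EndTsSketch() {
    }

    // Behaviour in the diff: an explicitly set endTs is kept, and the current
    // time is filled in only when endTs is null and the status is completed.
    static Timestamp keepExplicit(Timestamp endTs, String status, long nowMillis) {
        if (endTs == null && "COMPLETED".equals(status)) {
            return new Timestamp(nowMillis);
        }
        return endTs;
    }

    // Alternative raised in the comment: once the status is completed,
    // always stamp the current time, overriding any endTs set earlier.
    static Timestamp alwaysNow(Timestamp endTs, String status, long nowMillis) {
        if ("COMPLETED".equals(status)) {
            return new Timestamp(nowMillis);
        }
        return endTs;
    }
}
```

The two variants differ only when a caller has already set `endTs` on a completed record: the first preserves the caller's value, the second replaces it.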

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.transform;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.JacksonUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import static org.apache.phoenix.schema.PTableType.INDEX;
+
+public class Transform {
+    public static final Logger LOGGER = LoggerFactory.getLogger(Transform.class);
+
+    private static String generateNewTableName(String schema, String logicalTableName, long seqNum) {
+        // TODO: Support schema versioning as well.
+        String newName = String.format("%s_%d", SchemaUtil.getTableName(schema, logicalTableName), seqNum);
+        return newName;
+    }
+
+    public static void addTransform(PhoenixConnection connection, String tenantId, PTable table,
+                                    MetaDataClient.MetaProperties changingProperties, long sequenceNum,
+                                    PTable.TransformType transformType) throws SQLException {
+        try {
+            String newMetadata = JacksonUtil.getObjectWriter().writeValueAsString(changingProperties);
+            String oldMetadata = "";
+            String newPhysicalTableName = "";
+            SystemTransformRecord.SystemTransformBuilder transformBuilder = new SystemTransformRecord.SystemTransformBuilder();
+            String schema = table.getSchemaName()!=null ? table.getSchemaName().getString() : null;
+            String logicalTableName = table.getTableName().getString();
+            transformBuilder.setSchemaName(schema);
+            transformBuilder.setLogicalTableName(logicalTableName);
+            transformBuilder.setTenantId(tenantId);
+            if (table.getType() == INDEX) {
+                transformBuilder.setLogicalParentName(table.getParentName().getString());
+            }
+            // TODO: add more ways of finding out what transform type this is
+            transformBuilder.setTransformType(transformType);
+            // TODO: calculate old and new metadata
+            transformBuilder.setNewMetadata(newMetadata);
+            transformBuilder.setOldMetadata(oldMetadata);
+            if (Strings.isNullOrEmpty(newPhysicalTableName)) {
+                newPhysicalTableName = generateNewTableName(schema, logicalTableName, sequenceNum);
+            }
+            transformBuilder.setNewPhysicalTableName(newPhysicalTableName);
+            Transform.addTransform(transformBuilder.build(), connection);
+        } catch (JsonProcessingException ex) {
+            LOGGER.error("addTransform failed", ex);
+            throw new SQLException("Adding transform failed with JsonProcessingException");
+        }
+    }
+
+    public static void addTransform(
+            SystemTransformRecord systemTransformParams, PhoenixConnection connection) throws SQLException {
+        try (PreparedStatement stmt = connection.prepareStatement("UPSERT INTO " +
+                PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " ( " +
+                PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
+                PhoenixDatabaseMetaData.TENANT_ID + "," +
+                PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
+                PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
+                PhoenixDatabaseMetaData.OLD_METADATA + " , " +
+                PhoenixDatabaseMetaData.NEW_METADATA + " , " +
+                PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
+                " ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")) {
+            int colNum = 1;
+            if (systemTransformParams.getSchemaName() != null) {
+                stmt.setString(colNum++, systemTransformParams.getSchemaName());
+            } else {
+                stmt.setNull(colNum++, Types.VARCHAR);
+            }
+            stmt.setString(colNum++, systemTransformParams.getLogicalTableName());
+            if (systemTransformParams.getTenantId() != null) {
+                stmt.setString(colNum++, systemTransformParams.getTenantId());
+            } else {
+                stmt.setNull(colNum++, Types.VARCHAR);
+            }
+            stmt.setString(colNum++, systemTransformParams.getNewPhysicalTableName());
+
+            stmt.setInt(colNum++, systemTransformParams.getTransformType().getSerializedValue());
+            if (systemTransformParams.getLogicalParentName() != null) {
+                stmt.setString(colNum++, systemTransformParams.getLogicalParentName());
+            } else {
+                stmt.setNull(colNum++, Types.VARCHAR);
+            }
+
+            stmt.setString(colNum++, systemTransformParams.getTransformStatus());
+
+            if (systemTransformParams.getTransformJobId() != null) {
+                stmt.setString(colNum++, systemTransformParams.getTransformJobId());
+            } else {
+                stmt.setNull(colNum++, Types.VARCHAR);
+            }
+            stmt.setInt(colNum++, systemTransformParams.getTransformRetryCount());
+
+            stmt.setTimestamp(colNum++, systemTransformParams.getTransformStartTs());
+
+            if (systemTransformParams.getTransformEndTs() != null) {
+                stmt.setTimestamp(colNum++, systemTransformParams.getTransformEndTs());
+            } else {
+                stmt.setNull(colNum++, Types.TIMESTAMP);
+            }
+            if (systemTransformParams.getOldMetadata() != null) {
+                stmt.setString(colNum++, systemTransformParams.getOldMetadata());
+            } else {
+                stmt.setNull(colNum++, Types.VARCHAR);
+            }
+            if (systemTransformParams.getNewMetadata() != null) {
+                stmt.setString(colNum++, systemTransformParams.getNewMetadata());
+            } else {
+                stmt.setNull(colNum++, Types.VARCHAR);
+            }
+            if (systemTransformParams.getTransformFunction() != null) {
+                stmt.setString(colNum++, systemTransformParams.getTransformFunction());
+            } else {
+                stmt.setNull(colNum++, Types.VARCHAR);
+            }
+
+            LOGGER.info("Adding transform type: "
+                    + systemTransformParams.getString());
+            stmt.execute();
+        }
+    }
+
+
+    public static SystemTransformRecord getTransformRecord(
+            String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
+        try (ResultSet resultSet = connection.prepareStatement("SELECT " +
+                PhoenixDatabaseMetaData.TENANT_ID + ", " +
+                PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
+                PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
+                PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
+                PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
+                PhoenixDatabaseMetaData.OLD_METADATA + " , " +
+                PhoenixDatabaseMetaData.NEW_METADATA + " , " +
+                PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
+                " FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE  " +
+                (Strings.isNullOrEmpty(tenantId) ? "" : (PhoenixDatabaseMetaData.TENANT_ID + " ='" + tenantId + "' AND ")) +
+                (Strings.isNullOrEmpty(schema) ? "" : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + schema + "' AND ")) +
+                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + logicalTableName + "'" +
+                (Strings.isNullOrEmpty(logicalParentName) ? "": (" AND " + PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + "='" + logicalParentName + "'" ))
+        ).executeQuery()) {
+            if (resultSet.next()) {
+                return SystemTransformRecord.SystemTransformBuilder.build(resultSet);
+            }
+            return null;
+        }
+    }
+
+    public static boolean checkIsTransformNeeded(MetaDataClient.MetaProperties metaProperties, String schemaName,
+                                                 PTable table, String logicalTableName, String parentTableName,
+                                                 String tenantId, PhoenixConnection connection) throws SQLException {
+        boolean isTransformNeeded = isTransformNeeded(metaProperties, table);
+        if (isTransformNeeded) {
+            SystemTransformRecord existingTransform = Transform.getTransformRecord(schemaName, logicalTableName, parentTableName, tenantId,connection);
+            if (existingTransform != null && existingTransform.isActive()) {
+                throw new SQLExceptionInfo.Builder(
+                        SQLExceptionCode.CANNOT_TRANSFORM_ALREADY_TRANSFORMING_TABLE)
+                        .setMessage(" Only one transform at a time is allowed ")
+                        .setSchemaName(schemaName).setTableName(logicalTableName).build().buildException();
+            }
+        }
+        return isTransformNeeded;
+    }
+
+    private static boolean isTransformNeeded(MetaDataClient.MetaProperties metaProperties, PTable table){
+        if (metaProperties.getImmutableStorageSchemeProp()!=null
+                && metaProperties.getImmutableStorageSchemeProp() != table.getImmutableStorageScheme()) {
+            // Transform is needed
+            return true;
+        }
+        if (metaProperties.getColumnEncodedBytesProp()!=null
+                && metaProperties.getColumnEncodedBytesProp() != table.getEncodingScheme()) {
+            return true;
+        }
+        return false;

Review comment:
       The last `if` and the trailing `return false` could be simplified to: `return metaProperties.getColumnEncodedBytesProp() != null && metaProperties.getColumnEncodedBytesProp() != table.getEncodingScheme();`
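
Taking the same idea one step further, both checks could collapse into a single boolean expression. The following is a minimal sketch, not the PR's code: the nested enums and the four-argument signature are hypothetical stand-ins for `MetaDataClient.MetaProperties` and `PTable`, so that the snippet compiles on its own.

```java
final class TransformNeededSketch {
    // Hypothetical stand-ins for PTable.ImmutableStorageScheme and
    // PTable.QualifierEncodingScheme; only two values each, for illustration.
    enum StorageScheme { ONE_CELL_PER_COLUMN, SINGLE_CELL_ARRAY_WITH_OFFSETS }
    enum EncodingScheme { NON_ENCODED_QUALIFIERS, TWO_BYTE_QUALIFIERS }

    // A transform is needed when a property was requested (non-null) and
    // differs from what the table currently uses.
    static boolean isTransformNeeded(StorageScheme requestedStorage, StorageScheme currentStorage,
                                     EncodingScheme requestedEncoding, EncodingScheme currentEncoding) {
        boolean storageChanges = requestedStorage != null && requestedStorage != currentStorage;
        boolean encodingChanges = requestedEncoding != null && requestedEncoding != currentEncoding;
        return storageChanges || encodingChanges;
    }
}
```

Naming the two conditions keeps the intent of the original `// Transform is needed` comment without the early returns. Comparing with `!=` is fine here only because both sides are enum constants; if either getter in the real code returns a non-enum type, `equals` would be the safer comparison.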



