You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ji...@apache.org on 2023/12/01 21:53:07 UTC

(phoenix) branch PHOENIX-6978-feature updated: PHOENIX-7041 Populate ROW_KEY_PREFIX column when creating views (#1733)

This is an automated email from the ASF dual-hosted git repository.

jisaac pushed a commit to branch PHOENIX-6978-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-6978-feature by this push:
     new 6ef3230214 PHOENIX-7041 Populate ROW_KEY_PREFIX column when creating views (#1733)
6ef3230214 is described below

commit 6ef32302141375f3b8fa64cdbe632c1e4375feab
Author: Jacob Isaac <ja...@gmail.com>
AuthorDate: Fri Dec 1 13:53:01 2023 -0800

    PHOENIX-7041 Populate ROW_KEY_PREFIX column when creating views (#1733)
    
    PHOENIX-7041 Populate ROW_KEY_PREFIX column when creating views
    ---------
    
    Co-authored-by: Jacob Isaac <ji...@salesforce.com>
---
 .../end2end/prefix/BaseRowKeyPrefixTestIT.java     | 878 +++++++++++++++++++++
 .../LongViewIndexDisabledBaseRowKeyPrefixIT.java   |  53 ++
 .../LongViewIndexEnabledBaseRowKeyPrefixIT.java    |  53 ++
 .../phoenix/compile/CreateTableCompiler.java       |  30 +-
 .../org/apache/phoenix/compile/ScanRanges.java     |   1 -
 .../org/apache/phoenix/compile/WhereOptimizer.java | 171 +++-
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |   3 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  |  77 +-
 .../java/org/apache/phoenix/schema/PTableImpl.java |   5 +-
 .../apache/phoenix/compile/QueryOptimizerTest.java |   2 +
 .../apache/phoenix/compile/ViewCompilerTest.java   |   3 +
 .../phoenix/jdbc/PhoenixResultSetMetadataTest.java |   2 +-
 12 files changed, 1225 insertions(+), 53 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/prefix/BaseRowKeyPrefixTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/prefix/BaseRowKeyPrefixTestIT.java
new file mode 100644
index 0000000000..d66403cae6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/prefix/BaseRowKeyPrefixTestIT.java
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end.prefix;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.SubstringComparator;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.CreateTableCompiler;
+import org.apache.phoenix.compile.ExpressionCompiler;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.WhereOptimizer;
+import org.apache.phoenix.coprocessor.TableInfo;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.schema.types.PDecimal;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PSmallint;
+import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ViewUtil;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.apache.phoenix.exception.SQLExceptionCode.VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LINK_HBASE_TABLE_NAME;
+import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public abstract class BaseRowKeyPrefixTestIT extends ParallelStatsDisabledIT {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BaseRowKeyPrefixTestIT.class);
+
+    public static final String TENANT_URL_FMT = "%s;%s=%s";
+    public static final String ORG_ID_PREFIX = "00D0x0000";
+    public static final String ORG_ID_FMT = "%s%06d";
+    public static final String PARTITION_FMT = "%03d";
+    public static final String BASE_TABLE_NAME_FMT = "TEST_ENTITY.%s";
+    public static final String INDEX_TABLE_NAME_FMT = "_IDX_TEST_ENTITY.%s";
+
+    public static final String GLOBAL_VIEW_NAME_FMT = "TEST_ENTITY.G1_%s";
+    public static final String TENANT_VIEW_NAME_FMT = "TEST_ENTITY.TV_%s_%d";
+    public static final String LEAF_VIEW_NAME_FMT = "TEST_ENTITY.Z%02d_%02d_%s";
+    public static final String ROW_ID_FMT = "00R0x000%07d";
+    public static final String ZID_FMT = "00Z0x000%07d";
+    public static final String COL1_FMT = "a%05d";
+    public static final String COL2_FMT = "b%05d";
+    public static final String COL3_FMT = "b%05d";
+    public static final Random RANDOM_GEN = new Random();
+    public static final int MAX_ROWS = 10000;
+
+
+    // Returns the String form of the data type
+    private String getType(PDataType pkType) {
+        String pkTypeStr = "VARCHAR(25)";
+        switch (pkType.getSqlType()) {
+            case Types.VARCHAR:
+                pkTypeStr = "VARCHAR(25)";
+                break;
+            case Types.CHAR:
+                pkTypeStr = "CHAR(15)";
+                break;
+            case Types.DECIMAL:
+                pkTypeStr = "DECIMAL(8,2)";
+                break;
+            case Types.INTEGER:
+                pkTypeStr = "INTEGER";
+                break;
+            case Types.BIGINT:
+                pkTypeStr = "BIGINT";
+                break;
+            case Types.DATE:
+                pkTypeStr = "DATE";
+                break;
+            case Types.TIMESTAMP:
+                pkTypeStr = "TIMESTAMP";
+                break;
+            default:
+                pkTypeStr = "VARCHAR(25)";
+        }
+        return pkTypeStr;
+    }
+
+    // Returns random data for given data type
+    private Object getData(PDataType type) {
+        Random rnd = new Random();
+        switch (type.getSqlType()) {
+        case Types.VARCHAR:
+            return RandomStringUtils.randomAlphanumeric(25);
+        case Types.CHAR:
+            //pkTypeStr = "CHAR(15)";
+            return RandomStringUtils.randomAlphanumeric(15);
+        case Types.DECIMAL:
+            //pkTypeStr = "DECIMAL(8,2)";
+            return Math.floor(rnd.nextInt(50000) * rnd.nextDouble());
+        case Types.INTEGER:
+            //pkTypeStr = "INTEGER";
+            return rnd.nextInt(50000);
+        case Types.BIGINT:
+            //pkTypeStr = "BIGINT";
+            return rnd.nextLong();
+        case Types.DATE:
+            //pkTypeStr = "DATE";
+            return new Date(System.currentTimeMillis() + rnd.nextInt(50000));
+        case Types.TIMESTAMP:
+            //pkTypeStr = "TIMESTAMP";
+            return new Timestamp(System.currentTimeMillis() + rnd.nextInt(50000));
+        default:
+            // pkTypeStr = "VARCHAR(25)";
+            return RandomStringUtils.randomAlphanumeric(25);
+        }
+    }
+
+    // Returns random data for given data type
+    private Object getPKData(String testPKTypes) {
+        Random rnd = new Random();
+        switch (testPKTypes) {
+            case "VARCHAR": {
+                return RandomStringUtils.randomAlphanumeric(25);
+            }
+            case "CHAR(15)": {
+                //pkTypeStr = "CHAR(15)";
+                return RandomStringUtils.randomAlphanumeric(15);
+            }
+            case "CHAR(3)": {
+                //pkTypeStr = "CHAR(3)";
+                return RandomStringUtils.randomAlphanumeric(3);
+            }
+            case "DECIMAL":
+                //pkTypeStr = "DECIMAL(8,2)";
+                return Math.floor(rnd.nextInt(50000) * rnd.nextDouble());
+            case "INTEGER":
+                //pkTypeStr = "INTEGER";
+                return rnd.nextInt(50000);
+            case "BIGINT":
+                //pkTypeStr = "BIGINT";
+                return rnd.nextInt(50000);
+            case "DATE":
+                //pkTypeStr = "DATE";
+                return new Date(System.currentTimeMillis() + rnd.nextInt(50000));
+            case "TIMESTAMP":
+                //pkTypeStr = "TIMESTAMP";
+                return new Timestamp(System.currentTimeMillis() + rnd.nextInt(50000));
+            default:
+                // pkTypeStr = "VARCHAR(25)";
+                return RandomStringUtils.randomAlphanumeric(25);
+
+        }
+    }
+
+    // Helper method to create a base table
+    private void createBaseTable(String tableName, boolean isMultiTenant) throws SQLException {
+        String baseTableName = String.format(BASE_TABLE_NAME_FMT, tableName);
+
+        try (Connection globalConnection = DriverManager.getConnection(getUrl())) {
+            try (Statement cstmt = globalConnection.createStatement()) {
+                String CO_BASE_TBL_TEMPLATE = "CREATE TABLE IF NOT EXISTS %s(OID CHAR(15) NOT NULL,KP CHAR(3) NOT NULL, COL1 VARCHAR,CREATED_DATE DATE,CREATED_BY CHAR(15),LAST_UPDATE DATE,LAST_UPDATE_BY CHAR(15),SYSTEM_MODSTAMP DATE CONSTRAINT pk PRIMARY KEY (OID,KP)) COLUMN_ENCODED_BYTES=0 %s";
+                cstmt.execute(String.format(CO_BASE_TBL_TEMPLATE, baseTableName, isMultiTenant ? ", MULTI_TENANT=true" : ""));
+            }
+        }
+    }
+
+    // Helper method to create a global view
+    // Return a pair [view-key, row-prefix for the view]
+    private Pair<String, byte[]> createGlobalView(String tableName,
+            int partition,
+            PDataType[] pkTypes,
+            SortOrder[] pkOrders,
+            boolean hasGlobalViewIndexes) throws SQLException {
+
+        String pkType1Str = getType(pkTypes[0]);
+        String pkType2Str = getType(pkTypes[1]);
+        String pkType3Str = getType(pkTypes[2]);
+
+        String baseTableName = String.format(BASE_TABLE_NAME_FMT, tableName);
+        String partitionName = String.format(PARTITION_FMT, partition);
+        String globalViewName = String.format(GLOBAL_VIEW_NAME_FMT, partitionName);
+        try (PhoenixConnection globalConnection = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class)) {
+            try (Statement cstmt = globalConnection.createStatement()) {
+                String VIEW_TEMPLATE = "CREATE VIEW IF NOT EXISTS %s(ID1 %s not null,ID2 %s not null,ID3 %s not null, ROW_ID CHAR(15) not null, COL2 VARCHAR " +
+                        "CONSTRAINT pk PRIMARY KEY (ID1 %s, ID2 %s, ID3 %s, ROW_ID)) " +
+                        "AS SELECT * FROM %s WHERE KP = '%s'";
+
+                cstmt.execute(String.format(VIEW_TEMPLATE, globalViewName, pkType1Str, pkType2Str, pkType3Str,
+                        pkOrders[0].name(),pkOrders[1].name(),pkOrders[2].name(), baseTableName, partitionName));
+                if (hasGlobalViewIndexes) {
+                    String indexNamePrefix = String.format("G%s", partition);
+                    String GLOBAL_INDEX_TEMPLATE = "CREATE INDEX IF NOT EXISTS %s_COL2_INDEX ON %s (COL2) INCLUDE(SYSTEM_MODSTAMP)";
+                    cstmt.execute(String.format(GLOBAL_INDEX_TEMPLATE, indexNamePrefix, globalViewName));
+                }
+
+                return getRowKeyPrefixesFromView(globalConnection.unwrap(PhoenixConnection.class), globalViewName);
+            }
+        }
+    }
+
+    // Helper method to create a tenant view
+    // Return a pair [view-key, row-prefix for the view]
+
+    private Pair<String, byte[]> createTenantView(
+            boolean extendPK,
+            int partition,
+            int tenant,
+            int tenantViewNum,
+            String[] pkNames,
+            PDataType[] pkTypes) throws SQLException {
+
+        String partitionName = String.format(PARTITION_FMT, partition);
+        String globalViewName = String.format(GLOBAL_VIEW_NAME_FMT, partitionName);
+
+        String tenantId = String.format(ORG_ID_FMT, ORG_ID_PREFIX, tenant);
+        String tenantConnectionUrl = String.format(TENANT_URL_FMT, getUrl(), TENANT_ID_ATTRIB, tenantId);
+        String tenantViewName = String.format(TENANT_VIEW_NAME_FMT, partitionName, tenantViewNum);
+        //String tenantViewOptions = (partition % 2 != 0) ? String.format("TTL=%d", new Random().nextInt(300)) : "";
+        String tenantViewOptions = "";
+        try (Connection tenantConnection = DriverManager.getConnection(tenantConnectionUrl)) {
+            tenantConnection.setAutoCommit(true);
+            try (Statement cstmt = tenantConnection.createStatement()) {
+                String VIEW_WITH_PK_TEMPLATE = "CREATE VIEW IF NOT EXISTS %s(ZID CHAR(15) NOT NULL,COL3 VARCHAR CONSTRAINT pk PRIMARY KEY (ZID)) " +
+                        "AS SELECT * FROM %s %s %s";
+                String VIEW_WO_PK_TEMPLATE = "CREATE VIEW IF NOT EXISTS %s AS SELECT * from %s %s";
+                if (extendPK) {
+                    cstmt.execute(String.format(VIEW_WITH_PK_TEMPLATE, tenantViewName, globalViewName, getWhereClause(pkNames, pkTypes), tenantViewOptions));
+                } else {
+                    cstmt.execute(String.format(VIEW_WO_PK_TEMPLATE, tenantViewName, globalViewName, tenantViewOptions));
+                }
+                return getRowKeyPrefixesFromView(tenantConnection.unwrap(PhoenixConnection.class), tenantViewName);
+            }
+        }
+    }
+
+    // Helper method to create rows for a given tenant view
+    private void upsertTenantViewRows(
+            boolean isMultiTenant,
+            boolean extendPK,
+            int partition,
+            int tenant,
+            int tenantViewNum,
+            int rowIndex,
+            String[] pkNames,
+            PDataType[] pkTypes
+            ) throws SQLException {
+
+        String rid = String.format(ROW_ID_FMT, rowIndex);
+        String zid = String.format(ZID_FMT, rowIndex);
+        String col1 = String.format(COL1_FMT, rowIndex, RANDOM_GEN.nextInt(MAX_ROWS));
+        String col2 = String.format(COL2_FMT, rowIndex, RANDOM_GEN.nextInt(MAX_ROWS));
+        String col3 = String.format(COL3_FMT, rowIndex, RANDOM_GEN.nextInt(MAX_ROWS));
+        Object pk1 = null;
+        Object pk2 = null;
+        Object pk3 = null;
+
+        String partitionName = String.format(PARTITION_FMT, partition);
+
+        String tenantId = String.format(ORG_ID_FMT, ORG_ID_PREFIX, tenant);
+        String tenantConnectionUrl = String.format(TENANT_URL_FMT, getUrl(), TENANT_ID_ATTRIB, tenantId);
+        String tenantViewName = String.format(TENANT_VIEW_NAME_FMT, partitionName, tenantViewNum);
+        try (PhoenixConnection tenantConnection = DriverManager.getConnection(tenantConnectionUrl).unwrap(PhoenixConnection.class)) {
+            tenantConnection.setAutoCommit(true);
+            String TENANT_VIEW_WITH_PK = String.format("UPSERT INTO %s(ROW_ID, ZID, COL1, COL2, COL3, SYSTEM_MODSTAMP) VALUES(?, ?, ?, ?, ?, ?)", tenantViewName);
+            String TENANT_VIEW_WO_PK = String.format("UPSERT INTO %s(ID1, ID2, ID3, ROW_ID, COL1, COL2, SYSTEM_MODSTAMP) VALUES(?, ?, ?, ?, ?, ?, ?)", tenantViewName);
+            String NON_MULTI_TENANT_VIEW_WO_PK = String.format("UPSERT INTO %s(OID, ID1, ID2, ID3, ROW_ID, COL1, COL2, SYSTEM_MODSTAMP) VALUES(?, ?, ?, ?, ?, ?, ?, ?)", tenantViewName);
+            String NON_MULTI_TENANT_VIEW_WITH_PK = String.format("UPSERT INTO %s(OID, ROW_ID, ZID, COL1, COL2, COL3, SYSTEM_MODSTAMP) VALUES(?, ?, ?, ?, ?, ?, ?)", tenantViewName);
+
+            if (isMultiTenant) {
+                if (extendPK) {
+                    // Case: MultiTenant and ExtendedPK
+                    try (PreparedStatement pstmt = tenantConnection.prepareStatement(TENANT_VIEW_WITH_PK)) {
+                        pstmt.setObject(1, rid);
+                        pstmt.setObject(2, zid);
+                        pstmt.setObject(3, col1);
+                        pstmt.setObject(4, col2);
+                        pstmt.setObject(5, col3);
+                        pstmt.setObject(6, new Date(System.currentTimeMillis()));
+                        pstmt.execute();
+                    }
+                } else {
+                    // Case: MultiTenant and Non ExtendedPK
+                    pk1 = getData(pkTypes[0]);
+                    pk2 = getData(pkTypes[1]);
+                    pk3 = getData(pkTypes[2]);
+                    try (PreparedStatement pstmt = tenantConnection.prepareStatement(TENANT_VIEW_WO_PK)) {
+                        pstmt.setObject(1, pk1);
+                        pstmt.setObject(2, pk2);
+                        pstmt.setObject(3, pk3);
+                        pstmt.setObject(4, rid);
+                        pstmt.setObject(5, col1);
+                        pstmt.setObject(6, col2);
+                        pstmt.setObject(7, new Date(System.currentTimeMillis()));
+                        pstmt.execute();
+                    }
+                }
+
+            } else {
+                if (extendPK) {
+                    // Case: Non MultiTenant and ExtendedPK
+                    try (PreparedStatement pstmt = tenantConnection.prepareStatement(NON_MULTI_TENANT_VIEW_WITH_PK)) {
+                        pstmt.setObject(1, tenantId);
+                        pstmt.setObject(2, rid);
+                        pstmt.setObject(3, zid);
+                        pstmt.setObject(4, col1);
+                        pstmt.setObject(5, col2);
+                        pstmt.setObject(6, col3);
+                        pstmt.setObject(7, new Date(System.currentTimeMillis()));
+                        pstmt.execute();
+                    }
+                } else {
+                    // Case: Non MultiTenant and Non ExtendedPK
+                    pk1 = getData(pkTypes[0]);
+                    pk2 = getData(pkTypes[1]);
+                    pk3 = getData(pkTypes[2]);
+                    try (PreparedStatement pstmt = tenantConnection.prepareStatement(NON_MULTI_TENANT_VIEW_WO_PK)) {
+                        pstmt.setObject(1, tenantId);
+                        pstmt.setObject(2, pk1);
+                        pstmt.setObject(3, pk2);
+                        pstmt.setObject(4, pk3);
+                        pstmt.setObject(5, rid);
+                        pstmt.setObject(6, col1);
+                        pstmt.setObject(7, col2);
+                        pstmt.setObject(8, new Date(System.currentTimeMillis()));
+                        pstmt.execute();
+                    }
+                }
+
+            }
+
+        }
+        finally {
+            LOGGER.debug(String.format("Upsert values " +
+                    "tenantId = %s, pk1 = %s, pk2 = %s, pk3 = %s, rid = %s, col1 = %s, col2 = %s",
+                    tenantId,
+                    pk1,
+                    pk2,
+                    pk3,
+                    rid,
+                    col1,
+                    col2));
+        }
+    }
+
+    // Helper to get rowKeyPrefix from Metadata.
+    private Pair<String, byte[]> getRowKeyPrefixesFromView(PhoenixConnection connection, String viewName)
+            throws SQLException {
+
+        PName tenantId = connection.getTenantId();
+        PTable view = PhoenixRuntime.getTable(connection, viewName);
+        String tenantViewKey = String.format("%s.%s", tenantId, viewName);
+        byte[] rowkeyPrefix = view.getRowKeyPrefix();
+        return new Pair(tenantViewKey, rowkeyPrefix);
+
+    }
+    // Helper to get rowKeyPrefix from Metadata.
+    private Pair<String, byte[]> getRowKeyPrefixesFromView(PhoenixConnection connection, PTable view)
+            throws SQLException {
+        return getRowKeyPrefixesFromView(connection, view.getName().getString());
+    }
+
+
+    // Helper to assert that row prefix generated by the 2 WhereOptimizer methods are the same
+    private byte[] assertRowKeyPrefixForView(PhoenixConnection connection, PTable view, Pair<String, byte[]> rowKeyInfo)
+            throws SQLException {
+
+        String viewStatement = view.getViewStatement();
+        SelectStatement viewSelectStatement = new SQLParser(viewStatement).parseQuery();
+
+        PhoenixPreparedStatement preparedViewStatement =
+                connection.prepareStatement(viewStatement).unwrap(PhoenixPreparedStatement.class);
+
+        ColumnResolver resolver = FromCompiler.getResolverForQuery(viewSelectStatement, connection);
+        StatementContext viewStatementContext = new StatementContext(preparedViewStatement, resolver);
+
+        PTable viewStatementTable = viewStatementContext.getCurrentTable().getTable();
+
+        int nColumns = viewStatementTable.getColumns().size();
+        BitSet isViewColumnReferencedToBe = new BitSet(nColumns);
+        // Used to track column references in a view
+        ExpressionCompiler
+                expressionCompiler = new CreateTableCompiler.ColumnTrackingExpressionCompiler(viewStatementContext, isViewColumnReferencedToBe);
+        ParseNode whereNode = viewSelectStatement.getWhere();
+
+        Expression whereExpression = whereNode.accept(expressionCompiler);
+
+
+        byte[][] viewColumnConstantsToBe = new byte[nColumns][];
+        CreateTableCompiler.ViewWhereExpressionVisitor
+                visitor = new CreateTableCompiler.ViewWhereExpressionVisitor(viewStatementTable, viewColumnConstantsToBe);
+        whereExpression.accept(visitor);
+
+        TableName tableName = TableName.createNormalized(view.getSchemaName().getString(), view.getTableName().getString());
+        byte[] rowKeyPrefix1 = WhereOptimizer.getRowKeyPrefix(viewStatementContext, tableName, viewStatementTable, whereExpression);
+        byte[] rowKeyPrefix2 = WhereOptimizer.getRowKeyPrefix(connection, tableName, viewStatementTable, viewColumnConstantsToBe, isViewColumnReferencedToBe);
+        LOGGER.debug(String.format("target-view-name = %s, physical = %s, stmt-table = %s\n, row-prefix-0 = %s (syscat)\n, row-prefix-1 = %s\n, row-prefix-2 = %s\n",
+                view.getName().getString(),
+                viewStatementTable.getPhysicalName().getString(),
+                viewStatementTable.getName().getString(),
+                Bytes.toStringBinary(rowKeyInfo.getSecond()),
+                Bytes.toStringBinary(rowKeyPrefix1),
+                Bytes.toStringBinary(rowKeyPrefix2)
+        ));
+        assertTrue("RowKey Prefixes do not match", Bytes.compareTo(rowKeyInfo.getSecond(), rowKeyPrefix1) == 0);
+        assertTrue("RowKey Prefixes do not match", Bytes.compareTo(rowKeyInfo.getSecond(), rowKeyPrefix2) == 0);
+        return rowKeyPrefix1;
+    }
+
+    // Helper method to return the row prefixes for the whole view hierarchy under the table.
+    private Map<String, byte[]> assertRowKeyPrefixesForTable(String url, String parentSchemaName, String parentTableName) {
+        Map<String, byte[]> viewToRowKeyMap = Maps.newHashMap();
+        Properties tenantProps = PropertiesUtil.deepCopy(new Properties());
+        try (Connection globalConnection = DriverManager.getConnection(url)) {
+            ConnectionQueryServices cqs = globalConnection.unwrap(PhoenixConnection.class).getQueryServices();
+            try (Table childLinkTable = cqs.getTable(SchemaUtil.getPhysicalName(
+                    SYSTEM_LINK_HBASE_TABLE_NAME.toBytes(), cqs.getProps()).getName())) {
+                Pair<List<PTable>, List<TableInfo>> allDescendants =
+                        ViewUtil.findAllDescendantViews(childLinkTable, cqs.getConfiguration(),
+                                EMPTY_BYTE_ARRAY, parentSchemaName.getBytes(),
+                                parentTableName.getBytes(), HConstants.LATEST_TIMESTAMP, false);
+                for (PTable view : allDescendants.getFirst()) {
+                    PName tenantId = view.getTenantId();
+                    String viewName = view.getName().getString();
+
+                    Connection stmtConnection = tenantId == null?
+                            globalConnection :  DriverManager.getConnection(
+                            String.format("%s;%s=%s", url, TENANT_ID_ATTRIB, tenantId.getString()), tenantProps);
+
+                    Pair<String, byte[]> rowKeyInfo  = getRowKeyPrefixesFromView(stmtConnection.unwrap(PhoenixConnection.class), view);
+                    assertRowKeyPrefixForView(stmtConnection.unwrap(PhoenixConnection.class), view, rowKeyInfo);
+                    viewToRowKeyMap.put(rowKeyInfo.getFirst(), rowKeyInfo.getSecond());
+                }
+            };
+        } catch (SQLException | IOException e) {
+            throw new RuntimeException(e);
+        }
+        return viewToRowKeyMap;
+
+    }
+
+    private String getWhereClause(String[] pkNames, PDataType[] testPKTypes) {
+
+        StringBuilder builder = new StringBuilder("WHERE ");
+        Random rnd = new Random();
+
+        for (int b=0;b<testPKTypes.length;b++) {
+            if (b>0) builder.append(" AND ");
+            switch (testPKTypes[b].getSqlType()) {
+                case Types.VARCHAR: {
+                    // pkTypeStr = "VARCHAR(25)";
+                    builder.append(pkNames[b]).append(" = ").append("'")
+                            .append(RandomStringUtils.randomAlphanumeric(25)).append("'");
+                    break;
+                }
+                case Types.CHAR: {
+                    //pkTypeStr = "CHAR(15)";
+                    builder.append(pkNames[b]).append(" = ").append("'")
+                            .append(RandomStringUtils.randomAlphanumeric(15)).append("'");
+                    break;
+                }
+                case Types.DECIMAL:
+                    //pkTypeStr = "DECIMAL(8,2)";
+                    builder.append(pkNames[b]).append(" = ").append(rnd.nextDouble());
+                    break;
+                case Types.INTEGER:
+                    //pkTypeStr = "INTEGER";
+                    builder.append(pkNames[b]).append(" = ").append(rnd.nextInt(500000));
+                    break;
+                case Types.BIGINT:
+                    //pkTypeStr = "BIGINT";
+                    builder.append(pkNames[b]).append(" = ").append(rnd.nextLong());
+                    break;
+                case Types.DATE:
+                    //pkTypeStr = "DATE";
+                    builder.append(pkNames[b]).append(" = ").append(" TO_DATE('2022-03-21T15:03:57+00:00') ");
+                    break;
+                case Types.TIMESTAMP:
+                    //pkTypeStr = "TIMESTAMP";
+                    builder.append(pkNames[b]).append(" = ").append(" TO_TIMESTAMP('2019-10-27T16:17:57+00:00') ");
+                    break;
+                default:
+                    // pkTypeStr = "VARCHAR(25)";
+                    builder.append(pkNames[b]).append("=").append("'")
+                            .append(RandomStringUtils.randomAlphanumeric(15)).append("'");
+            }
+        }
+        return builder.toString();
+    }
+
+    // Asserts that the row matched by the rowId and tenantId matches the prefix
+    private void assertHBaseRowKeyMatchesPrefix(PhoenixConnection connection, byte[] hbaseTableName, int rowId, byte[] prefix)
+            throws IOException, SQLException {
+
+        byte[] rowkey = ByteUtil.EMPTY_BYTE_ARRAY;
+        String rid = String.format(ROW_ID_FMT, rowId);
+
+        try (Table tbl = connection.getQueryServices().getTable(hbaseTableName)) {
+
+            PName tenantId = connection.getTenantId();
+            Scan allRows = new Scan();
+            // Add tenant as the prefix filter
+            FilterList andFilter = new FilterList();
+            if (tenantId != null) {
+                andFilter.addFilter(new PrefixFilter(tenantId.getBytes()));
+            }
+            andFilter.addFilter(new RowFilter(CompareOperator.EQUAL, new SubstringComparator(rid)));
+            allRows.setFilter(andFilter);
+            ResultScanner scanner = tbl.getScanner(allRows);
+            int numMatchingRows = 0;
+            for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                rowkey = result.getRow();
+                numMatchingRows++;
+            }
+            assertEquals(String.format("Expected rows do match for table = %s, rowId = %s",
+                    Bytes.toString(hbaseTableName), rowId), 1, numMatchingRows);
+
+            PrefixFilter matchFilter = new PrefixFilter(prefix);
+            LOGGER.debug(String.format("row-key = %s, tenantId = %s, prefix = %s, matched = %s",
+                    Bytes.toStringBinary(rowkey),
+                    tenantId,
+                    Bytes.toStringBinary(prefix),
+                    !matchFilter.filterRowKey(KeyValueUtil.createFirstOnRow(rowkey))));
+        }
+    }
+
+
+    // Asserts that the row matching the tenantId and rowId matches with the viewIndexId
+    private void assertIndexTableRowKeyMatchesPrefix(PhoenixConnection connection, PTable viewIndex, byte[] hbaseIndexTableName, int rowId)
+            throws IOException, SQLException {
+
+        byte[] rowkey = ByteUtil.EMPTY_BYTE_ARRAY;
+        String rid = String.format(ROW_ID_FMT, rowId);
+
+
+        try (Table tbl = connection.getQueryServices().getTable(hbaseIndexTableName)) {
+
+            PName tenantId = connection.getTenantId();
+            Scan allRows = new Scan();
+            FilterList andFilter = new FilterList();
+            if (tenantId != null) {
+                andFilter.addFilter(new RowFilter(CompareOperator.EQUAL, new SubstringComparator(tenantId.getString())));
+            }
+            andFilter.addFilter(new RowFilter(CompareOperator.EQUAL, new SubstringComparator(rid)));
+            allRows.setFilter(andFilter);
+            ResultScanner scanner = tbl.getScanner(allRows);
+            int numMatchingRows = 0;
+            for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                PColumn viewIndexIdPKColumn = viewIndex.getPKColumns().get(0);
+                RowKeyColumnExpression
+                        viewIndexIdColExpr = new RowKeyColumnExpression(viewIndexIdPKColumn, new RowKeyValueAccessor(viewIndex.getPKColumns(), 0));
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                viewIndexIdColExpr.evaluate(new ResultTuple(result), ptr);
+                long actualViewIndexID;
+                if (hasLongViewIndexEnabled()) {
+                    actualViewIndexID = PLong.INSTANCE.getCodec().decodeLong(ptr, SortOrder.ASC);
+                } else {
+                    actualViewIndexID = PSmallint.INSTANCE.getCodec().decodeShort(ptr, SortOrder.ASC);
+                }
+
+                assertTrue("ViewIndexId's not match", viewIndex.getViewIndexId() == actualViewIndexID);
+                rowkey = result.getRow();
+                numMatchingRows++;
+            }
+            assertEquals(String.format("Expected rows do match for index table = %s, row-key = %s, rowId = %s",
+                    Bytes.toString(hbaseIndexTableName), Bytes.toStringBinary(rowkey), rowId),
+                    1, numMatchingRows);
+
+        }
+    }
+
+    protected abstract boolean hasLongViewIndexEnabled();
+
+    private SortOrder[][] getSortOrders() {
+        SortOrder[][] sortOrders = new SortOrder[][] {
+                {SortOrder.ASC, SortOrder.ASC, SortOrder.ASC},
+                {SortOrder.ASC, SortOrder.ASC, SortOrder.DESC},
+                {SortOrder.ASC, SortOrder.DESC, SortOrder.ASC},
+                {SortOrder.ASC, SortOrder.DESC, SortOrder.DESC},
+                {SortOrder.DESC, SortOrder.ASC, SortOrder.ASC},
+                {SortOrder.DESC, SortOrder.ASC, SortOrder.DESC},
+                {SortOrder.DESC, SortOrder.DESC, SortOrder.ASC},
+                {SortOrder.DESC, SortOrder.DESC, SortOrder.DESC}
+        };
+        return sortOrders;
+    }
+
+    private List<PDataType[]> getTestCases() {
+
+        List<PDataType[]> testCases = new ArrayList<>();
+        // Test Case 1: PK1 = Integer, PK2 = Integer, PK3 = Integer
+        testCases.add(new PDataType[] { PInteger.INSTANCE, PInteger.INSTANCE, PInteger.INSTANCE});
+        // Test Case 2: PK1 = Long, PK2 = Long, PK3 = Long
+        testCases.add(new PDataType[] {PLong.INSTANCE, PLong.INSTANCE, PLong.INSTANCE});
+        // Test Case 3: PK1 = Timestamp, PK2 = Timestamp, PK3 = Timestamp
+        testCases.add(new PDataType[] { PTimestamp.INSTANCE, PTimestamp.INSTANCE, PTimestamp.INSTANCE});
+        // Test Case 4: PK1 = Char, PK2 = Char, PK3 = Char
+        testCases.add(new PDataType[] {PChar.INSTANCE, PChar.INSTANCE, PChar.INSTANCE});
+        // Test Case 5: PK1 = Decimal, PK2 = Decimal, PK3 = Integer
+        // last PK cannot be of variable length when creating a view on top of it
+        testCases.add(new PDataType[] { PDecimal.INSTANCE, PDecimal.INSTANCE, PInteger.INSTANCE});
+        // Test Case 6: PK1 = Date, PK2 = Date, PK3 = Date
+        testCases.add(new PDataType[] { PDate.INSTANCE, PDate.INSTANCE, PDate.INSTANCE});
+        // Test Case 7: PK1 = Varchar, PK2 = Varchar, PK3 = Integer
+        // last PK cannot be of variable length when creating a view on top of it
+        testCases.add(new PDataType[] {PVarchar.INSTANCE, PVarchar.INSTANCE, PInteger.INSTANCE});
+
+        return testCases;
+    }
+
+    @Test
+    public void testViewsWithExtendedPK() {
+        try {
+            List<PDataType[]> testCases = getTestCases();
+            SortOrder[][] sortOrders = getSortOrders();
+
+            String tableName = "";
+            tableName = createViewHierarchy(testCases, sortOrders, 500,5000,3,true, true, false);
+            assertRowKeyPrefixesForTable(
+                    getUrl(),
+                    SchemaUtil.getSchemaNameFromFullName(tableName),
+                    SchemaUtil.getTableNameFromFullName(tableName));
+            tableName = createViewHierarchy(testCases, sortOrders, 600,6000,3,false, true, false);
+            assertRowKeyPrefixesForTable(
+                    getUrl(),
+                    SchemaUtil.getSchemaNameFromFullName(tableName),
+                    SchemaUtil.getTableNameFromFullName(tableName));
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testViewsWithoutExtendedPK() {
+        try {
+            List<PDataType[]> testCases = getTestCases();
+            SortOrder[][] sortOrders = getSortOrders();
+            String tableName = "";
+            tableName = createViewHierarchy(testCases, sortOrders, 100,1000,3,true, false, false);
+            assertRowKeyPrefixesForTable(
+                    getUrl(),
+                    SchemaUtil.getSchemaNameFromFullName(tableName),
+                    SchemaUtil.getTableNameFromFullName(tableName));
+            tableName = createViewHierarchy(testCases, sortOrders, 200,2000,3,false, false, false);
+            assertRowKeyPrefixesForTable(
+                    getUrl(),
+                    SchemaUtil.getSchemaNameFromFullName(tableName),
+                    SchemaUtil.getTableNameFromFullName(tableName));
+            tableName = createViewHierarchy(testCases, sortOrders, 300,3000,3,true, false, true);
+            assertRowKeyPrefixesForTable(
+                    getUrl(),
+                    SchemaUtil.getSchemaNameFromFullName(tableName),
+                    SchemaUtil.getTableNameFromFullName(tableName));
+
+            tableName = createViewHierarchy(testCases, sortOrders, 400,4000,3,false, false, true);
+            assertRowKeyPrefixesForTable(
+                    getUrl(),
+                    SchemaUtil.getSchemaNameFromFullName(tableName),
+                    SchemaUtil.getTableNameFromFullName(tableName));
+
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.error(e.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testNonMultiTenantExtendedViewsWithViewIndexesFail() {
+        try {
+            List<PDataType[]> testCases = new ArrayList<>();
+            // Test Case 1: PK1 = Integer, PK2 = Integer, PK3 = Integer
+            testCases.add(new PDataType[] {PInteger.INSTANCE, PInteger.INSTANCE, PInteger.INSTANCE});
+
+            SortOrder[][] sortOrders = new SortOrder[][] {
+                    {SortOrder.ASC, SortOrder.ASC, SortOrder.ASC}
+            };
+            createViewHierarchy(testCases, sortOrders, 900,9000,3,false, true, true);
+            fail();
+        } catch (SQLException sqle) {
+            assertEquals(VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES.getErrorCode(),
+                    sqle.getErrorCode());
+        } catch (Exception e) {
+            fail("SQLException expected: " + VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES);
+        }
+
+    }
+
+    @Test
+    public void testMultiTenantExtendedViewsWithViewIndexesFail() {
+        try {
+            List<PDataType[]> testCases = new ArrayList<>();
+            // Test Case 1: PK1 = Integer, PK2 = Integer, PK3 = Integer
+            testCases.add(new PDataType[] {PInteger.INSTANCE, PInteger.INSTANCE, PInteger.INSTANCE});
+
+            SortOrder[][] sortOrders = new SortOrder[][] {
+                    {SortOrder.ASC, SortOrder.ASC, SortOrder.ASC}
+            };
+            String tableName = "";
+            tableName = createViewHierarchy(testCases, sortOrders, 910,9100,3,true, true, true);
+            assertRowKeyPrefixesForTable(
+                    getUrl(),
+                    SchemaUtil.getSchemaNameFromFullName(tableName),
+                    SchemaUtil.getTableNameFromFullName(tableName));
+            fail();
+        } catch (SQLException sqle) {
+            assertEquals(VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES.getErrorCode(),
+                    sqle.getErrorCode());
+        } catch (Exception e) {
+            fail("SQLException expected: " + VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES);
+        }
+
+    }
+
+    private String createViewHierarchy(List<PDataType[]> testCases, SortOrder[][] sortOrders, int startPartition, int startRowId, int numTenants, boolean isMultiTenant, boolean extendPK, boolean hasGlobalViewIndexes) throws Exception {
+
+        Map<String, byte[]> actualViewToRowKeyMap = Maps.newHashMap();
+        String tableName = BaseTest.generateUniqueName();
+
+        // Create a base table
+        createBaseTable(tableName, isMultiTenant);
+        String baseTableName = String.format(BASE_TABLE_NAME_FMT, tableName);
+        String indexTableName = String.format(INDEX_TABLE_NAME_FMT, tableName);
+
+
+        int partition = startPartition;
+        int rowId = startRowId;
+
+        // Create the global view
+        for (int testCase=0;testCase<testCases.size();testCase++) {
+            for (int index=0;index<sortOrders.length;index++) {
+                partition++;
+                Pair<String, byte[]> gvRowKeyInfo = createGlobalView(tableName,partition, testCases.get(testCase), sortOrders[index], hasGlobalViewIndexes);
+                actualViewToRowKeyMap.put(gvRowKeyInfo.getFirst(), gvRowKeyInfo.getSecond());
+                LOGGER.debug(String.format("Created global view %s with partition = %d", gvRowKeyInfo.getFirst(), partition));
+            }
+        }
+
+        partition = startPartition;
+        String[] globalViewPKNames = new String[] {"ID1", "ID2", "ID3"};
+        // Create the tenant view for each partition
+        for (int testCase=0;testCase<testCases.size();testCase++) {
+            for (int index=0;index<sortOrders.length;index++) {
+                partition++;
+                for (int tenant=1;tenant<=numTenants;tenant++) {
+                    Pair<String, byte[]> tvRowKeyInfo = createTenantView(extendPK, partition, tenant, 1, globalViewPKNames, testCases.get(testCase));
+                    actualViewToRowKeyMap.put(tvRowKeyInfo.getFirst(), tvRowKeyInfo.getSecond());
+                    LOGGER.debug(String.format("Created tenant view %s [partition = %d]", tvRowKeyInfo.getFirst(), partition));
+                }
+            }
+        }
+
+        partition = startPartition;
+        // Upsert rows into the tenant view for each partition
+        for (int testCase=0;testCase<testCases.size();testCase++) {
+            for (int index=0;index<sortOrders.length;index++) {
+                partition++;
+                for (int tenant=1;tenant<=numTenants;tenant++) {
+                    rowId++;
+                    try {
+                        upsertTenantViewRows(isMultiTenant, extendPK, partition, tenant, 1, rowId, globalViewPKNames, testCases.get(testCase));
+                    } catch (Exception ex) {
+                        String testInfo = Arrays.stream(testCases.get(testCase)).map(String::valueOf).collect(Collectors.joining(","));
+                        String sortInfo = Arrays.stream(sortOrders[index]).map(String::valueOf).collect(Collectors.joining(","));
+                        String pkInfo = Arrays.stream(globalViewPKNames).map(String::valueOf).collect(Collectors.joining(","));
+                        LOGGER.error(ex.getMessage());
+                        ex.printStackTrace();
+                        fail(String.format("isMultiTenant(%s), extendPK(%s), partition(%d), tenant(%s), rowId(%s), pkInfo(%s), testInfo(%s), sortInfo(%s)",
+                                isMultiTenant, extendPK, partition, tenant, rowId, pkInfo, testInfo, sortInfo));
+                    }
+                }
+            }
+        }
+
+        partition = startPartition;
+        rowId = startRowId;
+        // Validate the rowPrefix from SYSTEM.CATALOG prefix matches the rowkey from HBase
+        // for each tenant view
+        // actualViewToRowKeyMap holds row prefix for each view (global and tenant specific)
+        for (int testCase=0;testCase<testCases.size();testCase++) {
+            for (int index=0;index<sortOrders.length;index++) {
+                partition++;
+                for (int tenant=1;tenant<=numTenants;tenant++) {
+                    rowId++;
+                    String partitionName = String.format(PARTITION_FMT, partition);
+
+                    String tenantId = String.format(ORG_ID_FMT, ORG_ID_PREFIX, tenant);
+                    String tenantConnectionUrl = String.format(TENANT_URL_FMT, getUrl(), TENANT_ID_ATTRIB, tenantId);
+                    String tenantViewName = String.format(TENANT_VIEW_NAME_FMT, partitionName, 1);
+                    String tenantViewKey = String.format("%s.%s", tenantId, tenantViewName);
+                    try (PhoenixConnection tenantConnection = DriverManager.getConnection(tenantConnectionUrl).unwrap(PhoenixConnection.class)) {
+                        assertHBaseRowKeyMatchesPrefix(tenantConnection, baseTableName.getBytes(StandardCharsets.UTF_8), rowId, actualViewToRowKeyMap.get(tenantViewKey));
+                        if (hasGlobalViewIndexes) {
+                            PTable view = PhoenixRuntime.getTable(tenantConnection, tenantViewName);
+                            assertIndexTableRowKeyMatchesPrefix(tenantConnection, view.getIndexes().get(0), indexTableName.getBytes(StandardCharsets.UTF_8), rowId);
+                        }
+                    }
+                }
+            }
+        }
+        return baseTableName;
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/prefix/LongViewIndexDisabledBaseRowKeyPrefixIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/prefix/LongViewIndexDisabledBaseRowKeyPrefixIT.java
new file mode 100644
index 0000000000..a4fc076a6a
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/prefix/LongViewIndexDisabledBaseRowKeyPrefixIT.java
@@ -0,0 +1,53 @@
+package org.apache.phoenix.end2end.prefix;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class LongViewIndexDisabledBaseRowKeyPrefixIT extends BaseRowKeyPrefixTestIT {
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        final Configuration conf = HBaseConfiguration.create();
+        conf.set(QueryServices.PHOENIX_TABLE_TTL_ENABLED, String.valueOf(true));
+        conf.set(QueryServices.LONG_VIEW_INDEX_ENABLED_ATTRIB, String.valueOf(false));
+        conf.set(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, "true");
+        conf.set(IndexManagementUtil.WAL_EDIT_CODEC_CLASS_KEY, "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec");
+
+        // Clear the cached singletons so we can inject our own.
+        InstanceResolver.clearSingletons();
+        // Make sure the ConnectionInfo doesn't try to pull a default Configuration
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+            @Override
+            public Configuration getConfiguration() {
+                return conf;
+            }
+
+            @Override
+            public Configuration getConfiguration(Configuration confToClone) {
+                Configuration copy = new Configuration(conf);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
+
+        Map<String, String> DEFAULT_PROPERTIES = new HashMap() ;
+        setUpTestDriver(new ReadOnlyProps(DEFAULT_PROPERTIES.entrySet().iterator()));
+    }
+
+    @Override
+    protected boolean hasLongViewIndexEnabled() {
+        return false;
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/prefix/LongViewIndexEnabledBaseRowKeyPrefixIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/prefix/LongViewIndexEnabledBaseRowKeyPrefixIT.java
new file mode 100644
index 0000000000..11358c8d94
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/prefix/LongViewIndexEnabledBaseRowKeyPrefixIT.java
@@ -0,0 +1,53 @@
+package org.apache.phoenix.end2end.prefix;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class LongViewIndexEnabledBaseRowKeyPrefixIT extends BaseRowKeyPrefixTestIT {
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        final Configuration conf = HBaseConfiguration.create();
+        conf.set(QueryServices.PHOENIX_TABLE_TTL_ENABLED, String.valueOf(true));
+        conf.set(QueryServices.LONG_VIEW_INDEX_ENABLED_ATTRIB, String.valueOf(true));
+        conf.set(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, "true");
+        conf.set(IndexManagementUtil.WAL_EDIT_CODEC_CLASS_KEY, "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec");
+
+        // Clear the cached singletons so we can inject our own.
+        InstanceResolver.clearSingletons();
+        // Make sure the ConnectionInfo doesn't try to pull a default Configuration
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+            @Override
+            public Configuration getConfiguration() {
+                return conf;
+            }
+
+            @Override
+            public Configuration getConfiguration(Configuration confToClone) {
+                Configuration copy = new Configuration(conf);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
+
+        Map<String, String> DEFAULT_PROPERTIES = new HashMap() ;
+        setUpTestDriver(new ReadOnlyProps(DEFAULT_PROPERTIES.entrySet().iterator()));
+    }
+
+    @Override
+    protected boolean hasLongViewIndexEnabled() {
+        return true;
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
index 5ca54662c4..45b4f9958d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
@@ -98,6 +98,8 @@ public class CreateTableCompiler {
         String viewStatementToBe = null;
         byte[][] viewColumnConstantsToBe = null;
         BitSet isViewColumnReferencedToBe = null;
+        byte[] rowKeyPrefix = ByteUtil.EMPTY_BYTE_ARRAY;
+
         // Check whether column families having local index column family suffix or not if present
         // don't allow creating table.
         // Also validate the default values expressions.
@@ -138,11 +140,17 @@ public class CreateTableCompiler {
                         .build().buildException();
             }
             viewTypeToBe = parentToBe.getViewType() == ViewType.MAPPED ? ViewType.MAPPED : ViewType.UPDATABLE;
+            Expression where = null;
             if (whereNode == null) {
-                viewStatementToBe = parentToBe.getViewStatement();
                 if (parentToBe.getViewType() == ViewType.READ_ONLY) {
                     viewTypeToBe = ViewType.READ_ONLY;
                 }
+                viewStatementToBe = parentToBe.getViewStatement();
+                if (viewStatementToBe != null) {
+                    SelectStatement select = new SQLParser(viewStatementToBe).parseQuery();
+                    whereNode = select.getWhere();
+                    where = whereNode.accept(expressionCompiler);
+                }
             } else {
                 whereNode = StatementNormalizer.normalize(whereNode, resolver);
                 if (whereNode.isStateless()) {
@@ -154,7 +162,7 @@ public class CreateTableCompiler {
                     SelectStatement select = new SQLParser(parentToBe.getViewStatement()).parseQuery().combine(whereNode);
                     whereNode = select.getWhere();
                 }
-                Expression where = whereNode.accept(expressionCompiler);
+                where = whereNode.accept(expressionCompiler);
                 if (where != null && !LiteralExpression.isTrue(where)) {
                     TableName baseTableName = create.getBaseTableName();
                     StringBuilder buf = new StringBuilder();
@@ -177,6 +185,9 @@ public class CreateTableCompiler {
                     && parentToBe.getPKColumns().isEmpty()) {
                 validateCreateViewCompilation(connection, parentToBe,
                     columnDefs, pkConstraint);
+            } else if (where != null && viewTypeToBe == ViewType.UPDATABLE) {
+                rowKeyPrefix = WhereOptimizer.getRowKeyPrefix(context, create.getTableName(),
+                        parentToBe, where);
             }
             verifyIfAnyParentHasIndexesAndViewExtendsPk(parentToBe, columnDefs, pkConstraint);
         }
@@ -207,7 +218,8 @@ public class CreateTableCompiler {
         final PTable parent = parentToBe;
 
         return new CreateTableMutationPlan(context, client, finalCreate, splits, parent,
-            viewStatement, viewType, viewColumnConstants, isViewColumnReferenced, connection);
+                viewStatement, viewType, rowKeyPrefix,
+                viewColumnConstants, isViewColumnReferenced, connection);
     }
 
     /**
@@ -461,12 +473,15 @@ public class CreateTableCompiler {
         private final ViewType viewType;
         private final byte[][] viewColumnConstants;
         private final BitSet isViewColumnReferenced;
+
+        private final byte[] rowKeyPrefix;
         private final PhoenixConnection connection;
 
         private CreateTableMutationPlan(StatementContext context, MetaDataClient client,
                 CreateTableStatement finalCreate, byte[][] splits, PTable parent,
-                String viewStatement, ViewType viewType, byte[][] viewColumnConstants,
-                BitSet isViewColumnReferenced, PhoenixConnection connection) {
+                String viewStatement, ViewType viewType, byte[] rowKeyPrefix,
+                byte[][] viewColumnConstants, BitSet isViewColumnReferenced,
+                PhoenixConnection connection) {
             super(context, CreateTableCompiler.this.operation);
             this.client = client;
             this.finalCreate = finalCreate;
@@ -474,6 +489,7 @@ public class CreateTableCompiler {
             this.parent = parent;
             this.viewStatement = viewStatement;
             this.viewType = viewType;
+            this.rowKeyPrefix = rowKeyPrefix;
             this.viewColumnConstants = viewColumnConstants;
             this.isViewColumnReferenced = isViewColumnReferenced;
             this.connection = connection;
@@ -483,8 +499,8 @@ public class CreateTableCompiler {
         public MutationState execute() throws SQLException {
             try {
                 return client.createTable(finalCreate, splits, parent, viewStatement,
-                    viewType, MetaDataUtil.getViewIndexIdDataType(), viewColumnConstants,
-                    isViewColumnReferenced);
+                        viewType, MetaDataUtil.getViewIndexIdDataType(), rowKeyPrefix,
+                        viewColumnConstants, isViewColumnReferenced);
             } finally {
                 if (client.getConnection() != connection) {
                     client.getConnection().close();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index 94c3f87840..aca42ebabd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -68,7 +68,6 @@ public class ScanRanges {
         return create(schema, ranges, ScanUtil.getDefaultSlotSpans(ranges.size()), null, true, -1);
     }
     
-    // For testing
     public static ScanRanges createSingleSpan(RowKeySchema schema, List<List<KeyRange>> ranges, Integer nBuckets, boolean useSkipSan) {
         return create(schema, ranges, ScanUtil.getDefaultSlotSpans(ranges.size()), nBuckets, useSkipSan, -1);
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 7da2c62c41..c18778d449 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -17,26 +17,9 @@
  */
 package org.apache.phoenix.compile;
 
-import java.math.BigInteger;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
+import edu.umd.cs.findbugs.annotations.NonNull;
 
 import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.phoenix.expression.DelegateExpression;
-import org.apache.phoenix.schema.ValueSchema;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -59,34 +42,55 @@ import org.apache.phoenix.expression.function.FunctionExpression.OrderPreserving
 import org.apache.phoenix.expression.function.ScalarFunction;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
 import org.apache.phoenix.expression.visitor.StatelessTraverseNoExpressionVisitor;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.LikeParseNode.LikeType;
+import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.ValueSchema;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
-import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
-
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Iterators;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import edu.umd.cs.findbugs.annotations.NonNull;
+import java.math.BigInteger;
 
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
 
 /**
  *
@@ -97,6 +101,8 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * @since 0.1
  */
 public class WhereOptimizer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(WhereOptimizer.class);
     private static final List<KeyRange> EVERYTHING_RANGES =
             Collections.<KeyRange> singletonList(KeyRange.EVERYTHING_RANGE);
     private static final List<KeyRange> SALT_PLACEHOLDER =
@@ -476,6 +482,125 @@ public class WhereOptimizer {
         return keyRanges;
     }
 
+    public static byte[] getRowKeyPrefix(
+            final StatementContext context,
+            final TableName tableNameNode,
+            final PTable parentTable,
+            final Expression viewWhereExpression
+    ) {
+        RowKeySchema schema = parentTable.getRowKeySchema();
+        List<List<KeyRange>> rowKeySlotRangesList = new ArrayList<>();
+        PName tenantId = context.getConnection().getTenantId();
+        byte[] tenantIdBytes = tenantId == null
+                ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getString().getBytes(StandardCharsets.UTF_8);
+        if (tenantIdBytes.length != 0) {
+            rowKeySlotRangesList.add(Arrays.asList(KeyRange.POINT.apply(tenantIdBytes)));
+        }
+        KeyExpressionVisitor visitor = new KeyExpressionVisitor(context, parentTable);
+        KeyExpressionVisitor.KeySlots keySlots = viewWhereExpression.accept(visitor);
+
+        for (KeyExpressionVisitor.KeySlot slot : keySlots.getSlots()) {
+            if (slot != null) {
+                if (schema.getField(slot.getPKPosition()).getSortOrder() == SortOrder.DESC) {
+                    rowKeySlotRangesList.add(invertKeyRanges(slot.getKeyRanges()));
+                    continue;
+                }
+                rowKeySlotRangesList.add(slot.getKeyRanges());
+            }
+        }
+        ScanRanges scanRange = ScanRanges.createSingleSpan(
+                schema, rowKeySlotRangesList, null, false);
+        byte[] rowKeyPrefix = scanRange.getScanRange().getLowerRange();
+        // TODO : make it a TRACE log before submission
+        if (LOGGER.isTraceEnabled()) {
+            String rowKeyPrefixStr = Bytes.toStringBinary(rowKeyPrefix);
+            String rowKeyPrefixHex = Bytes.toHex(rowKeyPrefix);
+            byte[] rowKeyPrefixFromHex = Bytes.fromHex(rowKeyPrefixHex);
+            assert Bytes.compareTo(rowKeyPrefix, rowKeyPrefixFromHex) == 0;
+            LOGGER.trace(String.format("View info view-name = %s, view-stmt-name (parent) = %s, "
+                            + "primary-keys = %d, key-ranges: size = %d, list = %s ",
+                    tableNameNode.toString(), parentTable.getName().toString(),
+                    parentTable.getPKColumns().size(), rowKeySlotRangesList.size(),
+                    rowKeySlotRangesList.isEmpty() ? "null" : rowKeySlotRangesList.toString()));
+            LOGGER.trace(String.format("RowKey Prefix info Hex-value = %s, StringBinary value = %s",
+                    rowKeyPrefixHex, rowKeyPrefixStr));
+
+        }
+        return rowKeyPrefix;
+    }
+
+
+    @VisibleForTesting
+    public static byte[] getRowKeyPrefix(
+            final PhoenixConnection connection,
+            final TableName tableNameNode,
+            final PTable parentTable,
+            final byte[][] viewColumnConstantsToBe,
+            final BitSet isViewColumnReferencedToBe
+    ) throws SQLException {
+
+        RowKeySchema schema = parentTable.getRowKeySchema();
+        List<List<KeyRange>> rowKeySlotRangesList = new ArrayList<>();
+        PName tenantId = connection.getTenantId();
+        byte[] tenantIdBytes = tenantId == null
+                ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getString().getBytes(StandardCharsets.UTF_8);
+        if (tenantIdBytes.length != 0) {
+            rowKeySlotRangesList.add(Arrays.asList(KeyRange.POINT.apply(tenantIdBytes)));
+        }
+
+        int pkPos = 0;
+        for (int i = 0; viewColumnConstantsToBe != null && i < viewColumnConstantsToBe.length; i++) {
+            if  (isViewColumnReferencedToBe.get(i)) {
+                pkPos++;
+                ValueSchema.Field field = schema.getField(pkPos);
+                SortOrder fieldSortOrder = schema.getField(pkPos).getSortOrder();
+                byte[] viewColumnConstants = Bytes.copy(
+                        viewColumnConstantsToBe[i],
+                        0,
+                        viewColumnConstantsToBe[i].length - 1);
+                KeyRange keyRange = ByteUtil.getKeyRange(
+                        viewColumnConstants,
+                        fieldSortOrder,
+                        CompareOperator.EQUAL,
+                        field.getDataType());
+                // TODO : make it a TRACE log before submission
+                if (LOGGER.isTraceEnabled()) {
+                    LOGGER.trace(String.format("Field: pos = %d, name = %s, schema = %s, "
+                                            + "referenced-column %d, %s ",
+                            pkPos, parentTable.getPKColumns().get(pkPos),
+                            schema.getField(pkPos).toString(),
+                            i, Bytes.toHex(viewColumnConstantsToBe[i])));
+                }
+                rowKeySlotRangesList.add(Arrays.asList(keyRange));
+            }
+
+        }
+
+        ScanRanges scanRange = ScanRanges.createSingleSpan(
+                schema, rowKeySlotRangesList, null, false);
+        byte[] rowKeyPrefix = scanRange.getScanRange().getLowerRange();
+
+        // TODO : make it a TRACE log before submission
+        if (LOGGER.isTraceEnabled()) {
+            String rowKeyPrefixStr = Bytes.toStringBinary(rowKeyPrefix);
+            String rowKeyPrefixHex = Bytes.toHex(rowKeyPrefix);
+            byte[] rowKeyPrefixFromHex = Bytes.fromHex(rowKeyPrefixHex);
+            assert Bytes.compareTo(rowKeyPrefix, rowKeyPrefixFromHex) == 0;
+
+            LOGGER.trace(String.format("View info view-name = %s, view-stmt-name (parent) = %s, "
+                            + "primary-keys = %d, key-ranges:  size = %d, list = %s ",
+                    tableNameNode.toString(), parentTable.getName().toString(),
+                    parentTable.getPKColumns().size(), rowKeySlotRangesList.size(),
+                    rowKeySlotRangesList.isEmpty() ? "null" : rowKeySlotRangesList.toString()));
+            LOGGER.trace(String.format("RowKey Prefix info Hex-value = %s, StringBinary value = %s",
+                    rowKeyPrefixHex, rowKeyPrefixStr));
+
+        }
+        return rowKeyPrefix;
+
+    }
+
+
     /**
      * Get an optimal combination of key expressions for hash join key range optimization.
      * @return returns true if the entire combined expression is covered by key range optimization
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index a7dd4c5b41..2028f7aeda 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -1416,8 +1416,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
 
         Cell rowKeyPrefixKv = tableKeyValues[ROW_KEY_PREFIX_INDEX];
         byte[] rowKeyPrefix = rowKeyPrefixKv != null
-                ? (byte[]) PVarbinary.INSTANCE.toObject(rowKeyPrefixKv.getValueArray(),
-                        rowKeyPrefixKv.getValueOffset(), rowKeyPrefixKv.getValueLength())
+                ? CellUtil.cloneValue(rowKeyPrefixKv)
                 : null;
         builder.setRowKeyPrefix(rowKeyPrefix != null ? rowKeyPrefix
                 : oldTable != null ? oldTable.getRowKeyPrefix() : null);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 42755619bc..ff2351d425 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -959,7 +959,17 @@ public class MetaDataClient {
         argUpsert.execute();
     }
 
-    public MutationState createTable(CreateTableStatement statement, byte[][] splits, PTable parent, String viewStatement, ViewType viewType, PDataType viewIndexIdType, byte[][] viewColumnConstants, BitSet isViewColumnReferenced) throws SQLException {
+    public MutationState createTable(
+            CreateTableStatement statement,
+            byte[][] splits,
+            PTable parent,
+            String viewStatement,
+            ViewType viewType,
+            PDataType viewIndexIdType,
+            byte[] rowKeyPrefix,
+            byte[][] viewColumnConstants,
+            BitSet isViewColumnReferenced
+    ) throws SQLException {
         TableName tableName = statement.getTableName();
         Map<String,Object> tableProps = Maps.newHashMapWithExpectedSize(statement.getProps().size());
         Map<String,Object> commonFamilyProps = Maps.newHashMapWithExpectedSize(statement.getProps().size() + 1);
@@ -1024,7 +1034,22 @@ public class MetaDataClient {
                         true, NamedTableNode.create(statement.getTableName()), statement.getTableType(), false, null);
             }
         }
-        table = createTableInternal(statement, splits, parent, viewStatement, viewType, viewIndexIdType, viewColumnConstants, isViewColumnReferenced, false, null, null, tableProps, commonFamilyProps);
+        table = createTableInternal(
+                statement,
+                splits,
+                parent,
+                viewStatement,
+                viewType,
+                viewIndexIdType,
+                rowKeyPrefix,
+                viewColumnConstants,
+                isViewColumnReferenced,
+                false,
+                null,
+                null,
+                tableProps,
+                commonFamilyProps
+        );
 
         if (table == null || table.getType() == PTableType.VIEW
                 || statement.isNoVerify() /*|| table.isTransactional()*/) {
@@ -1695,13 +1720,35 @@ public class MetaDataClient {
             PrimaryKeyConstraint pk = FACTORY.primaryKey(null, allPkColumns);
 
             tableProps.put(MetaDataUtil.DATA_TABLE_NAME_PROP_NAME, dataTable.getPhysicalName().getString());
-            CreateTableStatement tableStatement = FACTORY.createTable(indexTableName,
-                    statement.getProps(), columnDefs, pk, statement.getSplitNodes(),
-                    PTableType.INDEX, statement.ifNotExists(), null,
-                    statement.getWhere(), statement.getBindCount(), null);
-            table = createTableInternal(tableStatement, splits, dataTable, null, null,
-                    getViewIndexDataType() ,null, null, allocateIndexId,
-                    statement.getIndexType(), asyncCreatedDate, tableProps, commonFamilyProps);
+            CreateTableStatement tableStatement = FACTORY.createTable(
+                    indexTableName,
+                    statement.getProps(),
+                    columnDefs,
+                    pk,
+                    statement.getSplitNodes(),
+                    PTableType.INDEX,
+                    statement.ifNotExists(),
+                    null,
+                    null,
+                    statement.getBindCount(),
+                    null
+            );
+            table = createTableInternal(
+                    tableStatement,
+                    splits,
+                    dataTable,
+                    null,
+                    null,
+                    getViewIndexDataType(),
+                    null,
+                    null,
+                    null,
+                    allocateIndexId,
+                    statement.getIndexType(),
+                    asyncCreatedDate,
+                    tableProps,
+                    commonFamilyProps
+            );
         }
         finally {
             deleteMutexCells(physicalSchemaName, physicalTableName, acquiredColumnMutexSet);
@@ -2073,8 +2120,9 @@ public class MetaDataClient {
 
     private PTable createTableInternal(CreateTableStatement statement, byte[][] splits,
             final PTable parent, String viewStatement, ViewType viewType, PDataType viewIndexIdType,
-            final byte[][] viewColumnConstants, final BitSet isViewColumnReferenced, boolean allocateIndexId,
-            IndexType indexType, Date asyncCreatedDate,
+            final byte[] rowKeyPrefix,
+            final byte[][] viewColumnConstants, final BitSet isViewColumnReferenced,
+            boolean allocateIndexId, IndexType indexType, Date asyncCreatedDate,
             Map<String,Object> tableProps,
             Map<String,Object> commonFamilyProps) throws SQLException {
         final PTableType tableType = statement.getTableType();
@@ -2127,7 +2175,6 @@ public class MetaDataClient {
 
             Integer phoenixTTL = TTL_NOT_DEFINED;
             Integer phoenixTTLProp = (Integer) TableProperty.TTL.getValue(tableProps);
-            byte[] rowKeyPrefix = null;
 
             // Validate TTL prop value if set
             if (phoenixTTLProp != null) {
@@ -2481,11 +2528,9 @@ public class MetaDataClient {
                         // set to the parent value if the property is not set on the view
                         updateCacheFrequency = parent.getUpdateCacheFrequency();
                     }
-
                     if (viewType == ViewType.UPDATABLE) {
                         phoenixTTL = getTTLFromParent(parent);
                     }
-
                     disableWAL = (disableWALProp == null ? parent.isWALDisabled() : disableWALProp);
                     defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
                     // TODO PHOENIX-4766 Add an options to stop sending parent metadata when creating views
@@ -2941,6 +2986,7 @@ public class MetaDataClient {
                         .setIndexes(Collections.<PTable>emptyList())
                         .setPhysicalNames(ImmutableList.<PName>of())
                         .setColumns(columns.values())
+                        .setRowKeyPrefix(rowKeyPrefix)
                         .setTTL(TTL_NOT_DEFINED)
                         .setIndexWhere(statement.getWhereClause() == null ? null
                                 : statement.getWhereClause().toString())
@@ -3198,8 +3244,7 @@ public class MetaDataClient {
             if (rowKeyPrefix == null) {
                 tableUpsert.setNull(35, Types.VARBINARY);
             } else {
-                //Need to update are we have Prefix Builder in place.
-                tableUpsert.setNull(35, Types.VARBINARY);
+                tableUpsert.setBytes(35, rowKeyPrefix);
             }
             if (tableType == INDEX && statement.getWhereClause() != null) {
                 tableUpsert.setString(36, statement.getWhereClause().toString());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 629393da3a..15adf81813 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -57,7 +57,6 @@ import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
-import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.base.Objects;
@@ -80,7 +79,6 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
-import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -96,6 +94,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import javax.annotation.Nonnull;
 
 import static org.apache.phoenix.compile.WhereCompiler.transformDNF;
 import static org.apache.phoenix.coprocessor.ScanRegionObserver.DYNAMIC_COLUMN_METADATA_STORED_FOR_MUTATION;
@@ -2015,7 +2014,7 @@ public class PTableImpl implements PTable {
 
         byte[] rowKeyPrefix = null;
         if (table.hasRowKeyPrefix()) {
-            rowKeyPrefix = PVarbinary.INSTANCE.toBytes(table.getRowKeyPrefix());
+            rowKeyPrefix = table.getRowKeyPrefix().toByteArray();
         }
 
         String indexWhere = null;
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
index 7e336b9cf9..dc98ef426f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
@@ -57,6 +57,7 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
@@ -626,6 +627,7 @@ public class QueryOptimizerTest extends BaseConnectionlessQueryTest {
     }
     
     @Test
+    @Ignore("PHOENIX-4555 should mark these views as ViewType.READONLY")
     public void testAssertQueryAgainstTenantSpecificViewDoesNotGoThroughIndex() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl(), new Properties());
         
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java
index 9308a91843..784c682226 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java
@@ -34,10 +34,12 @@ import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class ViewCompilerTest extends BaseConnectionlessQueryTest {
     @Test
+    @Ignore("PHOENIX-4555 should mark these views as ViewType.READONLY")
     public void testViewTypeCalculation() throws Exception {
         assertViewType(new String[] {"V1","V2","V3","V4"}, new String[] {
             "CREATE VIEW v1 AS SELECT * FROM t WHERE k1 = 1 AND k2 = 'foo'",
@@ -77,6 +79,7 @@ public class ViewCompilerTest extends BaseConnectionlessQueryTest {
     }
 
     @Test
+    @Ignore("PHOENIX-4555 should mark these views as ViewType.READONLY")
     public void testViewInvalidation() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         PhoenixConnection conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixResultSetMetadataTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixResultSetMetadataTest.java
index a593583a63..6e9b256db9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixResultSetMetadataTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixResultSetMetadataTest.java
@@ -134,7 +134,7 @@ public class PhoenixResultSetMetadataTest extends BaseConnectionlessQueryTest {
         conn.createStatement().execute(
                 "CREATE TABLE IF NOT EXISTS S.T (A INTEGER PRIMARY KEY, B INTEGER, C VARCHAR, D INTEGER)");
         conn.createStatement().execute(
-                "CREATE VIEW IF NOT EXISTS S.V (VA INTEGER, VB INTEGER) AS SELECT * FROM S.T WHERE B=200");
+                "CREATE VIEW IF NOT EXISTS S.V (VA INTEGER, VB INTEGER) AS SELECT * FROM S.T WHERE A=2");
         conn.createStatement().execute(
                 "UPSERT INTO S.V (A, B, C, D, VA, VB) VALUES (2, 200, 'def', -20, 91, 101)");
         conn.createStatement().execute(