You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/02/18 11:04:56 UTC

[2/4] PHOENIX-21: Support indexes on multi-tenant views. Still needs more testing

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/e2bd0ee0/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 4ee071c..5c99e4b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -47,6 +47,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
 import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
@@ -126,6 +128,7 @@ import com.google.common.primitives.Ints;
 
 public class MetaDataClient {
     private static final Logger logger = LoggerFactory.getLogger(MetaDataClient.class);
+    public static final byte[][] EMPTY_VIEW_CONSTANTS = new byte[0][];
 
     private static final ParseNodeFactory FACTORY = new ParseNodeFactory();
     private static final String CREATE_TABLE =
@@ -145,8 +148,9 @@ public class MetaDataClient {
             VIEW_STATEMENT + "," +
             DISABLE_WAL + "," +
             MULTI_TENANT + "," +
-            VIEW_TYPE +
-            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+            VIEW_TYPE + "," +
+            VIEW_INDEX_ID +
+            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
     private static final String CREATE_LINK =
             "UPSERT INTO " + TYPE_SCHEMA + ".\"" + TYPE_TABLE + "\"( " +
             TENANT_ID + "," +
@@ -169,11 +173,29 @@ public class MetaDataClient {
         TABLE_NAME_NAME + "," +
         TABLE_TYPE_NAME + "," +
         TABLE_SEQ_NUM + "," +
-        COLUMN_COUNT + "," +
-        IMMUTABLE_ROWS + "," +
-        DISABLE_WAL + "," +
-        MULTI_TENANT +
-        ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+        COLUMN_COUNT +
+        ") VALUES (?, ?, ?, ?, ?, ?)";
+    private static final String MUTATE_MULTI_TENANT =
+            "UPSERT INTO " + TYPE_SCHEMA + ".\"" + TYPE_TABLE + "\"( " + 
+            TENANT_ID + "," +
+            TABLE_SCHEM_NAME + "," +
+            TABLE_NAME_NAME + "," +
+            MULTI_TENANT + 
+            ") VALUES (?, ?, ?, ?)";
+    private static final String MUTATE_DISABLE_WAL =
+            "UPSERT INTO " + TYPE_SCHEMA + ".\"" + TYPE_TABLE + "\"( " + 
+            TENANT_ID + "," +
+            TABLE_SCHEM_NAME + "," +
+            TABLE_NAME_NAME + "," +
+            DISABLE_WAL + 
+            ") VALUES (?, ?, ?, ?)";
+    private static final String MUTATE_IMMUTABLE_ROWS =
+            "UPSERT INTO " + TYPE_SCHEMA + ".\"" + TYPE_TABLE + "\"( " + 
+            TENANT_ID + "," +
+            TABLE_SCHEM_NAME + "," +
+            TABLE_NAME_NAME + "," +
+            IMMUTABLE_ROWS + 
+            ") VALUES (?, ?, ?, ?)";
     private static final String UPDATE_INDEX_STATE =
             "UPSERT INTO " + TYPE_SCHEMA + ".\"" + TYPE_TABLE + "\"( " + 
             TENANT_ID + "," +
@@ -195,8 +217,9 @@ public class MetaDataClient {
         ORDINAL_POSITION + "," + 
         SORT_ORDER + "," +
         DATA_TABLE_NAME + "," + // write this both in the column and table rows for access by metadata APIs
-        ARRAY_SIZE +
-        ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+        ARRAY_SIZE + "," +
+        VIEW_CONSTANT +
+        ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
     private static final String UPDATE_COLUMN_POSITION =
         "UPSERT INTO " + TYPE_SCHEMA + ".\"" + TYPE_TABLE + "\" ( " + 
         TENANT_ID + "," +
@@ -337,6 +360,7 @@ public class MetaDataClient {
         } else {
             colUpsert.setInt(13, column.getArraySize());
         }
+        colUpsert.setBytes(14, column.getViewConstant());
         colUpsert.execute();
     }
 
@@ -374,19 +398,18 @@ public class MetaDataClient {
             }
             
             PColumn column = new PColumnImpl(PNameFactory.newName(columnName), familyName, def.getDataType(),
-                    def.getMaxLength(), def.getScale(), def.isNull(), position, sortOrder, def.getArraySize());
+                    def.getMaxLength(), def.getScale(), def.isNull(), position, sortOrder, def.getArraySize(), null);
             return column;
         } catch (IllegalArgumentException e) { // Based on precondition check in constructor
             throw new SQLException(e);
         }
     }
 
-    public MutationState createTable(CreateTableStatement statement, byte[][] splits, PTable parent, String viewStatement, ViewType viewType) throws SQLException {
-        PTable table = createTableInternal(statement, splits, parent, viewStatement, viewType);
+    public MutationState createTable(CreateTableStatement statement, byte[][] splits, PTable parent, String viewStatement, ViewType viewType, byte[][] viewColumnConstants) throws SQLException {
+        PTable table = createTableInternal(statement, splits, parent, viewStatement, viewType, viewColumnConstants);
         if (table == null || table.getType() == PTableType.VIEW) {
             return new MutationState(0,connection);
         }
-        
         // Hack to get around the case when an SCN is specified on the connection.
         // In this case, we won't see the table we just created yet, so we hack
         // around it by forcing the compiler to not resolve anything.
@@ -488,8 +511,11 @@ public class MetaDataClient {
                 ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
                 tableRef = resolver.getTables().get(0);
                 PTable dataTable = tableRef.getTable();
-                if (dataTable.getType() == PTableType.VIEW && dataTable.getViewType() != ViewType.MAPPED) {
-                    throw new SQLFeatureNotSupportedException("Creating an index on a view is not supported currently, but will be soon");
+                boolean isTenantConnection = connection.getTenantId() != null;
+                if (isTenantConnection) {
+                    if (dataTable.getType() != PTableType.VIEW) {
+                        throw new SQLFeatureNotSupportedException("An index may only be created for a VIEW through a tenant-specific connection");
+                    }
                 }
                 int hbaseVersion = connection.getQueryServices().getLowestClusterHBaseVersion();
                 if (!dataTable.isImmutableRows()) {
@@ -500,33 +526,59 @@ public class MetaDataClient {
                         throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG).setTableName(indexTableName.getTableName()).build().buildException();
                     }
                 }
+                int posOffset = 0;
                 Set<PColumn> unusedPkColumns;
                 if (dataTable.getBucketNum() != null) { // Ignore SALT column
                     unusedPkColumns = new LinkedHashSet<PColumn>(dataTable.getPKColumns().subList(1, dataTable.getPKColumns().size()));
+                    posOffset++;
                 } else {
                     unusedPkColumns = new LinkedHashSet<PColumn>(dataTable.getPKColumns());
                 }
                 List<Pair<ColumnName, SortOrder>> allPkColumns = Lists.newArrayListWithExpectedSize(unusedPkColumns.size());
                 List<ColumnDef> columnDefs = Lists.newArrayListWithExpectedSize(includedColumns.size() + indexedPkColumns.size());
                 
+                if (dataTable.isMultiTenant()) {
+                    // Add tenant ID column as first column in index
+                    PColumn col = dataTable.getPKColumns().get(posOffset);
+                    unusedPkColumns.remove(col);
+                    PDataType dataType = IndexUtil.getIndexColumnDataType(col);
+                    ColumnName colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(col));
+                    allPkColumns.add(new Pair<ColumnName, SortOrder>(colName, col.getSortOrder()));
+                    columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(), col.isNullable(), col.getMaxLength(), col.getScale(), false, SortOrder.getDefault()));
+                }
+                if (dataTable.getType() == PTableType.VIEW && dataTable.getViewType() != ViewType.MAPPED) {
+                    // Next add index ID column
+                    PDataType dataType = MetaDataUtil.getViewIndexIdDataType();
+                    ColumnName colName = ColumnName.caseSensitiveColumnName(MetaDataUtil.getViewIndexIdColumnName());
+                    allPkColumns.add(new Pair<ColumnName, SortOrder>(colName, SortOrder.getDefault()));
+                    columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(), false, null, null, false, SortOrder.getDefault()));
+                }
+                boolean isUpdatableView = dataTable.getViewType() == ViewType.UPDATABLE;
                 // First columns are the indexed ones
                 for (Pair<ColumnName, SortOrder> pair : indexedPkColumns) {
                     ColumnName colName = pair.getFirst();
                     PColumn col = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName()).getColumn();
                     unusedPkColumns.remove(col);
-                    PDataType dataType = IndexUtil.getIndexColumnDataType(col);
-                    colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(col));
-                    allPkColumns.add(new Pair<ColumnName, SortOrder>(colName, pair.getSecond()));
-                    columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(), col.isNullable(), col.getMaxLength(), col.getScale(), false, SortOrder.getDefault()));
+                    // Ignore view constants for updatable views as we don't need these in the index
+                    if (!isUpdatableView || col.getViewConstant() == null) {
+                        PDataType dataType = IndexUtil.getIndexColumnDataType(col);
+                        colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(col));
+                        allPkColumns.add(new Pair<ColumnName, SortOrder>(colName, pair.getSecond()));
+                        columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(), col.isNullable(), col.getMaxLength(), col.getScale(), false, SortOrder.getDefault()));
+                    }
                 }
                 
                 // Next all the PK columns from the data table that aren't indexed
                 if (!unusedPkColumns.isEmpty()) {
                     for (PColumn col : unusedPkColumns) {
-                        ColumnName colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(col));
-                        allPkColumns.add(new Pair<ColumnName, SortOrder>(colName, col.getSortOrder()));
-                        PDataType dataType = IndexUtil.getIndexColumnDataType(col);
-                        columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(), col.isNullable(), col.getMaxLength(), col.getScale(), false, col.getSortOrder()));
+                        // Don't add columns with constant values from updatable views, as
+                        // we don't need these in the index
+                        if (!isUpdatableView || col.getViewConstant() == null) {
+                            ColumnName colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(col));
+                            allPkColumns.add(new Pair<ColumnName, SortOrder>(colName, col.getSortOrder()));
+                            PDataType dataType = IndexUtil.getIndexColumnDataType(col);
+                            columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(), col.isNullable(), col.getMaxLength(), col.getScale(), false, col.getSortOrder()));
+                        }
                     }
                 }
                 pk = FACTORY.primaryKey(null, allPkColumns);
@@ -544,7 +596,7 @@ public class MetaDataClient {
                         if (pk.contains(colName)) {
                             throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_EXIST_IN_DEF).build().buildException();
                         }
-                        if (!SchemaUtil.isPKColumn(col)) {
+                        if (!SchemaUtil.isPKColumn(col) && (!isUpdatableView || col.getViewConstant() == null)) {
                             // Need to re-create ColumnName, since the above one won't have the column family name
                             colName = ColumnName.caseSensitiveColumnName(col.getFamilyName().getString(), IndexUtil.getIndexColumnName(col));
                             columnDefs.add(FACTORY.columnDef(colName, col.getDataType().getSqlTypeName(), col.isNullable(), col.getMaxLength(), col.getScale(), false, col.getSortOrder()));
@@ -553,11 +605,12 @@ public class MetaDataClient {
                 }
                 
                 // Set DEFAULT_COLUMN_FAMILY_NAME of index to match data table
+                // We need this in the props so that the correct column family is created
                 if (dataTable.getDefaultFamilyName() != null && dataTable.getType() != PTableType.VIEW) {
                     statement.getProps().put("", new Pair<String,Object>(DEFAULT_COLUMN_FAMILY_NAME,dataTable.getDefaultFamilyName().getString()));
                 }
                 CreateTableStatement tableStatement = FACTORY.createTable(indexTableName, statement.getProps(), columnDefs, pk, statement.getSplitNodes(), PTableType.INDEX, statement.ifNotExists(), null, null, statement.getBindCount());
-                table = createTableInternal(tableStatement, splits, dataTable, null, null); // TODO: tenant-specific index
+                table = createTableInternal(tableStatement, splits, dataTable, null, null, EMPTY_VIEW_CONSTANTS);
                 break;
             } catch (ConcurrentTableMutationException e) { // Can happen if parent data table changes while above is in progress
                 if (retry) {
@@ -599,13 +652,15 @@ public class MetaDataClient {
     public MutationState createSequence(CreateSequenceStatement statement, long startWith, long incrementBy, int cacheSize) throws SQLException {
         Long scn = connection.getSCN();
         long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
-        String schemaName = statement.getSequenceName().getSchemaName();
-        String sequenceName = statement.getSequenceName().getTableName();
         String tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getString();
+        return createSequence(tenantId, statement.getSequenceName().getSchemaName(), statement.getSequenceName().getTableName(), statement.ifNotExists(), startWith, incrementBy, cacheSize, timestamp);
+    }
+    
+    private MutationState createSequence(String tenantId, String schemaName, String sequenceName, boolean ifNotExists, long startWith, long incrementBy, int cacheSize, long timestamp) throws SQLException {
         try {
             connection.getQueryServices().createSequence(tenantId, schemaName, sequenceName, startWith, incrementBy, cacheSize, timestamp);
         } catch (SequenceAlreadyExistsException e) {
-            if (statement.ifNotExists()) {
+            if (ifNotExists) {
                 return new MutationState(0, connection);
             }
             throw e;
@@ -622,7 +677,7 @@ public class MetaDataClient {
         return null;
     }
     
-    private PTable createTableInternal(CreateTableStatement statement, byte[][] splits, final PTable parent, String viewStatement, ViewType viewType) throws SQLException {
+    private PTable createTableInternal(CreateTableStatement statement, byte[][] splits, final PTable parent, String viewStatement, ViewType viewType, byte[][] viewColumnConstants) throws SQLException {
         final PTableType tableType = statement.getTableType();
         boolean wasAutoCommit = connection.getAutoCommit();
         connection.rollback();
@@ -634,9 +689,49 @@ public class MetaDataClient {
             String schemaName = tableNameNode.getSchemaName();
             String tableName = tableNameNode.getTableName();
             String parentTableName = null;
-            String tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getString();
+            Long scn = connection.getSCN();
+            long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+            PName tenantId = connection.getTenantId();
+            String tenantIdStr = tenantId == null ? null : connection.getTenantId().getString();
             boolean isParentImmutableRows = false;
+            Short viewIndexId = null;
+            boolean multiTenant = false;
+            Integer saltBucketNum = null;
+            String defaultFamilyName = null;
+            boolean isImmutableRows = false;
+            List<PName> physicalNames = Collections.emptyList();
             if (parent != null && tableType == PTableType.INDEX) {
+                // Index on view
+                // TODO: Can we support a multi-tenant index directly on a multi-tenant
+                // table instead of only a view? We don't have anywhere to put the link
+                // from the table to the index, though.
+                if (parent.getType() == PTableType.VIEW && parent.getViewType() != ViewType.MAPPED) {
+                    PName physicalName = parent.getPhysicalName();
+                    SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName);
+                    // Create at parent timestamp as we know that will be earlier than now
+                    // and earlier than any SCN if one is set.
+                    // TODO: If sequence already exists, then we don't need to ensure metadata is already created.
+                    // Set physicalName to null in this case?
+                    createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(), true, Short.MIN_VALUE, 1, 1, parent.getTimeStamp());
+                    long[] seqValues = new long[1];
+                    SQLException[] sqlExceptions = new SQLException[1];
+                    connection.getQueryServices().incrementSequenceValues(Collections.singletonList(key), timestamp, seqValues, sqlExceptions);
+                    if (sqlExceptions[0] != null) {
+                        throw sqlExceptions[0];
+                    }
+                    long seqValue = seqValues[0];
+                    if (seqValue > Short.MAX_VALUE) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.TOO_MANY_VIEW_INDEXES)
+                        .setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalName.getString())).setTableName(SchemaUtil.getTableNameFromFullName(physicalName.getString())).build().buildException();
+                    }
+                    viewIndexId = (short) seqValue;
+                    saltBucketNum = parent.getBucketNum();
+                    defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
+                    // Set physical name of view index table
+                    physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(physicalName.getBytes())));
+                }
+                
+                multiTenant = parent.isMultiTenant();
                 isParentImmutableRows = parent.isImmutableRows();
                 parentTableName = parent.getTableName().getString();
                 // Pass through data table sequence number so we can check it hasn't changed
@@ -689,10 +784,6 @@ public class MetaDataClient {
             }
             
             boolean isSalted = false;
-            Integer saltBucketNum = null;
-            String defaultFamilyName = null;
-            boolean isImmutableRows = false;
-            boolean multiTenant = false;
             // Although unusual, it's possible to set a mapped VIEW as having immutable rows.
             // This tells Phoenix that you're managing the index maintenance yourself.
             if (tableType != PTableType.VIEW || viewType == ViewType.MAPPED) {
@@ -704,7 +795,8 @@ public class MetaDataClient {
                 }
             }
             
-            if (tableType != PTableType.VIEW) {
+            // Can't set any of these on views or shared indexes on views
+            if (tableType != PTableType.VIEW && viewIndexId == null) {
                 saltBucketNum = (Integer) tableProps.remove(PhoenixDatabaseMetaData.SALT_BUCKETS);
                 if (saltBucketNum != null && (saltBucketNum < 0 || saltBucketNum > SaltingUtil.MAX_BUCKET_NUM)) {
                     throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_BUCKET_NUM).build().buildException();
@@ -719,11 +811,13 @@ public class MetaDataClient {
                 }
                 isSalted = (saltBucketNum != null);
                 
-                Boolean multiTenantProp = (Boolean) tableProps.remove(PhoenixDatabaseMetaData.MULTI_TENANT);
-                multiTenant = Boolean.TRUE.equals(multiTenantProp);
-                
-                // Don't remove this prop, as we need it when we create the HBase metadata
-                defaultFamilyName = (String)tableProps.get(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME);                
+                // Can't set MULTI_TENANT or DEFAULT_COLUMN_FAMILY_NAME on an index
+                if (tableType != PTableType.INDEX) {
+                    Boolean multiTenantProp = (Boolean) tableProps.remove(PhoenixDatabaseMetaData.MULTI_TENANT);
+                    multiTenant = Boolean.TRUE.equals(multiTenantProp);
+                    // Don't remove this prop, as we need it when we create the HBase metadata
+                    defaultFamilyName = (String)tableProps.get(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME);                
+                }
             }
             
             boolean disableWAL = false;
@@ -734,7 +828,7 @@ public class MetaDataClient {
                 disableWAL = disableWALProp;
             }
             // Delay this check as it is supported to have IMMUTABLE_ROWS and SALT_BUCKETS defined on views
-            if (statement.getTableType() == PTableType.VIEW && !tableProps.isEmpty()) {
+            if ((statement.getTableType() == PTableType.VIEW || viewIndexId != null) && !tableProps.isEmpty()) {
                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build().buildException();
             }
             
@@ -742,12 +836,11 @@ public class MetaDataClient {
             List<PColumn> columns;
             LinkedHashSet<PColumn> pkColumns;    
             
-            if (tenantId != null && tableType != PTableType.VIEW) {
+            if (tenantId != null && (tableType != PTableType.VIEW && viewIndexId == null)) {
                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TENANT_SPECIFIC_TABLE)
                     .setSchemaName(schemaName).setTableName(tableName).build().buildException();
             }
             
-            List<PName> physicalNames = Collections.emptyList();
             if (tableType == PTableType.VIEW) {
                 physicalNames = Collections.singletonList(PNameFactory.newName(parent.getPhysicalName().getString()));
                 if (viewType == ViewType.MAPPED) {
@@ -764,22 +857,26 @@ public class MetaDataClient {
                     columns = newArrayListWithExpectedSize(parent.getColumns().size() + colDefs.size());
                     columns.addAll(parent.getColumns());
                     pkColumns = newLinkedHashSet(parent.getPKColumns());
-
-                    // Add row linking from data table row to physical table row
-                    PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK);
-                    for (PName physicalName : physicalNames) {
-                        linkStatement.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
-                        linkStatement.setString(2, schemaName);
-                        linkStatement.setString(3, tableName);
-                        linkStatement.setString(4, physicalName.getString());
-                        linkStatement.setByte(5, LinkType.PHYSICAL_TABLE.getSerializedValue());
-                        linkStatement.execute();
-                    }
                 }
             } else {
                 columns = newArrayListWithExpectedSize(colDefs.size());
                 pkColumns = newLinkedHashSetWithExpectedSize(colDefs.size() + 1); // in case salted  
             }
+
+            // Don't add link for mapped view, as it just points back to itself and causes the drop to
+            // fail because it looks like there's always a view associated with it.
+            if (viewType != ViewType.MAPPED && !physicalNames.isEmpty()) {
+                // Add row linking from data table row to physical table row
+                PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK);
+                for (PName physicalName : physicalNames) {
+                    linkStatement.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
+                    linkStatement.setString(2, schemaName);
+                    linkStatement.setString(3, tableName);
+                    linkStatement.setString(4, physicalName.getString());
+                    linkStatement.setByte(5, LinkType.PHYSICAL_TABLE.getSerializedValue());
+                    linkStatement.execute();
+                }
+            }
             
             PreparedStatement colUpsert = connection.prepareStatement(INSERT_COLUMN);
             Map<String, PName> familyNames = Maps.newLinkedHashMap();
@@ -911,13 +1008,13 @@ public class MetaDataClient {
             
             // Bootstrapping for our SYSTEM.TABLE that creates itself before it exists 
             if (SchemaUtil.isMetaTable(schemaName,tableName)) {
-                PTable table = PTableImpl.makePTable(PNameFactory.newName(schemaName),PNameFactory.newName(tableName), tableType, null,
-                        MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, PNameFactory.newName(QueryConstants.SYSTEM_TABLE_PK_NAME),
-                        null, columns, null, Collections.<PTable>emptyList(), isImmutableRows, 
-                        Collections.<PName>emptyList(), defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName),
-                        null, Boolean.TRUE.equals(disableWAL), false, null);
+                PTable table = PTableImpl.makePTable(tenantId,PNameFactory.newName(schemaName), PNameFactory.newName(tableName), tableType,
+                        null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
+                        PNameFactory.newName(QueryConstants.SYSTEM_TABLE_PK_NAME), null, columns, null, Collections.<PTable>emptyList(), 
+                        isImmutableRows, Collections.<PName>emptyList(),
+                        defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), null, Boolean.TRUE.equals(disableWAL), false, null, viewIndexId);
                 connection.addTable(table);
-            } else if (tableType == PTableType.INDEX) {
+            } else if (tableType == PTableType.INDEX && viewIndexId == null) {
                 if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
                     int nIndexRowKeyColumns = isPK ? 1 : pkColumnsNames.size();
                     int nIndexKeyValueColumns = columns.size() - nIndexRowKeyColumns;
@@ -941,7 +1038,19 @@ public class MetaDataClient {
                 }
             }
             
-            for (PColumn column : columns) {
+            for (int i = 0; i < columns.size(); i++) {
+                PColumn column = columns.get(i);
+                int columnPosition = column.getPosition();
+                final byte[] viewConstant = columnPosition < viewColumnConstants.length ? viewColumnConstants[columnPosition] : null;
+                // For client-side cache, we need to update the column
+                if (viewConstant != null) {
+                    columns.set(i, column = new DelegateColumn(column) {
+                        @Override
+                        public byte[] getViewConstant() {
+                            return viewConstant;
+                        }
+                    });
+                }
                 addColumnMutation(schemaName, tableName, column, colUpsert, parentTableName);
             }
             
@@ -951,7 +1060,7 @@ public class MetaDataClient {
             String dataTableName = parent == null || tableType == PTableType.VIEW ? null : parent.getTableName().getString();
             PIndexState indexState = parent == null || tableType == PTableType.VIEW  ? null : PIndexState.BUILDING;
             PreparedStatement tableUpsert = connection.prepareStatement(CREATE_TABLE);
-            tableUpsert.setString(1, tenantId);
+            tableUpsert.setString(1, tenantIdStr);
             tableUpsert.setString(2, schemaName);
             tableUpsert.setString(3, tableName);
             tableUpsert.setString(4, tableType.getSerializedValue());
@@ -975,6 +1084,11 @@ public class MetaDataClient {
             } else {
                 tableUpsert.setByte(16, viewType.getSerializedValue());
             }
+            if (viewIndexId == null) {
+                tableUpsert.setNull(17, Types.SMALLINT);
+            } else {
+                tableUpsert.setShort(17, viewIndexId);
+            }
             tableUpsert.execute();
             
             tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
@@ -992,7 +1106,7 @@ public class MetaDataClient {
                     QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, QueryServicesOptions.DEFAULT_ROW_KEY_ORDER_SALTED_TABLE));
             MetaDataMutationResult result = connection.getQueryServices().createTable(
                     tableMetaData, 
-                    viewType == ViewType.MAPPED ? physicalNames.get(0).getBytes() : null,
+                    viewType == ViewType.MAPPED || viewIndexId != null ? physicalNames.get(0).getBytes() : null,
                     tableType, tableProps, familyPropList, splits);
             MutationCode code = result.getMutationCode();
             switch(code) {
@@ -1014,11 +1128,10 @@ public class MetaDataClient {
                 throw new ConcurrentTableMutationException(schemaName, tableName);
             default:
                 PTable table =  PTableImpl.makePTable(
-                        PNameFactory.newName(schemaName), PNameFactory.newName(tableName), tableType, indexState, result.getMutationTime(), PTable.INITIAL_SEQ_NUM, 
-                        pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns, dataTableName == null ? null : PNameFactory.newName(dataTableName), 
-                        Collections.<PTable>emptyList(), isImmutableRows, physicalNames,
-                        defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName),
-                                viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, viewType);
+                        tenantId, PNameFactory.newName(schemaName), PNameFactory.newName(tableName), tableType, indexState, result.getMutationTime(), 
+                        PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns, 
+                        dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows, physicalNames,
+                        defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, viewType, viewIndexId);
                 connection.addTable(table);
                 return table;
             }
@@ -1067,8 +1180,11 @@ public class MetaDataClient {
         if (isSalted) {
             iterator.next();
         }
+        // Tenant ID must be VARCHAR or CHAR and be NOT NULL
+        // NOT NULL is a requirement, since otherwise the table key would conflict
+        // potentially with the global table definition.
         PColumn tenantIdCol = iterator.next();
-        if (!tenantIdCol.getDataType().isCoercibleTo(VARCHAR)) {
+        if (!tenantIdCol.getDataType().isCoercibleTo(VARCHAR) || tenantIdCol.isNullable()) {
             throw new SQLExceptionInfo.Builder(INSUFFICIENT_MULTI_TENANT_COLUMNS).setSchemaName(schemaName).setTableName(tableName).build().buildException();
         }
     }
@@ -1124,22 +1240,36 @@ public class MetaDataClient {
                         connection.removeTable(tableName);
                     } catch (TableNotFoundException ignore) { } // Ignore - just means wasn't cached
                     
+                    // TODO: we need to drop the index data when a view is dropped
                     boolean dropMetaData = connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
-                    if (result.getTable() != null && tableType != PTableType.VIEW && !dropMetaData) {
+                    if (result.getTable() != null && tableType != PTableType.VIEW) {
                         connection.setAutoCommit(true);
-                        // Delete everything in the column. You'll still be able to do queries at earlier timestamps
+                        PTable table = result.getTable();
                         long ts = (scn == null ? result.getMutationTime() : scn);
                         // Create empty table and schema - they're only used to get the name from
                         // PName name, PTableType type, long timeStamp, long sequenceNumber, List<PColumn> columns
-                        PTable table = result.getTable();
-                        List<TableRef> tableRefs = Lists.newArrayListWithExpectedSize(1 + table.getIndexes().size());
-                        tableRefs.add(new TableRef(null, table, ts, false));
-                        // TODO: Let the standard mutable secondary index maintenance handle this?
-                        for (PTable index: table.getIndexes()) {
-                            tableRefs.add(new TableRef(null, index, ts, false));
+                        List<TableRef> tableRefs = Lists.newArrayListWithExpectedSize(2 + table.getIndexes().size());
+                        if (tableType == PTableType.TABLE && MetaDataUtil.hasViewIndexTable(connection, table.getPhysicalName())) {
+                            MetaDataUtil.deleteViewIndexSequences(connection, table.getPhysicalName());
+                            // TODO: consider removing this, as the DROP INDEX done for each DROP VIEW command
+                            // would have deleted all the rows already
+                            if (!dropMetaData) {
+                                String viewIndexSchemaName = MetaDataUtil.getViewIndexSchemaName(schemaName);
+                                String viewIndexTableName = MetaDataUtil.getViewIndexTableName(tableName);
+                                PTable viewIndexTable = new PTableImpl(null, viewIndexSchemaName, viewIndexTableName, ts, table.getColumnFamilies());
+                                tableRefs.add(new TableRef(null, viewIndexTable, ts, false));
+                            }
+                        }
+                        if (!dropMetaData) {
+                            // Delete everything in the column. You'll still be able to do queries at earlier timestamps
+                            tableRefs.add(new TableRef(null, table, ts, false));
+                            // TODO: Let the standard mutable secondary index maintenance handle this?
+                            for (PTable index: table.getIndexes()) {
+                                tableRefs.add(new TableRef(null, index, ts, false));
+                            }
+                            MutationPlan plan = new PostDDLCompiler(connection).compile(tableRefs, null, null, Collections.<PColumn>emptyList(), ts);
+                            return connection.getQueryServices().updateData(plan);
                         }
-                        MutationPlan plan = new PostDDLCompiler(connection).compile(tableRefs, null, null, Collections.<PColumn>emptyList(), ts);
-                        return connection.getQueryServices().updateData(plan);
                     }
                     break;
                 }
@@ -1156,8 +1286,17 @@ public class MetaDataClient {
             connection.removeTable(tableName);
             throw new TableNotFoundException(schemaName, tableName);
         case UNALLOWED_TABLE_MUTATION:
+            String columnName = null;
+            String familyName = null;
+            String msg = null;
+            // TODO: better to return error code
+            if (result.getColumnName() != null) {
+                familyName = result.getFamilyName() == null ? null : Bytes.toString(result.getFamilyName());
+                columnName = Bytes.toString(result.getColumnName());
+                msg = "Cannot drop column referenced by VIEW";
+            }
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MUTATE_TABLE)
-                .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                .setSchemaName(schemaName).setTableName(tableName).setFamilyName(familyName).setColumnName(columnName).setMessage(msg).build().buildException();
         case COLUMN_ALREADY_EXISTS:
         case COLUMN_NOT_FOUND:
             break;
@@ -1185,30 +1324,52 @@ public class MetaDataClient {
     }
 
     private  long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta) throws SQLException {
-        return incrementTableSeqNum(table, expectedType, table.isImmutableRows(), table.isWALDisabled(), table.isMultiTenant(), columnCountDelta);
+        return incrementTableSeqNum(table, expectedType, columnCountDelta, null, null, null);
     }
     
-    private long incrementTableSeqNum(PTable table, PTableType expectedType, boolean isImmutableRows, boolean disableWAL, boolean isMultiTenant, int columnCountDelta) throws SQLException {
+    private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isImmutableRows, Boolean disableWAL, Boolean isMultiTenant) throws SQLException {
         String schemaName = table.getSchemaName().getString();
         String tableName = table.getTableName().getString();
         // Ordinal position is 1-based and we don't count SALT column in ordinal position
         int totalColumnCount = table.getColumns().size() + (table.getBucketNum() == null ? 0 : -1);
         final long seqNum = table.getSequenceNumber() + 1;
         PreparedStatement tableUpsert = connection.prepareStatement(MUTATE_TABLE);
+        String tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getString();
         try {
-            tableUpsert.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
+            tableUpsert.setString(1, tenantId);
             tableUpsert.setString(2, schemaName);
             tableUpsert.setString(3, tableName);
             tableUpsert.setString(4, expectedType.getSerializedValue());
             tableUpsert.setLong(5, seqNum);
             tableUpsert.setInt(6, totalColumnCount + columnCountDelta);
-            tableUpsert.setBoolean(7, isImmutableRows);
-            tableUpsert.setBoolean(8, disableWAL);
-            tableUpsert.setBoolean(9, isMultiTenant);
             tableUpsert.execute();
         } finally {
             tableUpsert.close();
         }
+        if (isImmutableRows != null) {
+            PreparedStatement tableBoolUpsert = connection.prepareStatement(MUTATE_IMMUTABLE_ROWS);
+            tableBoolUpsert.setString(1, tenantId);
+            tableBoolUpsert.setString(2, schemaName);
+            tableBoolUpsert.setString(3, tableName);
+            tableBoolUpsert.setBoolean(4, isImmutableRows);
+            tableBoolUpsert.execute();
+        }
+        if (disableWAL != null) {
+            PreparedStatement tableBoolUpsert = connection.prepareStatement(MUTATE_DISABLE_WAL);
+            tableBoolUpsert.setString(1, tenantId);
+            tableBoolUpsert.setString(2, schemaName);
+            tableBoolUpsert.setString(3, tableName);
+            tableBoolUpsert.setBoolean(4, disableWAL);
+            tableBoolUpsert.execute();
+        }
+        if (isMultiTenant != null) {
+            PreparedStatement tableBoolUpsert = connection.prepareStatement(MUTATE_MULTI_TENANT);
+            tableBoolUpsert.setString(1, tenantId);
+            tableBoolUpsert.setString(2, schemaName);
+            tableBoolUpsert.setString(3, tableName);
+            tableBoolUpsert.setBoolean(4, isMultiTenant);
+            tableBoolUpsert.execute();
+        }
         return seqNum;
     }
     
@@ -1223,7 +1384,7 @@ public class MetaDataClient {
             // Outside of retry loop, as we're removing the property and wouldn't find it the second time
             Boolean isImmutableRowsProp = (Boolean)statement.getProps().remove(PTable.IS_IMMUTABLE_ROWS_PROP_NAME);
             Boolean multiTenantProp = (Boolean)statement.getProps().remove(PhoenixDatabaseMetaData.MULTI_TENANT);
-            boolean disableWAL = Boolean.TRUE.equals(statement.getProps().remove(DISABLE_WAL));
+            Boolean disableWALProp = (Boolean)statement.getProps().remove(DISABLE_WAL);
             
             boolean retried = false;
             while (true) {
@@ -1249,13 +1410,23 @@ public class MetaDataClient {
                         .setColumnName(lastPK.getName().getString()).build().buildException();
                 }
                           
-                boolean isImmutableRows = table.isImmutableRows();
+                Boolean isImmutableRows = null;
                 if (isImmutableRowsProp != null) {
-                    isImmutableRows = isImmutableRowsProp;
+                    if (isImmutableRowsProp.booleanValue() != table.isImmutableRows()) {
+                        isImmutableRows = isImmutableRowsProp;
+                    }
                 }
-                boolean multiTenant = table.isMultiTenant();
+                Boolean multiTenant = null;
                 if (multiTenantProp != null) {
-                    multiTenant = Boolean.TRUE.equals(multiTenantProp);
+                    if (multiTenantProp.booleanValue() != table.isMultiTenant()) {
+                        multiTenant = multiTenantProp;
+                    }
+                }
+                Boolean disableWAL = null;
+                if (disableWALProp != null) {
+                    if (disableWALProp.booleanValue() != table.isWALDisabled()) {
+                        disableWAL = isImmutableRowsProp;
+                    }
                 }
                 
                 if (statement.getProps().get(PhoenixDatabaseMetaData.SALT_BUCKETS) != null) {
@@ -1279,6 +1450,14 @@ public class MetaDataClient {
                 List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnDefs.size());
 
                 if ( columnDefs.size() > 0 ) {
+                    // Disallow setting these props for now on an add column, as that
+                    // would a bit of a strange thing to do. We can revisit later if
+                    // need be. We'd need to do the error checking we do in the else
+                    // if we want to support this.
+                    if (isImmutableRowsProp != null || multiTenantProp != null) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE)
+                        .setTableName(table.getName().getString()).build().buildException();
+                    }
                     for( ColumnDef colDef : columnDefs) {
                         if (colDef != null && !colDef.isNull()) {
                             if(colDef.isPK()) {
@@ -1322,7 +1501,7 @@ public class MetaDataClient {
                     // Check that HBase configured properly for mutable secondary indexing
                     // if we're changing from an immutable table to a mutable table and we
                     // have existing indexes.
-                    if (isImmutableRowsProp != null && !isImmutableRows && table.isImmutableRows() && !table.getIndexes().isEmpty()) {
+                    if (Boolean.FALSE.equals(isImmutableRows) && !table.getIndexes().isEmpty()) {
                         int hbaseVersion = connection.getQueryServices().getLowestClusterHBaseVersion();
                         if (hbaseVersion < PhoenixDatabaseMetaData.MUTABLE_SI_VERSION_THRESHOLD) {
                             throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES).setSchemaName(schemaName).setTableName(tableName).build().buildException();
@@ -1331,7 +1510,9 @@ public class MetaDataClient {
                             throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG).setSchemaName(schemaName).setTableName(tableName).build().buildException();
                         }
                     }
-                    // TODO: if switching table to multiTenant or multiType, do some error checking
+                    if (Boolean.TRUE.equals(multiTenant)) {
+                        throwIfInsufficientColumns(schemaName, tableName, table.getPKColumns(), table.getBucketNum()!=null, multiTenant);
+                    }
                 }
                 
                 if (isAddingPKColumn && !table.getIndexes().isEmpty()) {
@@ -1341,7 +1522,7 @@ public class MetaDataClient {
                     tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
                     connection.rollback();
                 }
-                long seqNum = incrementTableSeqNum(table, statement.getTableType(), isImmutableRows, disableWAL, multiTenant, 1);
+                long seqNum = incrementTableSeqNum(table, statement.getTableType(), 1, isImmutableRows, disableWAL, multiTenant);
                 
                 tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
                 connection.rollback();
@@ -1351,7 +1532,6 @@ public class MetaDataClient {
                 Pair<byte[],Map<String,Object>> family = families.size() > 0 ? families.get(0) : null;
                 
                 // Figure out if the empty column family is changing as a result of adding the new column
-                // The empty column family of an index will never change as a result of adding a new data column
                 byte[] emptyCF = null;
                 byte[] projectCF = null;
                 if (table.getType() != PTableType.VIEW && family != null) {
@@ -1366,7 +1546,7 @@ public class MetaDataClient {
                         }
                     }
                 }
-                MetaDataMutationResult result = connection.getQueryServices().addColumn(tableMetaData, statement.getTableType(), families);
+                MetaDataMutationResult result = connection.getQueryServices().addColumn(tableMetaData, families, table);
                 try {
                     MutationCode code = processMutationResult(schemaName, tableName, result);
                     if (code == MutationCode.COLUMN_ALREADY_EXISTS) {
@@ -1379,7 +1559,31 @@ public class MetaDataClient {
                     // Only update client side cache if we aren't adding a PK column to a table with indexes.
                     // We could update the cache manually then too, it'd just be a pain.
                     if (!isAddingPKColumn || table.getIndexes().isEmpty()) {
-                        connection.addColumn(SchemaUtil.getTableName(schemaName, tableName), columns, result.getMutationTime(), seqNum, isImmutableRows);
+                        connection.addColumn(SchemaUtil.getTableName(schemaName, tableName), columns, result.getMutationTime(), seqNum, isImmutableRows == null ? table.isImmutableRows() : isImmutableRows);
+                    }
+                    // Delete rows in view index if we haven't dropped it already
+                    // We only need to do this if the multiTenant transitioned to false
+                    if (table.getType() == PTableType.TABLE
+                            && Boolean.FALSE.equals(multiTenant)
+                            && MetaDataUtil.hasViewIndexTable(connection, table.getPhysicalName())) {
+                        connection.setAutoCommit(true);
+                        MetaDataUtil.deleteViewIndexSequences(connection, table.getPhysicalName());
+                        // If we're not dropping metadata, then make sure no rows are left in
+                        // our view index physical table.
+                        // TODO: remove this, as the DROP INDEX commands run when the DROP VIEW
+                        // commands are run would remove all rows already.
+                        if (!connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA)) {
+                            Long scn = connection.getSCN();
+                            long ts = (scn == null ? result.getMutationTime() : scn);
+                            String viewIndexSchemaName = MetaDataUtil.getViewIndexSchemaName(schemaName);
+                            String viewIndexTableName = MetaDataUtil.getViewIndexTableName(tableName);
+                            PTable viewIndexTable = new PTableImpl(null, viewIndexSchemaName, viewIndexTableName, ts,
+                                    table.getColumnFamilies());
+                            List<TableRef> tableRefs = Collections.singletonList(new TableRef(null, viewIndexTable, ts, false));
+                            MutationPlan plan = new PostDDLCompiler(connection).compile(tableRefs, null, null,
+                                    Collections.<PColumn> emptyList(), ts);
+                            connection.getQueryServices().updateData(plan);
+                        }
                     }
                     if (emptyCF != null) {
                         Long scn = connection.getSCN();
@@ -1573,7 +1777,7 @@ public class MetaDataClient {
                                     Collections.<Mutation>singletonList(new Put(SchemaUtil.getTableKey
                                             (tenantIdBytes, tableContainingColumnToDrop.getSchemaName().getBytes(),
                                             tableContainingColumnToDrop.getTableName().getBytes()))),
-                                    tableContainingColumnToDrop.getType(),family);
+                                    family,tableContainingColumnToDrop);
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/e2bd0ee0/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
index 4cca8c5..a2d8718 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
@@ -46,4 +46,6 @@ public interface PColumn extends PDatum, Writable {
      * @return the declared array size or zero if this is not an array
      */
     Integer getArraySize();
+    
+    byte[] getViewConstant();
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/e2bd0ee0/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
index dc1ab6a..081e37e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
@@ -42,6 +42,7 @@ public class PColumnImpl implements PColumn {
     private int position;
     private SortOrder sortOrder;
     private Integer arraySize;
+    private byte[] viewConstant;
 
     public PColumnImpl() {
     }
@@ -53,13 +54,13 @@ public class PColumnImpl implements PColumn {
                        Integer scale,
                        boolean nullable,
                        int position,
-                       SortOrder sortOrder, Integer arrSize) {
-        init(name, familyName, dataType, maxLength, scale, nullable, position, sortOrder, arrSize);
+                       SortOrder sortOrder, Integer arrSize, byte[] viewConstant) {
+        init(name, familyName, dataType, maxLength, scale, nullable, position, sortOrder, arrSize, viewConstant);
     }
 
     public PColumnImpl(PColumn column, int position) {
         this(column.getName(), column.getFamilyName(), column.getDataType(), column.getMaxLength(),
-                column.getScale(), column.isNullable(), position, column.getSortOrder(), column.getArraySize());
+                column.getScale(), column.isNullable(), position, column.getSortOrder(), column.getArraySize(), column.getViewConstant());
     }
 
     private void init(PName name,
@@ -70,7 +71,8 @@ public class PColumnImpl implements PColumn {
             boolean nullable,
             int position,
             SortOrder sortOrder,
-            Integer arrSize) {
+            Integer arrSize,
+            byte[] viewConstant) {
     	Preconditions.checkNotNull(sortOrder);
         this.dataType = dataType;
         if (familyName == null) {
@@ -90,6 +92,7 @@ public class PColumnImpl implements PColumn {
         this.position = position;
         this.sortOrder = sortOrder;
         this.arraySize = arrSize;
+        this.viewConstant = viewConstant;
     }
 
     @Override
@@ -156,10 +159,16 @@ public class PColumnImpl implements PColumn {
         int scale = WritableUtils.readVInt(input);
         boolean nullable = input.readBoolean();
         int position = WritableUtils.readVInt(input);
+        boolean hasViewConstant = (position < 0);
+        position = Math.abs(position)-1;
+        byte[] viewConstant = null;
+        if (hasViewConstant) {
+            viewConstant = Bytes.readByteArray(input);
+        }
         SortOrder sortOrder = SortOrder.fromSystemValue(WritableUtils.readVInt(input));
         int arrSize = WritableUtils.readVInt(input);
         init(columnName, familyName, dataType, maxLength == NO_MAXLENGTH ? null : maxLength,
-                scale == NO_SCALE ? null : scale, nullable, position, sortOrder, arrSize == -1 ? null : arrSize);
+                scale == NO_SCALE ? null : scale, nullable, position, sortOrder, arrSize == -1 ? null : arrSize, viewConstant);
     }
 
     @Override
@@ -170,7 +179,11 @@ public class PColumnImpl implements PColumn {
         WritableUtils.writeVInt(output, maxLength == null ? NO_MAXLENGTH : maxLength);
         WritableUtils.writeVInt(output, scale == null ? NO_SCALE : scale);
         output.writeBoolean(nullable);
-        WritableUtils.writeVInt(output, position);
+        boolean hasViewConstant = (viewConstant != null);
+        WritableUtils.writeVInt(output, (position+1) * (hasViewConstant ? -1 : 1));
+        if (hasViewConstant) {
+            Bytes.writeByteArray(output, viewConstant);
+        }
         WritableUtils.writeVInt(output, sortOrder.getSystemValue());
         WritableUtils.writeVInt(output, arraySize == null ? -1 : arraySize);
     }
@@ -203,4 +216,9 @@ public class PColumnImpl implements PColumn {
     public Integer getArraySize() {
         return arraySize;
     }
+
+    @Override
+    public byte[] getViewConstant() {
+        return viewConstant;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/e2bd0ee0/phoenix-core/src/main/java/org/apache/phoenix/schema/PDataType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PDataType.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PDataType.java
index 61f7d68..81f2119 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PDataType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PDataType.java
@@ -6623,6 +6623,15 @@ public enum PDataType {
     public abstract boolean isFixedWidth();
     public abstract Integer getByteSize();
 
+    /*
+     * We need an empty byte array to mean null, since
+     * we have no other representation in the row key
+     * for null.
+     */
+    public final boolean isNull(byte[] value) {
+        return value == null || value.length == 0;
+    }
+    
     public abstract byte[] toBytes(Object object);
     
     public byte[] toBytes(Object object, SortOrder sortOrder) {

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/e2bd0ee0/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index e20a893..3423873 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -132,7 +132,7 @@ public class PMetaDataImpl implements PMetaData {
         // Update position of columns that follow removed column
         for (int i = position+1; i < oldColumns.size(); i++) {
             PColumn oldColumn = oldColumns.get(i);
-            PColumn newColumn = new PColumnImpl(oldColumn.getName(), oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, oldColumn.getSortOrder(), oldColumn.getArraySize());
+            PColumn newColumn = new PColumnImpl(oldColumn.getName(), oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, oldColumn.getSortOrder(), oldColumn.getArraySize(), oldColumn.getViewConstant());
             columns.add(newColumn);
         }
         

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/e2bd0ee0/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index da5af1c..d76a116 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -122,7 +122,8 @@ public interface PTable extends Writable {
      */
     PName getName();
     PName getSchemaName(); 
-    PName getTableName(); 
+    PName getTableName();
+    PName getTenantId();
 
     /**
      * @return the table type
@@ -275,4 +276,5 @@ public interface PTable extends Writable {
 
     ViewType getViewType();
     String getViewStatement();
+    Short getViewIndexId();
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/e2bd0ee0/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index ee95f46..8425809 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -78,6 +78,7 @@ public class PTableImpl implements PTable {
     private PName name;
     private PName schemaName;
     private PName tableName;
+    private PName tenantId;
     private PTableType type;
     private PIndexState state;
     private long sequenceNumber;
@@ -108,15 +109,13 @@ public class PTableImpl implements PTable {
     private boolean disableWAL;
     private boolean multiTenant;
     private ViewType viewType;
+    private Short viewIndexId;
     
     public PTableImpl() {
     }
 
-    public PTableImpl(PName name) { // For finding table ref
-        this.name = name;
-    }
-
-    public PTableImpl(String schemaName, String tableName, long timestamp, List<PColumnFamily> families) { // For base table of mapped VIEW
+    public PTableImpl(PName tenantId, String schemaName, String tableName, long timestamp, List<PColumnFamily> families) { // For base table of mapped VIEW
+        this.tenantId = tenantId;
         this.name = PNameFactory.newName(SchemaUtil.getTableName(schemaName, tableName));
         this.schemaName = PNameFactory.newName(schemaName);
         this.tableName = PNameFactory.newName(tableName);
@@ -133,6 +132,7 @@ public class PTableImpl implements PTable {
             familyByString.put(family.getName().getString(), family);
         }
         this.families = families;
+        this.physicalNames = Collections.emptyList();;
     }
 
     public PTableImpl(long timeStamp) { // For delete marker
@@ -153,6 +153,7 @@ public class PTableImpl implements PTable {
         this.familyByString = Collections.emptyMap();
         this.rowKeySchema = RowKeySchema.EMPTY_SCHEMA;
         this.indexes = Collections.emptyList();
+        this.physicalNames = Collections.emptyList();;
     }
 
     // When cloning table, ignore the salt column as it will be added back in the constructor
@@ -162,52 +163,52 @@ public class PTableImpl implements PTable {
     
     public static PTableImpl makePTable(PTable table, long timeStamp, List<PTable> indexes) throws SQLException {
         return new PTableImpl(
-                table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, table.getSequenceNumber() + 1, 
-                table.getPKName(), table.getBucketNum(), getColumnsToClone(table), table.getParentTableName(), indexes, table.isImmutableRows(),
-                table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType());
+                table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, 
+                table.getSequenceNumber() + 1, table.getPKName(), table.getBucketNum(), getColumnsToClone(table), table.getParentTableName(), indexes,
+                table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId());
     }
 
     public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
         return new PTableImpl(
-                table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), 
-                table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(),
-                table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType());
+                table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), 
+                table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(), table.isImmutableRows(),
+                table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
         return new PTableImpl(
-                table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, sequenceNumber, 
-                table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(),
-                table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType());
+                table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, 
+                sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(), table.isImmutableRows(),
+                table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
         return new PTableImpl(
-                table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, sequenceNumber, 
-                table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(), isImmutableRows,
-                table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType());
+                table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, 
+                sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(),
+                isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId());
     }
 
     public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
         return new PTableImpl(
-                table.getSchemaName(), table.getTableName(), table.getType(), state, table.getTimeStamp(), table.getSequenceNumber(), 
-                table.getPKName(), table.getBucketNum(), getColumnsToClone(table), table.getParentTableName(), 
-                table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(),
-                table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType());
+                table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), state, table.getTimeStamp(), 
+                table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), 
+                table.getParentTableName(), table.getIndexes(), table.isImmutableRows(),
+                table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId());
     }
 
-    public static PTableImpl makePTable(PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber, PName pkName,
-            Integer bucketNum, List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames,
-            PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType) throws SQLException {
-        return new PTableImpl(schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataTableName, indexes,
-                isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType);
+    public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
+            PName pkName, Integer bucketNum, List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows,
+            List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType, Short viewIndexId) throws SQLException {
+        return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataTableName,
+                indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, viewIndexId);
     }
 
-    private PTableImpl(PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber, PName pkName,
-            Integer bucketNum, List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames,
-            PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType) throws SQLException {
-        init(schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, new PTableStatsImpl(),
-                dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType);
+    private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
+            PName pkName, Integer bucketNum, List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows,
+            List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType, Short viewIndexId) throws SQLException {
+        init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
+                new PTableStatsImpl(), dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, viewIndexId);
     }
 
     @Override
@@ -220,9 +221,9 @@ public class PTableImpl implements PTable {
         return viewType;
     }
     
-    private void init(PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber, PName pkName,
-            Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentTableName, List<PTable> indexes, boolean isImmutableRows,
-            List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType) throws SQLException {
+    private void init(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
+            PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentTableName, List<PTable> indexes,
+            boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType, Short viewIndexId) throws SQLException {
         if (schemaName == null) {
             throw new NullPointerException();
         }
@@ -240,6 +241,7 @@ public class PTableImpl implements PTable {
         this.disableWAL = disableWAL;
         this.multiTenant = multiTenant;
         this.viewType = viewType;
+        this.viewIndexId = viewIndexId;
         List<PColumn> pkColumns;
         PColumn[] allColumns;
 
@@ -567,7 +569,7 @@ public class PTableImpl implements PTable {
             byte[] qualifier = column.getName().getBytes();
             PDataType type = column.getDataType();
             // Check null, since some types have no byte representation for null
-            if (byteValue == null || byteValue.length == 0) {
+            if (type.isNull(byteValue)) {
                 if (!column.isNullable()) { 
                     throw new ConstraintViolationException(name.getString() + "." + column.getName().getString() + " may not be null");
                 }
@@ -643,15 +645,22 @@ public class PTableImpl implements PTable {
 
     @Override
     public void readFields(DataInput input) throws IOException {
+        byte[] tenantIdBytes = Bytes.readByteArray(input);
         byte[] schemaNameBytes = Bytes.readByteArray(input);
         byte[] tableNameBytes = Bytes.readByteArray(input);
+        PName tenantId = tenantIdBytes.length == 0 ? null : PNameFactory.newName(tenantIdBytes);
         PName schemaName = PNameFactory.newName(schemaNameBytes);
         PName tableName = PNameFactory.newName(tableNameBytes);
         PTableType tableType = PTableType.values()[WritableUtils.readVInt(input)];
         PIndexState indexState = null;
+        Short viewIndexId = null;
         if (tableType == PTableType.INDEX) {
             int ordinal = WritableUtils.readVInt(input);
-            if (ordinal >= 0) {
+            if (ordinal < 0) {
+                viewIndexId = input.readShort();
+            }
+            ordinal = Math.abs(ordinal) - 1;
+            if (ordinal < PIndexState.values().length) {
                 indexState = PIndexState.values()[ordinal];
             }
         }
@@ -699,6 +708,8 @@ public class PTableImpl implements PTable {
             viewType = ViewType.fromSerializedValue(input.readByte());
             byte[] viewStatementBytes = Bytes.readByteArray(input);
             viewStatement = viewStatementBytes.length == 0 ? null : (String)PDataType.VARCHAR.toObject(viewStatementBytes);
+        }
+        if (tableType == PTableType.VIEW || viewIndexId != null) {
             int nPhysicalNames = WritableUtils.readVInt(input);
             physicalNames = Lists.newArrayListWithExpectedSize(nPhysicalNames);
             for (int i = 0; i < nPhysicalNames; i++) {
@@ -708,10 +719,10 @@ public class PTableImpl implements PTable {
         }
         PTableStats stats = new PTableStatsImpl(guidePosts);
         try {
-            init(schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
-                 bucketNum.equals(NO_SALTING) ? null : bucketNum, columns, stats, dataTableName,
-                 indexes, isImmutableRows, physicalNames, defaultFamilyName,
-                 viewStatement, disableWAL, multiTenant, viewType);
+            init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber,
+                 pkName, bucketNum.equals(NO_SALTING) ? null : bucketNum, columns, stats,
+                 dataTableName, indexes, isImmutableRows, physicalNames,
+                 defaultFamilyName, viewStatement, disableWAL, multiTenant, viewType, viewIndexId);
         } catch (SQLException e) {
             throw new RuntimeException(e); // Impossible
         }
@@ -719,11 +730,15 @@ public class PTableImpl implements PTable {
 
     @Override
     public void write(DataOutput output) throws IOException {
+        Bytes.writeByteArray(output, tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes());
         Bytes.writeByteArray(output, schemaName.getBytes());
         Bytes.writeByteArray(output, tableName.getBytes());
         WritableUtils.writeVInt(output, type.ordinal());
         if (type == PTableType.INDEX) {
-            WritableUtils.writeVInt(output, state == null ? -1 : state.ordinal());
+            WritableUtils.writeVInt(output, (((state == null ? PIndexState.values().length : state.ordinal()) + 1) * (viewIndexId == null ? 1 : -1)));
+            if (viewIndexId != null) {
+                output.writeShort(viewIndexId);
+            }
         }
         WritableUtils.writeVLong(output, sequenceNumber);
         output.writeLong(timeStamp);
@@ -754,6 +769,8 @@ public class PTableImpl implements PTable {
         if (type == PTableType.VIEW) {
             output.writeByte(viewType.getSerializedValue());
             Bytes.writeByteArray(output, viewStatement == null ? ByteUtil.EMPTY_BYTE_ARRAY : PDataType.VARCHAR.toBytes(viewStatement));
+        }
+        if (type == PTableType.VIEW || viewIndexId != null) {
             WritableUtils.writeVInt(output, physicalNames.size());
             for (int i = 0; i < physicalNames.size(); i++) {
                 Bytes.writeByteArray(output, physicalNames.get(i).getBytes());
@@ -860,4 +877,14 @@ public class PTableImpl implements PTable {
     public boolean isWALDisabled() {
         return disableWAL;
     }
+
+    @Override
+    public Short getViewIndexId() {
+        return viewIndexId;
+    }
+
+    @Override
+    public PName getTenantId() {
+        return tenantId;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/e2bd0ee0/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
index 74cbaee..4cf9ff1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
@@ -36,7 +36,7 @@ public class SaltingUtil {
     public static final String SALTING_COLUMN_NAME = "_SALT";
     public static final String SALTED_ROW_KEY_NAME = "_SALTED_KEY";
     public static final PColumnImpl SALTING_COLUMN = new PColumnImpl(
-            PNameFactory.newName(SALTING_COLUMN_NAME), null, PDataType.BINARY, 1, 0, false, 0, SortOrder.getDefault(), 0);
+            PNameFactory.newName(SALTING_COLUMN_NAME), null, PDataType.BINARY, 1, 0, false, 0, SortOrder.getDefault(), 0, null);
     public static final RowKeySchema VAR_BINARY_SALTED_SCHEMA = new RowKeySchemaBuilder(1)
         .addField(SALTING_COLUMN, false, SortOrder.getDefault())
         .addField(SchemaUtil.VAR_BINARY_DATUM, false, SortOrder.getDefault()).build();

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/e2bd0ee0/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
index 48ff40a..466b4a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
@@ -37,7 +37,11 @@ public class TableAlreadyExistsException extends SQLException {
     private final String tableName;
 
     public TableAlreadyExistsException(String schemaName, String tableName) {
-        super(new SQLExceptionInfo.Builder(code).setSchemaName(schemaName).setTableName(tableName).build().toString(),
+        this(schemaName, tableName, null);
+    }
+
+    public TableAlreadyExistsException(String schemaName, String tableName, String msg) {
+        super(new SQLExceptionInfo.Builder(code).setSchemaName(schemaName).setTableName(tableName).setMessage(msg).build().toString(),
                 code.getSQLState(), code.getErrorCode());
         this.schemaName = schemaName;
         this.tableName = tableName;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/e2bd0ee0/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index 09684cb..9ae2558 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -19,23 +19,34 @@ package org.apache.phoenix.util;
 
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
 
+import java.sql.SQLException;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
 
 
 public class MetaDataUtil {
-
+    public static final String VIEW_INDEX_TABLE_PREFIX = "_IDX_";
+    public static final byte[] VIEW_INDEX_TABLE_PREFIX_BYTES = Bytes.toBytes(VIEW_INDEX_TABLE_PREFIX);
+    public static final String VIEW_INDEX_SEQUENCE_PREFIX = "_SEQ_";
+    public static final byte[] VIEW_INDEX_SEQUENCE_PREFIX_BYTES = Bytes.toBytes(VIEW_INDEX_SEQUENCE_PREFIX);
+    public static final String VIEW_INDEX_ID_COLUMN_NAME = "_INDEX_ID";
+    
     public static boolean areClientAndServerCompatible(long version) {
         // A server and client with the same major and minor version number must be compatible.
         // So it's important that we roll the PHOENIX_MAJOR_VERSION or PHOENIX_MINOR_VERSION
@@ -184,7 +195,7 @@ public class MetaDataUtil {
         return kv == null ? ByteUtil.EMPTY_BYTE_ARRAY : kv.getValue();
     }
 
-    private static KeyValue getMutationKeyValue(Mutation headerRow, byte[] key) {
+    public static KeyValue getMutationKeyValue(Mutation headerRow, byte[] key) {
         List<KeyValue> kvs = headerRow.getFamilyMap().get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES);
         if (kvs != null) {
             for (KeyValue kv : kvs) {
@@ -229,4 +240,63 @@ public class MetaDataUtil {
     public static byte[] getParentLinkKey(byte[] tenantId, byte[] schemaName, byte[] tableName, byte[] indexName) {
         return ByteUtil.concat(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId, QueryConstants.SEPARATOR_BYTE_ARRAY, schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY : schemaName, QueryConstants.SEPARATOR_BYTE_ARRAY, tableName, QueryConstants.SEPARATOR_BYTE_ARRAY, QueryConstants.SEPARATOR_BYTE_ARRAY, indexName);
     }
+    
+    public static boolean isMultiTenant(Mutation m) {
+        return Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(MetaDataUtil.getMutationKVByteValue(m, PhoenixDatabaseMetaData.MULTI_TENANT_BYTES)));
+    }
+    
+    public static boolean isSalted(Mutation m) {
+        return MetaDataUtil.getMutationKeyValue(m, PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES) != null;
+    }
+    
+    public static byte[] getViewIndexPhysicalName(byte[] physicalTableName) {
+        return ByteUtil.concat(VIEW_INDEX_TABLE_PREFIX_BYTES, physicalTableName);
+    }
+
+    public static String getViewIndexTableName(String tableName) {
+        return VIEW_INDEX_TABLE_PREFIX + tableName;
+    }
+
+    public static String getViewIndexSchemaName(String schemaName) {
+        return schemaName;
+    }
+
+    public static SequenceKey getViewIndexSequenceKey(String tenantId, PName physicalName) {
+        // Create global sequence of the form: <prefixed base table name><tenant id>
+        // rather than tenant-specific sequence, as it makes it much easier
+        // to cleanup when the physical table is dropped, as we can delete
+        // all global sequences leading with <prefix> + physical name.
+        String schemaName = VIEW_INDEX_SEQUENCE_PREFIX + physicalName.getString();
+        String tableName = tenantId == null ? "" : tenantId;
+        return new SequenceKey(null, schemaName, tableName);
+    }
+
+    public static PDataType getViewIndexIdDataType() {
+        return PDataType.SMALLINT;
+    }
+
+    public static String getViewIndexIdColumnName() {
+        return VIEW_INDEX_ID_COLUMN_NAME;
+    }
+
+    public static boolean hasViewIndexTable(PhoenixConnection connection, PName name) throws SQLException {
+        byte[] physicalIndexName = MetaDataUtil.getViewIndexPhysicalName(name.getBytes());
+        try {
+            HTableDescriptor desc = connection.getQueryServices().getTableDescriptor(physicalIndexName);
+            return desc != null && Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(desc.getValue(IS_VIEW_INDEX_TABLE_PROP_BYTES)));
+        } catch (TableNotFoundException e) {
+            return false;
+        }
+    }
+    
+    public static void deleteViewIndexSequences(PhoenixConnection connection, PName name) throws SQLException {
+        SequenceKey key = getViewIndexSequenceKey(null, name);
+        connection.createStatement().executeUpdate("DELETE FROM " + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME + 
+                " WHERE " + PhoenixDatabaseMetaData.TENANT_ID + " IS NULL AND " + 
+                PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + " = '" + key.getSchemaName() + "'");
+        
+    }
+
+    public static final String IS_VIEW_INDEX_TABLE_PROP_NAME = "IS_VIEW_INDEX_TABLE";
+    public static final byte[] IS_VIEW_INDEX_TABLE_PROP_BYTES = Bytes.toBytes(IS_VIEW_INDEX_TABLE_PROP_NAME);
 }