You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/08/23 19:15:33 UTC

[08/40] phoenix git commit: PHOENIX-2110 Primary key changes should be pushed to diverged views

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5b46793b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 05e7acb..4c17bf1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.hadoop.hbase.KeyValueUtil.createFirstOnRow;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
@@ -188,6 +190,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
@@ -1679,308 +1682,526 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
     }
     
-    private MetaDataMutationResult addRowsToChildViews(PTable table, List<Mutation> tableMetadata, List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName,
+    private final class PutWithOrdinalPosition implements Comparable<PutWithOrdinalPosition>{
+        private final Put put;
+        private final int ordinalPosition;
+        
+        public PutWithOrdinalPosition(Put put, int ordinalPos) {
+            this.put = put;
+            this.ordinalPosition = ordinalPos;
+        }
+
+        @Override
+        public int compareTo(PutWithOrdinalPosition o) {
+            return (this.ordinalPosition < o.ordinalPosition ? -1 : this.ordinalPosition > o.ordinalPosition ? 1 : 0);
+        }
+    }
+    
+    private static int getOrdinalPosition(PTable table, PColumn col) {
+        return table.getBucketNum() == null ? col.getPosition() + 1 : col.getPosition();
+    }
+    
+    private static boolean isDivergedView(PTable view) {
+        return view.getBaseColumnCount() == QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT;
+    }
+    
+    /**
+     * 
+     * Class to keep track of columns and their ordinal positions as we
+     * process through the list of columns to be added.
+     *
+     */
+    private static class ColumnOrdinalPositionUpdateList {
+        final List<byte[]> columnKeys = new ArrayList<>(10);
+        int offset;
+        
+        int size()  {
+            return columnKeys.size();
+        }
+        
+        private void setOffset(int lowestOrdinalPos) {
+            this.offset = lowestOrdinalPos;
+        }
+
+        private void addColumn(byte[] columnKey, int position) {
+            checkArgument(position >= this.offset);
+            int index = position - offset;
+            int size = columnKeys.size();
+            checkState(index <= size);
+            if (size == 0) {
+                columnKeys.add(columnKey);
+                return;
+            }
+            int stopIndex = size;
+            // Check if an entry for this column key is already there.
+            for (int i = 0; i < size; i++) {
+                if (Bytes.equals(columnKeys.get(i), columnKey)) {
+                    stopIndex = i;
+                    break;
+                }
+            }
+            if (stopIndex == size) {
+                /*
+                 * The column key is not present in the list. So add it at the specified index
+                 * and right shift the elements at this index and beyond.
+                 */
+                columnKeys.add(index, columnKey);
+            } else {
+                /*
+                 * The column key is already present in the list.
+                 * Move the elements of the list to the left up to the stop index
+                 */
+                for (int i = stopIndex; i > index; i--) {
+                    columnKeys.set(i, columnKeys.get(i - 1));
+                }
+                columnKeys.set(index, columnKey);
+            }
+        }
+        
+        private int getOrdinalPositionFromListIdx(int listIndex) {
+            checkArgument(listIndex < columnKeys.size());
+            return listIndex + offset;
+        }
+        
+        /**
+         * @param columnKey
+         * @return if present - the ordinal position of the column in this list.
+         * If not present - -1.
+         */
+        private int getOrdinalPositionOfColumn(byte[] columnKey) {
+            int i = 0;
+            for (byte[] key : columnKeys) {
+                if (Bytes.equals(key, columnKey)) {
+                    return i + offset;
+                }
+                i++;
+            }
+            return -1;
+        }
+    }
+    
+    private static byte[] getColumnKey(byte[] viewKey, PColumn column) {
+        return getColumnKey(viewKey, column.getName().getString(), column.getFamilyName() != null ? column.getFamilyName().getString() : null);
+    }
+    
+    private static byte[] getColumnKey(byte[] viewKey, String columnName, String columnFamily) {
+        byte[] columnKey = ByteUtil.concat(viewKey, QueryConstants.SEPARATOR_BYTE_ARRAY,
+                Bytes.toBytes(columnName));
+        if (columnFamily != null) {
+            columnKey = ByteUtil.concat(columnKey, QueryConstants.SEPARATOR_BYTE_ARRAY,
+                    Bytes.toBytes(columnFamily));
+        }
+        return columnKey;
+    }
+    
+    MetaDataMutationResult addRowsToChildViews(PTable basePhysicalTable, List<Mutation> tableMetadata, List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName,
             List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, TableViewFinderResult childViewsResult,
             Region region, List<RowLock> locks) throws IOException, SQLException {
+        List<PutWithOrdinalPosition> columnPutsForBaseTable = new ArrayList<>(tableMetadata.size());
+        // Isolate the puts relevant to adding columns. Also figure out what kind of columns are being added.
+        for (Mutation m : tableMetadata) {
+            if (m instanceof Put) {
+                byte[][] rkmd = new byte[5][];
+                int pkCount = getVarChars(m.getRow(), rkmd);
+                if (m instanceof Put && pkCount > COLUMN_NAME_INDEX
+                        && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
+                        && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
+                    columnPutsForBaseTable.add(new PutWithOrdinalPosition((Put)m, getInteger((Put)m, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES)));
+                }
+            }
+        }
+        // Sort the puts by ordinal position 
+        Collections.sort(columnPutsForBaseTable);
+        assert columnPutsForBaseTable.size() > 0;
         for (Result viewResult : childViewsResult.getResults()) {
+            short deltaNumPkColsSoFar = 0;
+            short columnsAddedToView = 0;
+            short columnsAddedToBaseTable = 0;
             byte[][] rowViewKeyMetaData = new byte[3][];
             getVarChars(viewResult.getRow(), 3, rowViewKeyMetaData);
-            byte[] viewTenantId = rowViewKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
-            byte[] viewSchemaName = rowViewKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
-            byte[] viewName = rowViewKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
-            byte[] viewKey = SchemaUtil.getTableKey(viewTenantId, viewSchemaName, viewName);
+            byte[] viewKey = SchemaUtil.getTableKey(rowViewKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX],
+                    rowViewKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX],
+                    rowViewKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]);
+            
             // lock the rows corresponding to views so that no other thread can modify the view meta-data
             RowLock viewRowLock = acquireLock(region, viewKey, locks);
             PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock);
-            if (view.getBaseColumnCount() == QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT) {
-                // if a view has diverged from the base table, we don't allow schema changes
-                // to be propagated to it.
-                // FIXME: We should allow PK changes to be propagated to a diverged view See PHOENIX-2110
-                continue;
-            }
-            int numColsAddedToBaseTable = 0;
-            int numColsAddedToView = 0;
-            short deltaNumPkColsSoFar = 0;
-            PColumn existingViewColumn = null;
-            List<PColumn> viewPkColumns = Lists.newArrayList(view.getPKColumns());
-            boolean addingPKColumn=false;
-            for (Mutation m : tableMetadata) {
+            
+            ColumnOrdinalPositionUpdateList ordinalPositionList = new ColumnOrdinalPositionUpdateList();
+            List<PColumn> viewPkCols = new ArrayList<>(view.getPKColumns());
+            boolean addingExistingPkCol = false;
+            int numCols = view.getColumns().size();
+            for (PutWithOrdinalPosition p : columnPutsForBaseTable) {
+                Put baseTableColumnPut = p.put;
+                PColumn existingViewColumn = null;
                 byte[][] rkmd = new byte[5][];
-                int pkCount = getVarChars(m.getRow(), rkmd);
-                if (m instanceof Put && pkCount > COLUMN_NAME_INDEX
-                        && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
-                        && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
-                	try {
-                        // Maybe deserving of a new SchemaUtil.getColumnByName(byte[] familyName, String columnName) function 
-                        String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]);
-                        existingViewColumn = rkmd[FAMILY_NAME_INDEX] == null 
-                            ? view.getPKColumn(columnName) 
-                            : view.getColumnFamily(rkmd[FAMILY_NAME_INDEX]).getColumn(columnName);
-                    } 
-                	catch (ColumnFamilyNotFoundException e) {
+                getVarChars(baseTableColumnPut.getRow(), rkmd);
+                String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]);
+                String columnFamily = rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes.toString(rkmd[FAMILY_NAME_INDEX]);
+                try {
+                    existingViewColumn = columnFamily == null ? view.getColumn(columnName) : view.getColumnFamily(
+                            columnFamily).getColumn(columnName);
+                } catch (ColumnFamilyNotFoundException e) {
+                    // ignore since it means that the column family is not present for the column to be added.
+                } catch (ColumnNotFoundException e) {
+                    // ignore since it means the column is not present in the view
+                }
+                
+                boolean isPkCol = columnFamily == null;
+                byte[] columnKey = getColumnKey(viewKey, columnName, columnFamily);
+                if (existingViewColumn != null) {
+                    MetaDataMutationResult result = validateColumnForAddToBaseTable(existingViewColumn, baseTableColumnPut, basePhysicalTable, isPkCol, view);
+                    if (result != null) {
+                        return result;
                     }
-                	catch (ColumnNotFoundException e) {
-                    } // Ignore - means column family or column name don't exist
-                	
-                    Put p = (Put)m;
-                    byte[] columnKey = ByteUtil.concat(viewKey, QueryConstants.SEPARATOR_BYTE_ARRAY, rkmd[COLUMN_NAME_INDEX]);
-                    if (rkmd[FAMILY_NAME_INDEX] != null) {
-                        columnKey = ByteUtil.concat(columnKey, QueryConstants.SEPARATOR_BYTE_ARRAY, rkmd[FAMILY_NAME_INDEX]);
+                    if (isPkCol) {
+                        viewPkCols.remove(existingViewColumn);
+                        addingExistingPkCol = true;
                     }
-                    Put viewColumnDefinitionPut = new Put(columnKey, clientTimeStamp);
-                    for (Cell cell : p.getFamilyCellMap().values().iterator().next()) {
-                        viewColumnDefinitionPut.add(CellUtil.createCell(columnKey, CellUtil.cloneFamily(cell),
+                    /*
+                     * For views that are not diverged, we need to make sure that the existing columns
+                     * have the same ordinal position as in the base table. This is important because
+                     * we rely on the ordinal position of the column to figure out whether dropping a 
+                     * column from the view will end up diverging the view from the base table.
+                     * 
+                     * For already diverged views, we don't care about the ordinal position of the existing column.
+                     */
+                    if (!isDivergedView(view)) {
+                        int newOrdinalPosition = p.ordinalPosition;
+                        // Check if the ordinal position of the column was getting updated from previous add column
+                        // mutations.
+                        int existingOrdinalPos = ordinalPositionList.getOrdinalPositionOfColumn(columnKey);
+                        if (ordinalPositionList.size() == 0) {
+                            /*
+                             * No ordinal positions to be updated are in the list. In that case, check whether the
+                             * existing ordinal position of the column is different from its new ordinal position.
+                             * If yes, then initialize the ordinal position list with this column's ordinal position
+                             * as the offset.
+                             */
+                            existingOrdinalPos = getOrdinalPosition(view, existingViewColumn);
+                            if (existingOrdinalPos != newOrdinalPosition) {
+                                ordinalPositionList.setOffset(newOrdinalPosition);
+                                ordinalPositionList.addColumn(columnKey, newOrdinalPosition);
+                                for (PColumn col : view.getColumns()) {
+                                    int ordinalPos = getOrdinalPosition(view, col);
+                                    if (ordinalPos >= newOrdinalPosition) {
+                                        if (ordinalPos == existingOrdinalPos) {
+                                            /*
+                                             * No need to update ordinal positions of columns beyond the existing column's 
+                                             * old ordinal position.
+                                             */
+                                            break;
+                                        }
+                                        // increment ordinal position of columns occurring after this column by 1
+                                        int updatedPos = ordinalPos + 1;
+                                        ordinalPositionList.addColumn(getColumnKey(viewKey, col), updatedPos);
+                                    } 
+                                }
+                            } 
+                        } else {
+                            if (existingOrdinalPos != newOrdinalPosition) {
+                                ordinalPositionList.addColumn(columnKey, newOrdinalPosition);
+                            }
+                        }
+                        columnsAddedToBaseTable++;
+                    }
+                } else {
+                    // The column doesn't exist in the view.
+                    Put viewColumnPut = new Put(columnKey, clientTimeStamp);
+                    for (Cell cell : baseTableColumnPut.getFamilyCellMap().values().iterator().next()) {
+                        viewColumnPut.add(CellUtil.createCell(columnKey, CellUtil.cloneFamily(cell),
                                 CellUtil.cloneQualifier(cell), cell.getTimestamp(), cell.getTypeByte(),
                                 CellUtil.cloneValue(cell)));
                     }
-                    
-                    // if there is already a view column with the same name as the base table column we are trying to add
-                	if (existingViewColumn != null) {
-                		List<Cell> dataTypes = viewColumnDefinitionPut
-                                .get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                        PhoenixDatabaseMetaData.DATA_TYPE_BYTES);
-                		if (dataTypes != null && dataTypes.size() > 0) {
-                            Cell cell = dataTypes.get(0);
-                            int typeId = PInteger.INSTANCE.getCodec().decodeInt(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
-                    		PDataType dataType = PDataType.fromTypeId(typeId);
-                    		if (!existingViewColumn.getDataType().equals(dataType)) {
-                    			 return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
-                    		}
-                		}
-                		// if adding a pk column to the base table
-                		if (rkmd[FAMILY_NAME_INDEX] == null && rkmd[COLUMN_NAME_INDEX] != null) {
-	                        List<Cell> keySeqCells = viewColumnDefinitionPut.get(
-	                                PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-	                                PhoenixDatabaseMetaData.KEY_SEQ_BYTES);
-	                        if (keySeqCells != null && keySeqCells.size() > 0) {
-	                        	Cell cell = keySeqCells.get(0);
-	                        	int keySeq = PSmallint.INSTANCE.getCodec().decodeInt(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
-	                     		int pkPosition = SchemaUtil.getPKPosition(view, existingViewColumn)+1;
-	                     		if (pkPosition!=keySeq) {
-	                     			return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
-	                     		}
-	                        }
-	                        List<Cell> sortOrders = viewColumnDefinitionPut.get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                    PhoenixDatabaseMetaData.SORT_ORDER_BYTES);
-	                        SortOrder sortOrder = SortOrder.getDefault();
-                            if (sortOrders != null && sortOrders.size() > 0) {
-                            	Cell cell = sortOrders.get(0);
-                            	int sortOrderInt = PInteger.INSTANCE.getCodec().decodeInt(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
-                            	sortOrder = SortOrder.fromSystemValue(sortOrderInt);                            	
-                            }
-                            if (!Objects.equal(sortOrder,existingViewColumn.getSortOrder())) {
-                        		return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
-                        	}
-	                        viewPkColumns.remove(existingViewColumn);
-                        }
-                		// if there is an existing view column that matches the column being added to the base table and if the column being added has a null
-                    	// scale or maxLength, we need to explicitly do a put to set the scale or maxLength to null (in case the view column has the scale or 
-                    	// max length set)
-                		Integer maxLength = null;
-                		List<Cell> columnSizes = viewColumnDefinitionPut.get(
-                                PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES);
-                        if (columnSizes != null && columnSizes.size() > 0) {
-                            Cell cell = columnSizes.get(0);
-                            maxLength = cell.getValueArray()==null ? null :
-                    		PInteger.INSTANCE.getCodec().decodeInt(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
-                        }
-                        if (!Objects.equal(maxLength,existingViewColumn.getMaxLength())) {
-                        	return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null); 
-                        }
-                        Integer scale = null;
-                        List<Cell> decimalDigits = viewColumnDefinitionPut.get(
-                                PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYTES);
-                        if (decimalDigits != null && decimalDigits.size() > 0) {
-                            Cell cell = decimalDigits.get(0);
-                            scale = cell.getValueArray()==null ? null :
-                        	PInteger.INSTANCE.getCodec().decodeInt(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
-                        }
-                        if (!Objects.equal(scale,existingViewColumn.getScale())) {	
-                        	return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null); 
+                    if (isDivergedView(view)) {
+                        if (isPkCol) {
+                            /* 
+                             * Only pk cols of the base table are added to the diverged views. These pk 
+                             * cols are added at the end.
+                             */
+                            int lastOrdinalPos = getOrdinalPosition(view, view.getColumns().get(numCols - 1));
+                            int newPosition = ++lastOrdinalPos;
+                            byte[] ptr = new byte[PInteger.INSTANCE.getByteSize()];
+                            PInteger.INSTANCE.getCodec().encodeInt(newPosition, ptr, 0);
+                            viewColumnPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                                    PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, clientTimeStamp, ptr);
+                            mutationsForAddingColumnsToViews.add(viewColumnPut);
+                        } else {
+                            continue; // move on to the next column
                         }
-                	}
-                	else {
-                    	// if we are adding a column that already exists in the view, no need to update the base table or view table column count
-                    	numColsAddedToView++;
-                    } 
-                    	
-                	
-                    numColsAddedToBaseTable++;
-                    mutationsForAddingColumnsToViews.add(viewColumnDefinitionPut);
-                    if (rkmd[FAMILY_NAME_INDEX] == null && rkmd[COLUMN_NAME_INDEX] != null) {
+                    } else {
+                        int newOrdinalPosition = p.ordinalPosition;
                         /*
-                         * If adding a pk column to the base table (and hence the view), see if there are any indexes on
-                         * the view. If yes, then generate puts for the index header row and column rows.
+                         * For a non-diverged view, we need to make sure that the base table column
+                         * is added at the right position.
                          */
-                    	addingPKColumn=true;
-                        deltaNumPkColsSoFar++;
-                        for (PTable index : view.getIndexes()) {
-                            int oldNumberOfColsInIndex = index.getColumns().size();
-                            
-                            byte[] indexColumnKey = ByteUtil.concat(getViewIndexHeaderRowKey(index),
-                                    QueryConstants.SEPARATOR_BYTE_ARRAY,
-                                    IndexUtil.getIndexColumnName(rkmd[FAMILY_NAME_INDEX], rkmd[COLUMN_NAME_INDEX]));
-                            Put indexColumnDefinitionPut = new Put(indexColumnKey, clientTimeStamp);
-                            
-                            // Set the index specific data type for the column
-                            List<Cell> dataTypes = viewColumnDefinitionPut
-                                    .get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                            PhoenixDatabaseMetaData.DATA_TYPE_BYTES);
-                            if (dataTypes != null && dataTypes.size() > 0) {
-                                Cell dataType = dataTypes.get(0);
-                                int dataColumnDataType = PInteger.INSTANCE.getCodec().decodeInt(
-                                        dataType.getValueArray(), dataType.getValueOffset(), SortOrder.ASC);
-                                int indexColumnDataType = IndexUtil.getIndexColumnDataType(true,
-                                        PDataType.fromTypeId(dataColumnDataType)).getSqlType();
-                                byte[] indexColumnDataTypeBytes = new byte[PInteger.INSTANCE.getByteSize()];
-                                PInteger.INSTANCE.getCodec().encodeInt(indexColumnDataType, indexColumnDataTypeBytes, 0);
-                                indexColumnDefinitionPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                        PhoenixDatabaseMetaData.DATA_TYPE_BYTES, indexColumnDataTypeBytes);
-                            }
-                            
-                            // Set precision
-                            List<Cell> decimalDigits = viewColumnDefinitionPut.get(
-                                    PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                    PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYTES);
-                            if (decimalDigits != null && decimalDigits.size() > 0) {
-                                Cell decimalDigit = decimalDigits.get(0);
-                                indexColumnDefinitionPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                        PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYTES, decimalDigit.getValueArray());
+                        if (ordinalPositionList.size() == 0) {
+                            ordinalPositionList.setOffset(newOrdinalPosition);
+                            ordinalPositionList.addColumn(columnKey, newOrdinalPosition);
+                            for (PColumn col : view.getColumns()) {
+                                int ordinalPos = getOrdinalPosition(view, col);
+                                if (ordinalPos >= newOrdinalPosition) {
+                                    // increment ordinal position of columns by 1
+                                    int updatedPos = ordinalPos + 1;
+                                    ordinalPositionList.addColumn(getColumnKey(viewKey, col), updatedPos);
+                                } 
                             }
-                            
-                            // Set size
-                            List<Cell> columnSizes = viewColumnDefinitionPut.get(
-                                    PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                    PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES);
-                            if (columnSizes != null && columnSizes.size() > 0) {
-                                Cell columnSize = columnSizes.get(0);
-                                indexColumnDefinitionPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                        PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES, columnSize.getValueArray());
-                            }
-                            
-                            // Set sort order
-                            List<Cell> sortOrders = viewColumnDefinitionPut.get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                    PhoenixDatabaseMetaData.SORT_ORDER_BYTES);
-                            if (sortOrders != null && sortOrders.size() > 0) {
-                                Cell sortOrder = sortOrders.get(0);
-                                indexColumnDefinitionPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                        PhoenixDatabaseMetaData.SORT_ORDER_BYTES, sortOrder.getValueArray());
-                            }
-                            
-                            // Set data table name
-                            List<Cell> dataTableNames = viewColumnDefinitionPut.get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                    PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
-                            if (dataTableNames != null && dataTableNames.size() > 0) {
-                                Cell dataTableName = dataTableNames.get(0);
-                                indexColumnDefinitionPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                        PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES, dataTableName.getValueArray());
-                            }
-                            
-                            // Set the ordinal position of the new column.
-                            byte[] ordinalPositionBytes = new byte[PInteger.INSTANCE.getByteSize()];
-                            int ordinalPositionOfNewCol = oldNumberOfColsInIndex + deltaNumPkColsSoFar;
-                            PInteger.INSTANCE.getCodec().encodeInt(ordinalPositionOfNewCol, ordinalPositionBytes, 0);
-                            indexColumnDefinitionPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                        PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, ordinalPositionBytes);
-                            
-                            // New PK columns have to be nullable after the first DDL
-                            byte[] isNullableBytes = PBoolean.INSTANCE.toBytes(true);
-                            indexColumnDefinitionPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                        PhoenixDatabaseMetaData.NULLABLE_BYTES, isNullableBytes);
-                            
-                            // Set the key sequence for the pk column to be added
-                            short currentKeySeq = SchemaUtil.getMaxKeySeq(index);
-                            short newKeySeq = (short)(currentKeySeq + deltaNumPkColsSoFar);
-                            byte[] keySeqBytes = new byte[PSmallint.INSTANCE.getByteSize()];
-                            PSmallint.INSTANCE.getCodec().encodeShort(newKeySeq, keySeqBytes, 0);
-                            indexColumnDefinitionPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                                    PhoenixDatabaseMetaData.KEY_SEQ_BYTES, keySeqBytes);
-                            
-                            mutationsForAddingColumnsToViews.add(indexColumnDefinitionPut);
+                        } else {
+                            ordinalPositionList.addColumn(columnKey, newOrdinalPosition);
                         }
+                        mutationsForAddingColumnsToViews.add(viewColumnPut);
                     }
+                    if (isPkCol) {
+                        deltaNumPkColsSoFar++;
+                        // Set the key sequence for the pk column to be added
+                        short currentKeySeq = SchemaUtil.getMaxKeySeq(view);
+                        short newKeySeq = (short)(currentKeySeq + deltaNumPkColsSoFar);
+                        byte[] keySeqBytes = new byte[PSmallint.INSTANCE.getByteSize()];
+                        PSmallint.INSTANCE.getCodec().encodeShort(newKeySeq, keySeqBytes, 0);
+                        viewColumnPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                                PhoenixDatabaseMetaData.KEY_SEQ_BYTES, keySeqBytes);
+                        addMutationsForAddingPkColsToViewIndexes(mutationsForAddingColumnsToViews, clientTimeStamp, view,
+                                deltaNumPkColsSoFar, columnName, viewColumnPut);
+                    }
+                    columnsAddedToView++;
+                    columnsAddedToBaseTable++;
                 }
             }
-            // allow adding a pk columns to base table :
-            // 1. if all the view pk columns are exactly the same as the base table pk columns
-            // 2. if we are adding all the existing view pk columns to the base table 
-            if (addingPKColumn && !viewPkColumns.equals(table.getPKColumns())) {
-            	return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
+            /*
+             * Allow adding a pk columns to base table : 1. if all the view pk columns are exactly the same as the base
+             * table pk columns 2. if we are adding all the existing view pk columns to the base table
+             */ 
+            if (addingExistingPkCol && !viewPkCols.equals(basePhysicalTable.getPKColumns())) {
+                return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
             }
-            if (deltaNumPkColsSoFar > 0) {
-                for (PTable index : view.getIndexes()) {
-                    byte[] indexHeaderRowKey = getViewIndexHeaderRowKey(index);
-                    Put indexHeaderRowMutation = new Put(indexHeaderRowKey);
-                    
-                    // increment sequence number
-                    long newSequenceNumber = index.getSequenceNumber() + 1;
-                    byte[] newSequenceNumberPtr = new byte[PLong.INSTANCE.getByteSize()];
-                    PLong.INSTANCE.getCodec().encodeLong(newSequenceNumber, newSequenceNumberPtr, 0);
-                    indexHeaderRowMutation.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                            PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, newSequenceNumberPtr);
-                    
-                    // increase the column count
-                    int newColumnCount = index.getColumns().size() + deltaNumPkColsSoFar;
-                    byte[] newColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
-                    PInteger.INSTANCE.getCodec().encodeInt(newColumnCount, newColumnCountPtr, 0);
-                    indexHeaderRowMutation.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                            PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES, newColumnCountPtr);
-                    
-                    // add index row header key to the invalidate list to force clients to fetch the latest meta-data
-                    invalidateList.add(new ImmutableBytesPtr(indexHeaderRowKey));
-                    if (index.rowKeyOrderOptimizable()) {
-                        UpgradeUtil.addRowKeyOrderOptimizableCell(mutationsForAddingColumnsToViews, indexHeaderRowKey, clientTimeStamp);
-                    }
-                    mutationsForAddingColumnsToViews.add(indexHeaderRowMutation);
+            addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view,
+                    deltaNumPkColsSoFar);
+            
+            // Update the view header rows with new column counts.
+            Put viewHeaderRowPut = new Put(viewKey, clientTimeStamp);
+            if (!isDivergedView(view) && columnsAddedToBaseTable > 0) {
+                // Base column count should only be updated for diverged views.
+                int oldBaseColumnCount = view.getBaseColumnCount();
+                byte[] baseColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
+                PInteger.INSTANCE.getCodec().encodeInt(oldBaseColumnCount + columnsAddedToBaseTable, baseColumnCountPtr, 0);
+                viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, clientTimeStamp, baseColumnCountPtr);
+            }
+            
+            if (columnsAddedToView > 0) {
+                byte[] columnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
+                PInteger.INSTANCE.getCodec().encodeInt(numCols + columnsAddedToView, columnCountPtr, 0);
+                viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES, clientTimeStamp, columnCountPtr);
+            }
+            
+            /*
+             * Increment the sequence number by 1 if:
+             * 1) For a diverged view, there were columns (pk columns) added to the view.
+             * 2) For a non-diverged view if the base column count changed.
+             *    
+             */
+            boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0)
+                    || (!isDivergedView(view) && columnsAddedToBaseTable > 0);
+            if (changeSequenceNumber) {
+                byte[] viewSequencePtr = new byte[PLong.INSTANCE.getByteSize()];
+                PLong.INSTANCE.getCodec().encodeLong(view.getSequenceNumber() + 1, viewSequencePtr, 0);
+                viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, clientTimeStamp, viewSequencePtr);
+
+                mutationsForAddingColumnsToViews.add(viewHeaderRowPut);
+
+                // only invalidate if the sequence number is about to change
+                invalidateList.add(new ImmutableBytesPtr(viewKey));
+
+                // Update the ordinal positions. The list would be non-empty only if the sequence
+                // number will change.
+                int i = 0;
+                for (byte[] columnKey : ordinalPositionList.columnKeys) {
+                    int ordinalPosition = ordinalPositionList.getOrdinalPositionFromListIdx(i);
+                    Put positionUpdatePut = new Put(columnKey, clientTimeStamp);
+                    byte[] ptr = new byte[PInteger.INSTANCE.getByteSize()];
+                    PInteger.INSTANCE.getCodec().encodeInt(ordinalPosition, ptr, 0);
+                    positionUpdatePut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                            PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, clientTimeStamp, ptr);
+                    mutationsForAddingColumnsToViews.add(positionUpdatePut);
+                    i++;
                 }
             }
-
-            int oldBaseColumnCount = view.getBaseColumnCount();
-
-            Put viewHeaderRowPut = new Put(viewKey, clientTimeStamp);
-            byte[] baseColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
-            PInteger.INSTANCE.getCodec().encodeInt(oldBaseColumnCount + numColsAddedToBaseTable, baseColumnCountPtr, 0);
-            byte[] columnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
-            PInteger.INSTANCE.getCodec().encodeInt(view.getColumns().size() + numColsAddedToView, columnCountPtr, 0);
-            byte[] viewSequencePtr = new byte[PLong.INSTANCE.getByteSize()];
-            PLong.INSTANCE.getCodec().encodeLong(view.getSequenceNumber() + 1, viewSequencePtr, 0);
-            viewHeaderRowPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                    PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES, clientTimeStamp, columnCountPtr);
-            viewHeaderRowPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                    PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, clientTimeStamp, baseColumnCountPtr);
-            viewHeaderRowPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                    PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, clientTimeStamp, viewSequencePtr);
-            mutationsForAddingColumnsToViews.add(viewHeaderRowPut);
+            
             if (view.rowKeyOrderOptimizable()) {
                 UpgradeUtil.addRowKeyOrderOptimizableCell(mutationsForAddingColumnsToViews, viewKey, clientTimeStamp);
             }
+        }
+        return null;
+    }
+    
+    private MetaDataMutationResult validateColumnForAddToBaseTable(PColumn existingViewColumn, Put columnToBeAdded, PTable basePhysicalTable, boolean isColumnToBeAddPkCol, PTable view) {
+        if (existingViewColumn != null) {
+            
+            // Validate data type is same
+            int baseColumnDataType = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
+            if (baseColumnDataType != existingViewColumn.getDataType().getSqlType()) {
+                return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
+            }
 
-            // Update positions of view columns
-            for (PColumn column : view.getColumns()) {
-                if (column.getPosition() >= oldBaseColumnCount) {
-                    int newPosition = column.getPosition() + numColsAddedToView + 1;
-
-                    byte[] k = ByteUtil.concat(viewKey, QueryConstants.SEPARATOR_BYTE_ARRAY, column.getName()
-                            .getBytes());
-                    if (column.getFamilyName() != null) {
-                        k = ByteUtil.concat(k, QueryConstants.SEPARATOR_BYTE_ARRAY, column.getFamilyName().getBytes());
-                    }
-
-                    Put positionUpdatePut = new Put(k, clientTimeStamp);
-                    byte[] ptr = new byte[PInteger.INSTANCE.getByteSize()];
-                    PInteger.INSTANCE.getCodec().encodeInt(newPosition, ptr, 0);
-                    positionUpdatePut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                            PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, clientTimeStamp, ptr);
-                    mutationsForAddingColumnsToViews.add(positionUpdatePut);
+            // Validate max length is same
+            int maxLength = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
+            int existingMaxLength = existingViewColumn.getMaxLength() == null ? 0 : existingViewColumn.getMaxLength();
+            if (maxLength != existingMaxLength) {
+                return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable); 
+            }
+            
+            // Validate scale is same
+            int scale = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
+            int existingScale = existingViewColumn.getScale() == null ? 0 : existingViewColumn.getScale();
+            if (scale != existingScale) {
+                return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable); 
+            }
+            
+            // Validate sort order is same
+            int sortOrder = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, SORT_ORDER_BYTES);
+            if (sortOrder != existingViewColumn.getSortOrder().getSystemValue()) {
+                return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
+            }
+            
+            // if the column to be added to the base table is a pk column, then we need to validate that the key slot position is the same
+            if (isColumnToBeAddPkCol) {
+                List<Cell> keySeqCells = columnToBeAdded.get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.KEY_SEQ_BYTES);
+                if (keySeqCells != null && keySeqCells.size() > 0) {
+                    Cell cell = keySeqCells.get(0);
+                    int keySeq = PSmallint.INSTANCE.getCodec().decodeInt(cell.getValueArray(), cell.getValueOffset(),
+                            SortOrder.getDefault());
+                    int pkPosition = SchemaUtil.getPKPosition(view, existingViewColumn) + 1;
+                    if (pkPosition != keySeq) { return new MetaDataMutationResult(
+                            MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable); }
                 }
             }
-            invalidateList.add(new ImmutableBytesPtr(viewKey));
         }
         return null;
     }
     
+    private int getInteger(Put p, byte[] family, byte[] qualifier) {
+        List<Cell> cells = p.get(family, qualifier);
+        if (cells != null && cells.size() > 0) {
+            Cell cell = cells.get(0);
+            return (Integer)PInteger.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+        }
+        return 0;
+    }
+    
+    private void addViewIndexesHeaderRowMutations(List<Mutation> mutationsForAddingColumnsToViews,
+            List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, PTable view, short deltaNumPkColsSoFar) {
+        if (deltaNumPkColsSoFar > 0) {
+            for (PTable index : view.getIndexes()) {
+                byte[] indexHeaderRowKey = getViewIndexHeaderRowKey(index);
+                Put indexHeaderRowMutation = new Put(indexHeaderRowKey);
+                
+                // increment sequence number
+                long newSequenceNumber = index.getSequenceNumber() + 1;
+                byte[] newSequenceNumberPtr = new byte[PLong.INSTANCE.getByteSize()];
+                PLong.INSTANCE.getCodec().encodeLong(newSequenceNumber, newSequenceNumberPtr, 0);
+                indexHeaderRowMutation.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, newSequenceNumberPtr);
+                
+                // increase the column count
+                int newColumnCount = index.getColumns().size() + deltaNumPkColsSoFar;
+                byte[] newColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
+                PInteger.INSTANCE.getCodec().encodeInt(newColumnCount, newColumnCountPtr, 0);
+                indexHeaderRowMutation.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES, newColumnCountPtr);
+                
+                // add index row header key to the invalidate list to force clients to fetch the latest meta-data
+                invalidateList.add(new ImmutableBytesPtr(indexHeaderRowKey));
+                if (index.rowKeyOrderOptimizable()) {
+                    UpgradeUtil.addRowKeyOrderOptimizableCell(mutationsForAddingColumnsToViews, indexHeaderRowKey, clientTimeStamp);
+                }
+                mutationsForAddingColumnsToViews.add(indexHeaderRowMutation);
+            }
+        }
+    }
+
+    private void addMutationsForAddingPkColsToViewIndexes(List<Mutation> mutationsForAddingColumnsToViews, long clientTimeStamp,
+            PTable view, short deltaNumPkColsSoFar, String viewPkColumnName, Put viewColumnDefinitionPut) {
+        for (PTable index : view.getIndexes()) {
+            int oldNumberOfColsInIndex = index.getColumns().size();
+            
+            byte[] indexColumnKey = ByteUtil.concat(getViewIndexHeaderRowKey(index), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(IndexUtil.getIndexColumnName(null, viewPkColumnName)));
+            Put indexColumnDefinitionPut = new Put(indexColumnKey, clientTimeStamp);
+            
+            // Set the index specific data type for the column
+            int viewPkColumnDataType = getInteger(viewColumnDefinitionPut, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
+            byte[] indexColumnDataTypeBytes = new byte[PInteger.INSTANCE.getByteSize()];
+            int indexColumnDataType = IndexUtil.getIndexColumnDataType(true,
+                    PDataType.fromTypeId(viewPkColumnDataType)).getSqlType();
+            PInteger.INSTANCE.getCodec().encodeInt(indexColumnDataType, indexColumnDataTypeBytes, 0);
+            indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.DATA_TYPE_BYTES, indexColumnDataTypeBytes);
+            
+             
+            // Set precision
+            List<Cell> decimalDigits = viewColumnDefinitionPut.get(
+                    PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYTES);
+            if (decimalDigits != null && decimalDigits.size() > 0) {
+                Cell decimalDigit = decimalDigits.get(0);
+                indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYTES, decimalDigit.getValueArray());
+            }
+            
+            // Set size
+            List<Cell> columnSizes = viewColumnDefinitionPut.get(
+                    PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES);
+            if (columnSizes != null && columnSizes.size() > 0) {
+                Cell columnSize = columnSizes.get(0);
+                indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES, columnSize.getValueArray());
+            }
+            
+            // Set sort order
+            List<Cell> sortOrders = viewColumnDefinitionPut.get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.SORT_ORDER_BYTES);
+            if (sortOrders != null && sortOrders.size() > 0) {
+                Cell sortOrder = sortOrders.get(0);
+                indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.SORT_ORDER_BYTES, sortOrder.getValueArray());
+            }
+            
+            // Set data table name
+            List<Cell> dataTableNames = viewColumnDefinitionPut.get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
+            if (dataTableNames != null && dataTableNames.size() > 0) {
+                Cell dataTableName = dataTableNames.get(0);
+                indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES, dataTableName.getValueArray());
+            }
+            
+            // Set the ordinal position of the new column.
+            byte[] ordinalPositionBytes = new byte[PInteger.INSTANCE.getByteSize()];
+            int ordinalPositionOfNewCol = oldNumberOfColsInIndex + deltaNumPkColsSoFar;
+            PInteger.INSTANCE.getCodec().encodeInt(ordinalPositionOfNewCol, ordinalPositionBytes, 0);
+            indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, ordinalPositionBytes);
+            
+            // New PK columns have to be nullable after the first DDL
+            byte[] isNullableBytes = PBoolean.INSTANCE.toBytes(true);
+            indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.NULLABLE_BYTES, isNullableBytes);
+            
+            // Set the key sequence for the pk column to be added
+            short currentKeySeq = SchemaUtil.getMaxKeySeq(index);
+            short newKeySeq = (short)(currentKeySeq + deltaNumPkColsSoFar);
+            byte[] keySeqBytes = new byte[PSmallint.INSTANCE.getByteSize()];
+            PSmallint.INSTANCE.getCodec().encodeShort(newKeySeq, keySeqBytes, 0);
+            indexColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.KEY_SEQ_BYTES, keySeqBytes);
+            
+            mutationsForAddingColumnsToViews.add(indexColumnDefinitionPut);
+        }
+    }
+    
     private byte[] getViewIndexHeaderRowKey(PTable index) {
         byte[] tenantIdBytes = index.getKey().getTenantId() != null ? index.getKey().getTenantId().getBytes() : EMPTY_BYTE_ARRAY;
         byte[] schemaNameBytes = index.getSchemaName() != null ? index.getSchemaName().getBytes() : EMPTY_BYTE_ARRAY;