You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/12/16 18:49:09 UTC

[2/6] phoenix git commit: Sync 4.x-HBase-1.2 to master (Pedro Boado)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 0faa20c..993438e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -96,6 +96,7 @@ import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -122,7 +123,7 @@ public class MutationState implements SQLCloseable {
     private final long batchSize;
     private final long batchSizeBytes;
     private long batchCount = 0L;
-    private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
+    private final Map<TableRef, MultiRowMutationState> mutations;
     private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10);
 
     private long sizeOffset;
@@ -130,7 +131,7 @@ public class MutationState implements SQLCloseable {
     private long estimatedSize = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
-    private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
+    private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
 
     final PhoenixTransactionContext phoenixTransactionContext;
 
@@ -158,12 +159,12 @@ public class MutationState implements SQLCloseable {
     }
 
     private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) {
-        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), subTask, txContext);
+        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, MultiRowMutationState>newHashMapWithExpectedSize(5), subTask, txContext);
         this.sizeOffset = sizeOffset;
     }
 
     MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
+            Map<TableRef, MultiRowMutationState> mutations,
             boolean subTask, PhoenixTransactionContext txContext) {
         this.maxSize = maxSize;
         this.maxSizeBytes = maxSizeBytes;
@@ -188,15 +189,19 @@ public class MutationState implements SQLCloseable {
         }
     }
 
-    public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
+    public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection)  throws SQLException {
         this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
         if (!mutations.isEmpty()) {
             this.mutations.put(table, mutations);
         }
         this.numRows = mutations.size();
-        this.estimatedSize = KeyValueUtil.getEstimatedRowSize(table, mutations);
+        this.estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(this.mutations);
         throwIfTooBig();
     }
+    
+    public long getEstimatedSize() {
+        return this.estimatedSize; // size estimate currently tracked for the buffered mutations
+    }
 
     public long getMaxSize() {
         return maxSize;
@@ -345,7 +350,7 @@ public class MutationState implements SQLCloseable {
     }
 
     public static MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), false, null);
+        MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, MultiRowMutationState>emptyMap(), false, null);
         state.sizeOffset = 0;
         return state;
     }
@@ -367,12 +372,12 @@ public class MutationState implements SQLCloseable {
         return sizeOffset + numRows;
     }
     
-    private void joinMutationState(TableRef tableRef, Map<ImmutableBytesPtr,RowMutationState> srcRows,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
+    private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows,
+            Map<TableRef, MultiRowMutationState> dstMutations) {
         PTable table = tableRef.getTable();
         boolean isIndex = table.getType() == PTableType.INDEX;
         boolean incrementRowCount = dstMutations == this.mutations;
-        Map<ImmutableBytesPtr,RowMutationState> existingRows = dstMutations.put(tableRef, srcRows);
+        MultiRowMutationState existingRows = dstMutations.put(tableRef, srcRows);
         if (existingRows != null) { // Rows for that table already exist
             // Loop through new rows and replace existing with new
             for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : srcRows.entrySet()) {
@@ -384,8 +389,12 @@ public class MutationState implements SQLCloseable {
                         Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
                         // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. 
                         if (newRow != PRow.DELETE_MARKER) {
+                            // decrement estimated size by the size of the old row
+                            estimatedSize-=existingRowMutationState.calculateEstimatedSize();
                             // Merge existing column values with new column values
                             existingRowMutationState.join(rowEntry.getValue());
+                            // increment estimated size by the size of the new row
+                            estimatedSize+=existingRowMutationState.calculateEstimatedSize();
                             // Now that the existing row has been merged with the new row, replace it back
                             // again (since it was merged with the new one above).
                             existingRows.put(rowEntry.getKey(), existingRowMutationState);
@@ -394,6 +403,8 @@ public class MutationState implements SQLCloseable {
                 } else {
                     if (incrementRowCount && !isIndex) { // Don't count index rows in row count
                         numRows++;
+                        // increment estimated size by the size of the new row
+                        estimatedSize += rowEntry.getValue().calculateEstimatedSize();
                     }
                 }
             }
@@ -401,22 +412,25 @@ public class MutationState implements SQLCloseable {
             dstMutations.put(tableRef, existingRows);
         } else {
             // Size new map at batch size as that's what it'll likely grow to.
-            Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
+            MultiRowMutationState newRows = new MultiRowMutationState(connection.getMutateBatchSize());
             newRows.putAll(srcRows);
             dstMutations.put(tableRef, newRows);
             if (incrementRowCount && !isIndex) {
                 numRows += srcRows.size();
+                // all of srcRows was added for this table, so we can just increment the
+                // estimatedSize by srcRows.estimatedSize
+                estimatedSize +=  srcRows.estimatedSize;
             }
         }
     }
     
-    private void joinMutationState(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> srcMutations, 
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
+    private void joinMutationState(Map<TableRef, MultiRowMutationState> srcMutations, 
+            Map<TableRef, MultiRowMutationState> dstMutations) {
         // Merge newMutation with this one, keeping state from newMutation for any overlaps
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : srcMutations.entrySet()) {
+        for (Map.Entry<TableRef, MultiRowMutationState> entry : srcMutations.entrySet()) {
             // Replace existing entries for the table with new entries
             TableRef tableRef = entry.getKey();
-            Map<ImmutableBytesPtr,RowMutationState> srcRows = entry.getValue();
+            MultiRowMutationState srcRows = entry.getValue();
             joinMutationState(tableRef, srcRows, dstMutations);
         }
     }
@@ -434,12 +448,7 @@ public class MutationState implements SQLCloseable {
         phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());
 
         this.sizeOffset += newMutationState.sizeOffset;
-        int oldNumRows = this.numRows;
         joinMutationState(newMutationState.mutations, this.mutations);
-        // here we increment the estimated size by the fraction of new rows we added from the newMutationState 
-        if (newMutationState.numRows>0) {
-            this.estimatedSize += ((double)(this.numRows-oldNumRows)/newMutationState.numRows) * newMutationState.estimatedSize;
-        }
         if (!newMutationState.txMutations.isEmpty()) {
             if (txMutations.isEmpty()) {
                 txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
@@ -477,7 +486,7 @@ public class MutationState implements SQLCloseable {
         return ptr;
     }
     
-    private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values,
+    private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values,
             final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) { 
         final PTable table = tableRef.getTable();
         final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
@@ -512,10 +521,10 @@ public class MutationState implements SQLCloseable {
                     // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
                     if (!sendAll) {
                         TableRef key = new TableRef(index);
-                        Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key);
-                        if (rowToColumnMap!=null) {
+                        MultiRowMutationState multiRowMutationState = mutations.remove(key);
+                        if (multiRowMutationState!=null) {
                             final List<Mutation> deleteMutations = Lists.newArrayList();
-                            generateMutations(tableRef, mutationTimestamp, serverTimestamp, rowToColumnMap, deleteMutations, null);
+                            generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null);
                             indexMutations.addAll(deleteMutations);
                         }
                     }
@@ -534,14 +543,14 @@ public class MutationState implements SQLCloseable {
     }
 
     private void generateMutations(final TableRef tableRef, final long mutationTimestamp,
-            final long serverTimestamp, final Map<ImmutableBytesPtr, RowMutationState> values,
+            final long serverTimestamp, final MultiRowMutationState values,
             final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) {
         final PTable table = tableRef.getTable();
         boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
         Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator =
                 values.entrySet().iterator();
         long timestampToUse = mutationTimestamp;
-        Map<ImmutableBytesPtr, RowMutationState> modifiedValues = Maps.newHashMap();
+        MultiRowMutationState modifiedValues = new MultiRowMutationState(16);
         while (iterator.hasNext()) {
             Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next();
             byte[] onDupKeyBytes = rowEntry.getValue().getOnDupKeyBytes();
@@ -616,7 +625,7 @@ public class MutationState implements SQLCloseable {
     }
     
     public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) {
-        final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
+        final Iterator<Map.Entry<TableRef, MultiRowMutationState>> iterator = this.mutations.entrySet().iterator();
         if (!iterator.hasNext()) {
             return Collections.emptyIterator();
         }
@@ -624,7 +633,7 @@ public class MutationState implements SQLCloseable {
         final long serverTimestamp = getTableTimestamp(tableTimestamp, scn);
         final long mutationTimestamp = getMutationTimestamp(scn);
         return new Iterator<Pair<byte[],List<Mutation>>>() {
-            private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
+            private Map.Entry<TableRef, MultiRowMutationState> current = iterator.next();
             private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
                     
             private Iterator<Pair<byte[],List<Mutation>>> init() {
@@ -688,14 +697,14 @@ public class MutationState implements SQLCloseable {
     private long[] validateAll() throws SQLException {
         int i = 0;
         long[] timeStamps = new long[this.mutations.size()];
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
+        for (Map.Entry<TableRef, MultiRowMutationState> entry : mutations.entrySet()) {
             TableRef tableRef = entry.getKey();
             timeStamps[i++] = validateAndGetServerTimestamp(tableRef, entry.getValue());
         }
         return timeStamps;
     }
     
-    private long validateAndGetServerTimestamp(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException {
+    private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap) throws SQLException {
         Long scn = connection.getSCN();
         MetaDataClient client = new MetaDataClient(connection);
         long serverTimeStamp = tableRef.getTimeStamp();
@@ -907,7 +916,7 @@ public class MutationState implements SQLCloseable {
             sendAll = true;
         }
 
-        Map<ImmutableBytesPtr, RowMutationState> valuesMap;
+        MultiRowMutationState multiRowMutationState;
         Map<TableInfo,List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap(); 
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
@@ -916,16 +925,16 @@ public class MutationState implements SQLCloseable {
             while (tableRefIterator.hasNext()) {
                 // at this point we are going through mutations for each table
                 final TableRef tableRef = tableRefIterator.next();
-                valuesMap = mutations.get(tableRef);
-                if (valuesMap == null || valuesMap.isEmpty()) {
+                multiRowMutationState = mutations.get(tableRef);
+                if (multiRowMutationState == null || multiRowMutationState.isEmpty()) {
                     continue;
                 }
                 // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
-                long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, valuesMap) : serverTimeStamps[i++];
+                long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, multiRowMutationState) : serverTimeStamps[i++];
                 Long scn = connection.getSCN();
                 long mutationTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
                 final PTable table = tableRef.getTable();
-                Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, mutationTimestamp, serverTimestamp, false, sendAll);
+                Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll);
                 // build map from physical table to mutation list
                 boolean isDataTable = true;
                 while (mutationsIterator.hasNext()) {
@@ -943,7 +952,7 @@ public class MutationState implements SQLCloseable {
                 // involved in the transaction since none of them would have been
                 // committed in the event of a failure.
                 if (table.isTransactional()) {
-                    addUncommittedStatementIndexes(valuesMap.values());
+                    addUncommittedStatementIndexes(multiRowMutationState.values());
                     if (txMutations.isEmpty()) {
                         txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
                     }
@@ -952,7 +961,7 @@ public class MutationState implements SQLCloseable {
                     // in the event that we need to replay the commit.
                     // Copy TableRef so we have the original PTable and know when the
                     // indexes have changed.
-                    joinMutationState(new TableRef(tableRef), valuesMap, txMutations);
+                    joinMutationState(new TableRef(tableRef), multiRowMutationState, txMutations);
                 }
             }
             long serverTimestamp = HConstants.LATEST_TIMESTAMP;
@@ -974,8 +983,6 @@ public class MutationState implements SQLCloseable {
                 long mutationCommitTime = 0;
                 long numFailedMutations = 0;;
                 long startTime = 0;
-                long startNumRows = numRows;
-                long startEstimatedSize = estimatedSize;
                 do {
                     TableRef origTableRef = tableInfo.getOrigTableRef();
                     PTable table = origTableRef.getTable();
@@ -1021,13 +1028,13 @@ public class MutationState implements SQLCloseable {
                         GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
                         numFailedMutations = 0;
                         
+                        // Remove batches as we process them
+                        mutations.remove(origTableRef);
                         if (tableInfo.isDataTable()) {
                             numRows -= numMutations;
-                            // decrement estimated size by the fraction of rows we sent to hbase
-                            estimatedSize -= ((double)numMutations/startNumRows)*startEstimatedSize;
+                            // recalculate the estimated size
+                            estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(mutations);
                         }
-                        // Remove batches as we process them
-                        mutations.remove(origTableRef);
                     } catch (Exception e) {
                     	mutationCommitTime = System.currentTimeMillis() - startTime;
                         serverTimestamp = ServerUtil.parseServerTimestamp(e);
@@ -1178,7 +1185,7 @@ public class MutationState implements SQLCloseable {
     }
     
     private int[] getUncommittedStatementIndexes() {
-        for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) {
+        for (MultiRowMutationState rowMutationMap : mutations.values()) {
             addUncommittedStatementIndexes(rowMutationMap.values());
         }
         return uncommittedStatementIndexes;
@@ -1211,7 +1218,7 @@ public class MutationState implements SQLCloseable {
     }
 
     public void commit() throws SQLException {
-        Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
+        Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
         int retryCount = 0;
         do {
             boolean sendSuccessful=false;
@@ -1421,13 +1428,54 @@ public class MutationState implements SQLCloseable {
         }
     }
     
+    /** Rows buffered for a single table, keyed by row key, with a running estimate of their total size. */
+    public static class MultiRowMutationState {
+        private Map<ImmutableBytesPtr,RowMutationState> rowKeyToRowMutationState;
+        private long estimatedSize; // implicitly starts at 0; grown by put/putAll, reset by clear
+
+        public MultiRowMutationState(int size) {
+            this.rowKeyToRowMutationState = Maps.newHashMapWithExpectedSize(size);
+        }
+        // Adds the row's estimate to the total and returns the row it displaced, if any (whose estimate is NOT subtracted).
+        public RowMutationState put(ImmutableBytesPtr ptr, RowMutationState rowMutationState) {
+            estimatedSize += rowMutationState.calculateEstimatedSize();
+            return rowKeyToRowMutationState.put(ptr, rowMutationState);
+        }
+        // Copies every row of other and adds other's whole estimate, even for keys that were already present here.
+        public void putAll(MultiRowMutationState other) {
+            estimatedSize += other.estimatedSize;
+            rowKeyToRowMutationState.putAll(other.rowKeyToRowMutationState);
+        }
+
+        public boolean isEmpty() {
+            return rowKeyToRowMutationState.isEmpty();
+        }
+        public int size() {
+            return rowKeyToRowMutationState.size();
+        }
+
+        public Set<Entry<ImmutableBytesPtr, RowMutationState>> entrySet() {
+            return rowKeyToRowMutationState.entrySet();
+        }
+        // Drops all rows and zeroes the estimate so a cleared instance no longer reports the size of removed rows.
+        public void clear(){
+            rowKeyToRowMutationState.clear();
+            estimatedSize = 0;
+        }
+
+        public Collection<RowMutationState> values() {
+            return rowKeyToRowMutationState.values();
+        }
+    }
+    
     public static class RowMutationState {
         @Nonnull private Map<PColumn,byte[]> columnValues;
         private int[] statementIndexes;
         @Nonnull private final RowTimestampColInfo rowTsColInfo;
         private byte[] onDupKeyBytes;
+        private long colValuesSize;
         
-        public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo,
+        public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, long colValuesSize, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo,
                 byte[] onDupKeyBytes) {
             checkNotNull(columnValues);
             checkNotNull(rowTsColInfo);
@@ -1435,6 +1483,12 @@ public class MutationState implements SQLCloseable {
             this.statementIndexes = new int[] {statementIndex};
             this.rowTsColInfo = rowTsColInfo;
             this.onDupKeyBytes = onDupKeyBytes;
+            this.colValuesSize = colValuesSize;
+        }
+
+        public long calculateEstimatedSize() {
+            int onDupKeyLength = onDupKeyBytes == null ? 0 : onDupKeyBytes.length; // 0 when no ON DUPLICATE KEY bytes are set
+            return colValuesSize + onDupKeyLength + SizedUtil.LONG_SIZE + statementIndexes.length * SizedUtil.INT_SIZE;
         }
 
         byte[] getOnDupKeyBytes() {
@@ -1453,7 +1507,16 @@ public class MutationState implements SQLCloseable {
             // If we already have a row and the new row has an ON DUPLICATE KEY clause
             // ignore the new values (as that's what the server will do).
             if (newRow.onDupKeyBytes == null) {
-                getColumnValues().putAll(newRow.getColumnValues());
+                // increment the column value size by the new row column value size
+                colValuesSize+=newRow.colValuesSize;
+                for (Map.Entry<PColumn,byte[]> entry : newRow.columnValues.entrySet()) {
+                    PColumn col = entry.getKey();
+                    byte[] oldValue = columnValues.put(col, entry.getValue());
+                    if (oldValue!=null) {
+                        // decrement column value size by the size of all column values that were replaced
+                        colValuesSize-=(col.getEstimatedSize() + oldValue.length);
+                    }
+                }
             }
             // Concatenate ON DUPLICATE KEY bytes to allow multiple
             // increments of the same row in the same commit batch.
@@ -1465,7 +1528,7 @@ public class MutationState implements SQLCloseable {
         RowTimestampColInfo getRowTimestampColInfo() {
             return rowTsColInfo;
         }
-       
+
     }
     
     public ReadMetricQueue getReadMetricQueue() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 1e1cb0d..31d7097 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -53,6 +53,7 @@ import org.apache.phoenix.iterate.RoundRobinResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.SerialIterators;
 import org.apache.phoenix.iterate.SpoolingResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -64,6 +65,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.CostUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
@@ -189,6 +191,29 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        // Best effort: a failed estimate is handled the same way as a missing one.
+        Long bytesToScan;
+        try {
+            bytesToScan = getEstimatedBytesToScan();
+        } catch (SQLException e) {
+            bytesToScan = null;
+        }
+        if (bytesToScan == null) {
+            return Cost.UNKNOWN;
+        }
+
+        Cost scanCost = new Cost(0, 0, bytesToScan);
+        int parallelism = CostUtil.estimateParallelLevel(
+                true, context.getConnection().getQueryServices());
+        if (orderBy.getOrderByExpressions().isEmpty()) {
+            return scanCost;
+        }
+        // An ORDER BY adds an estimated ordering cost on top of the scan cost.
+        return scanCost.plus(CostUtil.estimateOrderByCost(bytesToScan, parallelism));
+    }
+
+    @Override
     public List<KeyRange> getSplits() {
         if (splits == null)
             return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index fab7c59..3e380da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.query.KeyRange;
@@ -192,6 +193,23 @@ public class SortMergeJoinPlan implements QueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        // Best effort: a failed estimate is handled the same way as a missing one.
+        Long bytesToScan;
+        try {
+            bytesToScan = getEstimatedBytesToScan();
+        } catch (SQLException e) {
+            bytesToScan = null;
+        }
+        if (bytesToScan == null) {
+            return Cost.UNKNOWN;
+        }
+        // Own scan cost first, then the cost of each join side added on top (left, then right).
+        Cost withLhs = new Cost(0, 0, bytesToScan).plus(lhsPlan.getCost());
+        return withLhs.plus(rhsPlan.getCost());
+    }
+
+    @Override
     public StatementContext getContext() {
         return context;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index e06522f..e6bf654 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.UnionResultIterators;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
@@ -210,6 +211,15 @@ public class UnionPlan implements QueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        Cost total = Cost.ZERO; // remains ZERO when there are no sub-plans
+        for (QueryPlan subPlan : plans) {
+            total = total.plus(subPlan.getCost()); // accumulate in iteration order
+        }
+        return total;
+    }
+
+    @Override
     public ParameterMetaData getParameterMetaData() {
         return paramMetaData;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
index 15f6e3e..9bb7234 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
@@ -199,8 +199,8 @@ public class RowValueConstructorExpression extends BaseCompoundExpression {
                     // as otherwise we need it to ensure sort order is correct
                     for (int k = expressionCount -1 ; 
                             k >=0 &&  getChildren().get(k).getDataType() != null 
-                                  && !getChildren().get(k).getDataType().isFixedWidth() 
-                                  && outputBytes[outputSize-1] == QueryConstants.SEPARATOR_BYTE ; k--) {
+                                  && !getChildren().get(k).getDataType().isFixedWidth()
+                                  && outputBytes[outputSize-1] == SchemaUtil.getSeparatorByte(true, false, getChildren().get(k)) ; k--) {
                         outputSize--;
                     }
                     ptr.set(outputBytes, 0, outputSize);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 0fc138f..ba6371b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.index;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
@@ -161,12 +163,12 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
     }
 
     private long handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> attempted,
-            Exception cause) throws Throwable {
+            final Exception cause) throws Throwable {
         Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
-        Map<String, Long> indexTableNames = new HashMap<String, Long>(refs.size());
+        final Map<String, Long> indexTableNames = new HashMap<String, Long>(refs.size());
         // start by looking at all the tables to which we attempted to write
         long timestamp = 0;
-        boolean leaveIndexActive = blockDataTableWritesOnFailure || !disableIndexOnFailure;
+        final boolean leaveIndexActive = blockDataTableWritesOnFailure || !disableIndexOnFailure;
         // if using TrackingParallelWriter, we know which indexes failed and only disable those
         Set<HTableInterfaceReference> failedTables = cause instanceof MultiIndexWriteFailureException 
                 ? new HashSet<HTableInterfaceReference>(((MultiIndexWriteFailureException)cause).getFailedTables())
@@ -210,55 +212,66 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
             return timestamp;
         }
 
-        PIndexState newState = disableIndexOnFailure ? PIndexState.DISABLE : PIndexState.PENDING_ACTIVE;
+        final PIndexState newState = disableIndexOnFailure ? PIndexState.DISABLE : PIndexState.PENDING_ACTIVE;
+        final long fTimestamp=timestamp;
         // for all the index tables that we've found, try to disable them and if that fails, try to
-        for (Map.Entry<String, Long> tableTimeElement :indexTableNames.entrySet()){
-            String indexTableName = tableTimeElement.getKey();
-            long minTimeStamp = tableTimeElement.getValue();
-            // We need a way of differentiating the block writes to data table case from
-            // the leave index active case. In either case, we need to know the time stamp
-            // at which writes started failing so we can rebuild from that point. If we
-            // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES,
-            // then writes to the data table will be blocked (this is client side logic
-            // and we can't change this in a minor release). So we use the sign of the
-            // time stamp to differentiate.
-            if (!disableIndexOnFailure && !blockDataTableWritesOnFailure) {
-                minTimeStamp *= -1;
-            }
-            // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
-            try (HTableInterface systemTable = env.getTable(SchemaUtil
-                    .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()))) {
-                MetaDataMutationResult result = IndexUtil.updateIndexState(indexTableName, minTimeStamp,
-                        systemTable, newState);
-                if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
-                    LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations");
-                    continue;
-                }
-                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
-                    if (leaveIndexActive) {
-                        LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " failed with code = "
-                                + result.getMutationCode());
-                        // If we're not disabling the index, then we don't want to throw as throwing
-                        // will lead to the RS being shutdown.
-                        if (blockDataTableWritesOnFailure) {
-                            throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed.");
+        return User.runAsLoginUser(new PrivilegedExceptionAction<Long>() {
+            @Override
+            public Long run() throws Exception {
+                for (Map.Entry<String, Long> tableTimeElement : indexTableNames.entrySet()) {
+                    String indexTableName = tableTimeElement.getKey();
+                    long minTimeStamp = tableTimeElement.getValue();
+                    // We need a way of differentiating the block writes to data table case from
+                    // the leave index active case. In either case, we need to know the time stamp
+                    // at which writes started failing so we can rebuild from that point. If we
+                    // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES,
+                    // then writes to the data table will be blocked (this is client side logic
+                    // and we can't change this in a minor release). So we use the sign of the
+                    // time stamp to differentiate.
+                    if (!disableIndexOnFailure && !blockDataTableWritesOnFailure) {
+                        minTimeStamp *= -1;
+                    }
+                    // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
+                    try (HTableInterface systemTable = env.getTable(SchemaUtil.getPhysicalTableName(
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()))) {
+                        MetaDataMutationResult result = IndexUtil.updateIndexState(indexTableName, minTimeStamp,
+                                systemTable, newState);
+                        if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
+                            LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations");
+                            continue;
+                        }
+                        if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+                            if (leaveIndexActive) {
+                                LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " failed with code = "
+                                        + result.getMutationCode());
+                                // If we're not disabling the index, then we don't want to throw as throwing
+                                // will lead to the RS being shutdown.
+                                if (blockDataTableWritesOnFailure) { throw new DoNotRetryIOException(
+                                        "Attempt to update INDEX_DISABLE_TIMESTAMP failed."); }
+                            } else {
+                                LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
+                                        + result.getMutationCode() + ". Will use default failure policy instead.");
+                                throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
+                            }
+                        }
+                        if (leaveIndexActive)
+                            LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName
+                                    + " due to an exception while writing updates.", cause);
+                        else
+                            LOG.info("Successfully disabled index " + indexTableName
+                                    + " due to an exception while writing updates.", cause);
+                    } catch (Throwable t) {
+                        if (t instanceof Exception) {
+                            throw (Exception)t;
+                        } else {
+                            throw new Exception(t);
                         }
-                    } else {
-                        LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
-                                + result.getMutationCode() + ". Will use default failure policy instead.");
-                        throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
-                    } 
+                    }
                 }
-                if (leaveIndexActive)
-                    LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName + " due to an exception while writing updates.",
-                            cause);
-                else
-                    LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.",
-                            cause);
+                // Return the cell time stamp (note they should all be the same)
+                return fTimestamp;
             }
-        }
-        // Return the cell time stamp (note they should all be the same)
-        return timestamp;
+        });
     }
 
     private Collection<? extends String> getLocalIndexNames(HTableInterfaceReference ref,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index d35cce1..c699088 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -91,12 +91,14 @@ import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.AddColumnStatement;
 import org.apache.phoenix.parse.AddJarsStatement;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.AlterIndexStatement;
 import org.apache.phoenix.parse.AlterSessionStatement;
 import org.apache.phoenix.parse.BindableStatement;
+import org.apache.phoenix.parse.ChangePermsStatement;
 import org.apache.phoenix.parse.CloseStatement;
 import org.apache.phoenix.parse.ColumnDef;
 import org.apache.phoenix.parse.ColumnName;
@@ -212,8 +214,9 @@ public class PhoenixStatement implements Statement, SQLCloseable {
         QUERY("queried", false),
         DELETE("deleted", true),
         UPSERT("upserted", true),
-        UPGRADE("upgrade", true);
-        
+        UPGRADE("upgrade", true),
+        ADMIN("admin", true);
+
         private final String toString;
         private final boolean isMutation;
         Operation(String toString, boolean isMutation) {
@@ -645,6 +648,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 }
 
                 @Override
+                public Cost getCost() {
+                    return Cost.ZERO;
+                }
+
+                @Override
                 public TableRef getTableRef() {
                     return null;
                 }
@@ -1153,6 +1161,33 @@ public class PhoenixStatement implements Statement, SQLCloseable {
         }
     }
 
+    private static class ExecutableChangePermsStatement extends ChangePermsStatement implements CompilableStatement {
+
+        public ExecutableChangePermsStatement (String permsString, boolean isSchemaName, TableName tableName,
+                                               String schemaName, boolean isGroupName, LiteralParseNode userOrGroup, boolean isGrantStatement) {
+            super(permsString, isSchemaName, tableName, schemaName, isGroupName, userOrGroup, isGrantStatement);
+        }
+
+        @Override
+        public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+            final StatementContext context = new StatementContext(stmt);
+
+            return new BaseMutationPlan(context, this.getOperation()) {
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    return new ExplainPlan(Collections.singletonList("GRANT PERMISSION"));
+                }
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    MetaDataClient client = new MetaDataClient(getContext().getConnection());
+                    return client.changePermissions(ExecutableChangePermsStatement.this);
+                }
+            };
+        }
+    }
+
     private static class ExecutableDropIndexStatement extends DropIndexStatement implements CompilableStatement {
 
         public ExecutableDropIndexStatement(NamedNode indexName, TableName tableName, boolean ifExists) {
@@ -1181,8 +1216,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
 
     private static class ExecutableAlterIndexStatement extends AlterIndexStatement implements CompilableStatement {
 
-        public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) {
-            super(indexTableNode, dataTableName, ifExists, state, async);
+        public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async, ListMultimap<String,Pair<String,Object>> props) {
+            super(indexTableNode, dataTableName, ifExists, state, async, props);
         }
 
         @SuppressWarnings("unchecked")
@@ -1313,11 +1348,12 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 public ExplainPlan getExplainPlan() throws SQLException {
                     return new ExplainPlan(Collections.singletonList("EXECUTE UPGRADE"));
                 }
-                
+
                 @Override
-                public StatementContext getContext() {
-                    return new StatementContext(stmt);
-                }
+                public QueryPlan getQueryPlan() { return null; }
+
+                @Override
+                public StatementContext getContext() { return new StatementContext(stmt); }
                 
                 @Override
                 public TableRef getTargetRef() {
@@ -1527,10 +1563,10 @@ public class PhoenixStatement implements Statement, SQLCloseable {
         public DropIndexStatement dropIndex(NamedNode indexName, TableName tableName, boolean ifExists) {
             return new ExecutableDropIndexStatement(indexName, tableName, ifExists);
         }
-        
+
         @Override
-        public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) {
-            return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async);
+        public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async, ListMultimap<String,Pair<String,Object>> props) {
+            return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async, props);
         }
 
         @Override
@@ -1557,6 +1593,13 @@ public class PhoenixStatement implements Statement, SQLCloseable {
         public ExecuteUpgradeStatement executeUpgrade() {
             return new ExecutableExecuteUpgradeStatement();
         }
+
+        @Override
+        public ExecutableChangePermsStatement changePermsStatement(String permsString, boolean isSchemaName, TableName tableName,
+                                                         String schemaName, boolean isGroupName, LiteralParseNode userOrGroup, boolean isGrantStatement) {
+            return new ExecutableChangePermsStatement(permsString, isSchemaName, tableName, schemaName, isGroupName, userOrGroup,isGrantStatement);
+        }
+
     }
     
     static class PhoenixStatementParser extends SQLParser {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
index e55b977..4217e40 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
@@ -19,6 +19,8 @@ package org.apache.phoenix.mapreduce;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,6 +38,15 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable;
  */
 public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<NullWritable,T> {
     private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
+    private final Set<String> propsToIgnore;
+    
+    public PhoenixOutputFormat() {
+        this(Collections.<String>emptySet());
+    }
+    
+    public PhoenixOutputFormat(Set<String> propsToIgnore) {
+        this.propsToIgnore = propsToIgnore;
+    }
     
     @Override
     public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {      
@@ -52,7 +63,7 @@ public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<Nul
     @Override
     public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
         try {
-            return new PhoenixRecordWriter<T>(context.getConfiguration());
+            return new PhoenixRecordWriter<T>(context.getConfiguration(), propsToIgnore);
         } catch (SQLException e) {
             LOG.error("Error calling PhoenixRecordWriter "  + e.getMessage());
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
index 70ee3f5..52f2fe3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,7 +48,11 @@ public class PhoenixRecordWriter<T extends DBWritable>  extends RecordWriter<Nul
     private long numRecords = 0;
     
     public PhoenixRecordWriter(final Configuration configuration) throws SQLException {
-        this.conn = ConnectionUtil.getOutputConnection(configuration);
+        this(configuration, Collections.<String>emptySet());
+    }
+    
+    public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException {
+        this.conn = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
         this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
         final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
         this.statement = this.conn.prepareStatement(upsertQuery);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
index ada3816..56a5ef5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -20,15 +20,16 @@ package org.apache.phoenix.mapreduce.util;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.Collections;
 import java.util.Properties;
+import java.util.Set;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Utility class to return a {@link Connection} .
  */
@@ -74,15 +75,29 @@ public class ConnectionUtil {
      * Create the configured output Connection.
      *
      * @param conf configuration containing the connection information
+     * @return the configured output connection
+     */
+    public static Connection getOutputConnectionWithoutTheseProps(final Configuration conf, Set<String> ignoreTheseProps) throws SQLException {
+        return getOutputConnection(conf, new Properties(), ignoreTheseProps);
+    }
+    
+    /**
+     * Create the configured output Connection.
+     *
+     * @param conf configuration containing the connection information
      * @param props custom connection properties
      * @return the configured output connection
      */
     public static Connection getOutputConnection(final Configuration conf, Properties props) throws SQLException {
+        return getOutputConnection(conf, props, Collections.<String>emptySet());
+    }
+    
+    public static Connection getOutputConnection(final Configuration conf, Properties props, Set<String> withoutTheseProps) throws SQLException {
         Preconditions.checkNotNull(conf);
 		return getConnection(PhoenixConfigurationUtil.getOutputCluster(conf),
 				PhoenixConfigurationUtil.getClientPort(conf),
 				PhoenixConfigurationUtil.getZNodeParent(conf),
-				PropertiesUtil.combineProperties(props, conf));
+				PropertiesUtil.combineProperties(props, conf, withoutTheseProps));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
new file mode 100644
index 0000000..b83f354
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.optimize;
+
+import java.util.Objects;
+
+/**
+ * Optimizer cost in terms of CPU, memory, and I/O usage, the unit of which is now the
+ * number of bytes processed.
+ *
+ */
+public class Cost implements Comparable<Cost> {
+    /** The unknown cost. */
+    public static Cost UNKNOWN = new Cost(Double.NaN, Double.NaN, Double.NaN) {
+        @Override
+        public String toString() {
+            return "{unknown}";
+        }
+    };
+
+    /** The zero cost. */
+    public static Cost ZERO = new Cost(0, 0, 0) {
+        @Override
+        public String toString() {
+            return "{zero}";
+        }        
+    };
+
+    private final double cpu;
+    private final double memory;
+    private final double io;
+
+    public Cost(double cpu, double memory, double io) {
+        this.cpu = cpu;
+        this.memory = memory;
+        this.io = io;
+    }
+
+    public double getCpu() {
+        return cpu;
+    }
+
+    public double getMemory() {
+        return memory;
+    }
+
+    public double getIo() {
+        return io;
+    }
+
+    public boolean isUnknown() {
+        return this == UNKNOWN;
+    }
+
+    public Cost plus(Cost other) {
+        if (isUnknown() || other.isUnknown()) {
+            return UNKNOWN;
+        }
+
+        return new Cost(
+                this.cpu + other.cpu,
+                this.memory + other.memory,
+                this.io + other.io);
+    }
+
+    public Cost multiplyBy(double factor) {
+        if (isUnknown()) {
+            return UNKNOWN;
+        }
+
+        return new Cost(
+                this.cpu * factor,
+                this.memory * factor,
+                this.io * factor);
+    }
+
+    // TODO right now for simplicity, we choose to ignore CPU and memory costs. We may
+    // take those into account as our cost model matures.
+    @Override
+    public int compareTo(Cost other) {
+        if (isUnknown() && other.isUnknown()) {
+            return 0;
+        } else if (isUnknown() && !other.isUnknown()) {
+            return 1;
+        } else if (!isUnknown() && other.isUnknown()) {
+            return -1;
+        }
+
+        double d = this.io - other.io;
+        return d == 0 ? 0 : (d > 0 ? 1 : -1);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return this == obj
+                || (obj instanceof Cost && this.compareTo((Cost) obj) == 0);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(cpu, memory, io);
+    }
+
+    @Override
+    public String toString() {
+        return "{cpu: " + cpu + ", memory: " + memory + ", io: " + io + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index b3df50b..64dad58 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -67,10 +67,12 @@ public class QueryOptimizer {
 
     private final QueryServices services;
     private final boolean useIndexes;
+    private final boolean costBased;
 
     public QueryOptimizer(QueryServices services) {
         this.services = services;
         this.useIndexes = this.services.getProps().getBoolean(QueryServices.USE_INDEXES_ATTRIB, QueryServicesOptions.DEFAULT_USE_INDEXES);
+        this.costBased = this.services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
     }
 
     public QueryPlan optimize(PhoenixStatement statement, QueryPlan dataPlan) throws SQLException {
@@ -91,7 +93,7 @@ public class QueryOptimizer {
     }
     
     public QueryPlan optimize(QueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException {
-        List<QueryPlan>plans = getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, true);
+        List<QueryPlan> plans = getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, true);
         return plans.get(0);
     }
     
@@ -309,10 +311,11 @@ public class QueryOptimizer {
         }
         return null;
     }
-    
+
     /**
      * Order the plans among all the possible ones from best to worst.
-     * Since we don't keep stats yet, we use the following simple algorithm:
+     * If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, we order the plans based on
+     * their costs, otherwise we use the following simple algorithm:
      * 1) If the query is a point lookup (i.e. we have a set of exact row keys), choose that one immediately.
      * 2) If the query has an ORDER BY and a LIMIT, choose the plan that has all the ORDER BY expression
      * in the same order as the row key columns.
@@ -320,9 +323,6 @@ public class QueryOptimizer {
      *    a) the most row key columns that may be used to form the start/stop scan key (i.e. bound slots).
      *    b) the plan that preserves ordering for a group by.
      *    c) the non local index table plan
-     * TODO: We should make more of a cost based choice: The largest number of bound slots does not necessarily
-     * correspond to the least bytes scanned. We could consider the slots bound for upper and lower ranges 
-     * separately, or we could calculate the bytes scanned between the start and stop row of each table.
      * @param plans the list of candidate plans
      * @return list of plans ordered from best to worst.
      */
@@ -331,7 +331,21 @@ public class QueryOptimizer {
         if (plans.size() == 1) {
             return plans;
         }
-        
+
+        if (this.costBased) {
+            Collections.sort(plans, new Comparator<QueryPlan>() {
+                @Override
+                public int compare(QueryPlan plan1, QueryPlan plan2) {
+                    return plan1.getCost().compareTo(plan2.getCost());
+                }
+            });
+            // Return ordered list based on cost if stats are available; otherwise fall
+            // back to static ordering.
+            if (!plans.get(0).getCost().isUnknown()) {
+                return stopAtBestPlan ? plans.subList(0, 1) : plans;
+            }
+        }
+
         /**
          * If we have a plan(s) that are just point lookups (i.e. fully qualified row
          * keys), then favor those first.
@@ -428,7 +442,7 @@ public class QueryOptimizer {
             }
             
         });
-        
+
         return stopAtBestPlan ? bestCandidates.subList(0, 1) : bestCandidates;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
index 1890d31..678e560 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
@@ -48,4 +48,4 @@ public class AddColumnStatement extends AlterTableStatement {
     public ListMultimap<String,Pair<String,Object>> getProps() {
         return props;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
index 11328c2..de04505 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
@@ -17,20 +17,31 @@
  */
 package org.apache.phoenix.parse;
 
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
 
 public class AlterIndexStatement extends SingleTableStatement {
     private final String dataTableName;
     private final boolean ifExists;
     private final PIndexState indexState;
     private boolean async;
+    private ListMultimap<String,Pair<String,Object>> props;
+    private static final PTableType tableType=PTableType.INDEX;
 
     public AlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState indexState, boolean async) {
+        this(indexTableNode,dataTableName,ifExists,indexState,async,null);
+    }
+
+    public AlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState indexState, boolean async, ListMultimap<String,Pair<String,Object>> props) {
         super(indexTableNode,0);
         this.dataTableName = dataTableName;
         this.ifExists = ifExists;
         this.indexState = indexState;
         this.async = async;
+        this.props= props==null ? ImmutableListMultimap.<String,Pair<String,Object>>of() : props;
     }
 
     public String getTableName() {
@@ -54,4 +65,7 @@ public class AlterIndexStatement extends SingleTableStatement {
         return async;
     }
 
+    public ListMultimap<String,Pair<String,Object>> getProps() { return props; }
+
+    public PTableType getTableType(){ return tableType; }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/parse/ChangePermsStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ChangePermsStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ChangePermsStatement.java
new file mode 100644
index 0000000..0eae26f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ChangePermsStatement.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.antlr.runtime.RecognitionException;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.phoenix.exception.PhoenixParserException;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.util.Arrays;
+
+/**
+ * Parsed form of a GRANT or REVOKE statement (see PHOENIX-672), which assigns or removes permissions
+ * for a user OR group on a table OR namespace.
+ * Permissions are managed by HBase using the hbase:acl table; the allowed permissions are RWXCA.
+ */
+public class ChangePermsStatement implements BindableStatement {
+
+    // Null when the statement carried no permission string
+    private final Permission.Action[] permsList;
+    // At most one of tableName / schemaName is non-null, depending on the isSchemaName flag
+    private final TableName tableName;
+    private final String schemaName;
+    // User name, or the group entry produced by AuthUtil.toGroupEntry for groups
+    private final String name;
+    // Grant/Revoke statements are differentiated based on this boolean
+    private final boolean isGrantStatement;
+
+    public ChangePermsStatement(String permsString, boolean isSchemaName,
+                                TableName tableName, String schemaName, boolean isGroupName, LiteralParseNode ugNode, boolean isGrantStatement) {
+        // PHOENIX-672: the HBase API doesn't allow revoking specific permissions, hence permsString is ignored
+        // for REVOKE. To comply with SQL standards, revoking specific permissions may be supported in future.
+        // GRANT requires this parameter and parsing fails if it is not specified in the SQL.
+        // Encode with a fixed charset so the single-letter action codes never depend on the platform default.
+        this.permsList = permsString == null ? null
+                : new Permission(permsString.getBytes(java.nio.charset.StandardCharsets.UTF_8)).getActions();
+        this.schemaName = isSchemaName ? SchemaUtil.normalizeIdentifier(schemaName) : null;
+        this.tableName = isSchemaName ? null : tableName;
+        String principal = SchemaUtil.normalizeLiteral(ugNode);
+        this.name = isGroupName ? AuthUtil.toGroupEntry(principal) : principal;
+        this.isGrantStatement = isGrantStatement;
+    }
+
+    public Permission.Action[] getPermsList() {
+        return permsList;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public TableName getTableName() {
+        return tableName;
+    }
+
+    public String getSchemaName() {
+        return schemaName;
+    }
+
+    public boolean isGrantStatement() {
+        return isGrantStatement;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buffer = new StringBuilder(this.isGrantStatement() ? "GRANT " : "REVOKE ");
+        buffer.append("permissions requested for user/group: ").append(this.getName());
+        if (this.getSchemaName() != null) {
+            buffer.append(" for Schema: ").append(this.getSchemaName());
+        } else if (this.getTableName() != null) {
+            buffer.append(" for Table: ").append(this.getTableName());
+        }
+        buffer.append(" Permissions: ").append(Arrays.toString(this.getPermsList()));
+        return buffer.toString();
+    }
+
+    @Override
+    public int getBindCount() {
+        return 0;
+    }
+
+    @Override
+    public PhoenixStatement.Operation getOperation() {
+        return PhoenixStatement.Operation.ADMIN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSchemaStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSchemaStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSchemaStatement.java
index 7c255cb..f5ab3f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSchemaStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSchemaStatement.java
@@ -24,7 +24,7 @@ public class CreateSchemaStatement extends MutableStatement {
 	private final boolean ifNotExists;
 	
 	public CreateSchemaStatement(String schemaName,boolean ifNotExists) {
-		this.schemaName = null == schemaName ? SchemaUtil.SCHEMA_FOR_DEFAULT_NAMESPACE : schemaName;
+		this.schemaName = schemaName;
 		this.ifNotExists = ifNotExists;
 	}
 	

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 0058f38..9be59f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -25,7 +25,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -374,10 +373,10 @@ public class ParseNodeFactory {
         return new DropIndexStatement(indexName, tableName, ifExists);
     }
 
-    public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) {
-        return new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async);
+    public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async, ListMultimap<String,Pair<String,Object>> props) {
+        return new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async, props);
     }
-    
+
     public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
         return new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state, false);
     }
@@ -925,4 +924,10 @@ public class ParseNodeFactory {
     public UseSchemaStatement useSchema(String schemaName) {
         return new UseSchemaStatement(schemaName);
     }
+
+    /** Builds the parse node for a GRANT/REVOKE statement by forwarding every argument to ChangePermsStatement. */
+    public ChangePermsStatement changePermsStatement(String permsString, boolean isSchemaName, TableName tableName,
+            String schemaName, boolean isGroupName, LiteralParseNode userOrGroup, boolean isGrantStatement) {
+        return new ChangePermsStatement(permsString, isSchemaName, tableName, schemaName, isGroupName, userOrGroup, isGrantStatement);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 45ab5fa..90f8089 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -86,6 +86,8 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
     public MetaDataMutationResult addColumn(List<Mutation> tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, List<PColumn> columns) throws SQLException;
     public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, PTableType tableType) throws SQLException;
     public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, String parentTableName) throws SQLException;
+    /** Overload of updateIndexState that additionally accepts statement-level properties and a PTable (presumably the table they apply to - TODO confirm). */
+    public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, String parentTableName,  Map<String, List<Pair<String,Object>>> stmtProperties,  PTable table) throws SQLException;
     public MutationState updateData(MutationPlan plan) throws SQLException;
 
     public void init(String url, Properties props) throws SQLException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 532b586..072bf28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.NamespaceNotFoundException;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
@@ -190,6 +191,7 @@ import org.apache.phoenix.schema.EmptySequenceCacheException;
 import org.apache.phoenix.schema.FunctionNotFoundException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.MetaDataSplitPolicy;
+import org.apache.phoenix.schema.NewerSchemaAlreadyExistsException;
 import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
@@ -867,7 +869,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     }
                 }
             }
-            if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
+            if ((SchemaUtil.isStatsTable(tableName) || SchemaUtil.isMetaTable(tableName))
+                    && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
                 descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),
                         null, priority, null);
             }
@@ -1223,7 +1226,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             SQLExceptionCode.INCONSISTENT_NAMESPACE_MAPPING_PROPERTIES)
                     .setMessage(
                             "Ensure that config " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED
-                            + " is consitent on client and server.")
+                            + " is consistent on client and server.")
                             .build().buildException(); }
             lowestClusterHBaseVersion = minHBaseVersion;
         } catch (SQLException e) {
@@ -1721,7 +1724,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         Set<HTableDescriptor> tableDescriptors = Collections.emptySet();
         Set<HTableDescriptor> origTableDescriptors = Collections.emptySet();
         boolean nonTxToTx = false;
-        Pair<HTableDescriptor,HTableDescriptor> tableDescriptorPair = separateAndValidateProperties(table, stmtProperties, colFamiliesForPColumnsToBeAdded, families, tableProps);
+        Pair<HTableDescriptor,HTableDescriptor> tableDescriptorPair = separateAndValidateProperties(table, stmtProperties, colFamiliesForPColumnsToBeAdded, tableProps);
         HTableDescriptor tableDescriptor = tableDescriptorPair.getSecond();
         HTableDescriptor origTableDescriptor = tableDescriptorPair.getFirst();
         if (tableDescriptor != null) {
@@ -1939,7 +1942,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps);
     }
 
-    private Pair<HTableDescriptor,HTableDescriptor> separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, List<Pair<byte[], Map<String, Object>>> families, Map<String, Object> tableProps) throws SQLException {
+    private Pair<HTableDescriptor,HTableDescriptor> separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties,
+      Set<String> colFamiliesForPColumnsToBeAdded, Map<String, Object> tableProps) throws SQLException {
         Map<String, Map<String, Object>> stmtFamiliesPropsMap = new HashMap<>(properties.size());
         Map<String,Object> commonFamilyProps = new HashMap<>();
         boolean addingColumns = colFamiliesForPColumnsToBeAdded != null && !colFamiliesForPColumnsToBeAdded.isEmpty();
@@ -2458,6 +2462,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                         logger.warn("Could not check for Phoenix SYSTEM tables, assuming they exist and are properly configured");
                                         checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, getProps()).getName());
                                         success = true;
+                                    } else if (!Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), NamespaceNotFoundException.class))) {
+                                        // This exception is only possible if SYSTEM namespace mapping is enabled and SYSTEM namespace is missing
+                                        // It implies that SYSTEM tables are not created and hence we shouldn't provide a connection
+                                        AccessDeniedException ade = new AccessDeniedException("Insufficient permissions to create SYSTEM namespace and SYSTEM Tables");
+                                        initializationException = ServerUtil.parseServerException(ade);
                                     } else {
                                         initializationException = e;
                                     }
@@ -2469,8 +2478,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 // with SYSTEM Namespace. (See PHOENIX-4227 https://issues.apache.org/jira/browse/PHOENIX-4227)
                                 if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
                                         ConnectionQueryServicesImpl.this.getProps())) {
-                                    metaConnection.createStatement().execute("CREATE SCHEMA IF NOT EXISTS "
-                                            + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
+                                    try {
+                                        metaConnection.createStatement().execute("CREATE SCHEMA IF NOT EXISTS "
+                                                + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
+                                    } catch (NewerSchemaAlreadyExistsException e) {
+                                        // Older clients with appropriate perms may try getting a new connection
+                                        // This results in NewerSchemaAlreadyExistsException, so we can safely ignore it here
+                                    } catch (PhoenixIOException e) {
+                                        if (!Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), AccessDeniedException.class))) {
+                                            // Ignore AccessDeniedException (ADE)
+                                        } else {
+                                            throw e;
+                                        }
+                                    }
                                 }
                                 if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) {
                                     createOtherSystemTables(metaConnection, hBaseAdmin);
@@ -2528,7 +2548,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             final TableName mutexTableName = SchemaUtil.getPhysicalTableName(
                 PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME, props);
             List<TableName> systemTables = getSystemTableNames(admin);
-            if (systemTables.contains(mutexTableName)) {
+            if (systemTables.contains(mutexTableName) || admin.tableExists( TableName.valueOf(
+                PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME,PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME))) {
                 logger.debug("System mutex table already appears to exist, not creating it");
                 return;
             }
@@ -2545,8 +2566,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 put.add(PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES, UPGRADE_MUTEX, UPGRADE_MUTEX_UNLOCKED);
                 sysMutexTable.put(put);
             }
-        } catch (TableExistsException e) {
+        } catch (TableExistsException | AccessDeniedException e) {
             // Ignore
+        }catch(PhoenixIOException e){
+            if(e.getCause()!=null && e.getCause() instanceof AccessDeniedException)
+            {
+                //Ignore
+            }else{
+                throw e;
+            }
         }
     }
 
@@ -3573,6 +3601,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     }
 
     @Override
+    public MetaDataMutationResult updateIndexState(final List<Mutation> tableMetaData, String parentTableName, Map<String, List<Pair<String,Object>>> stmtProperties,  PTable table) throws SQLException {
+        if(stmtProperties==null) return updateIndexState(tableMetaData,parentTableName);
+
+        Map<String, Object> tableProps = new HashMap<String, Object>();
+        Pair<HTableDescriptor,HTableDescriptor> tableDescriptorPair = separateAndValidateProperties(table, stmtProperties, new HashSet<String>(), tableProps);
+        HTableDescriptor tableDescriptor = tableDescriptorPair.getSecond();
+        HTableDescriptor origTableDescriptor = tableDescriptorPair.getFirst();
+        Set<HTableDescriptor> tableDescriptors = Collections.emptySet();
+        Set<HTableDescriptor> origTableDescriptors = Collections.emptySet();
+        if (tableDescriptor != null) {
+            tableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size());
+            origTableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size());
+            tableDescriptors.add(tableDescriptor);
+            origTableDescriptors.add(origTableDescriptor);
+        }
+        sendHBaseMetaData(tableDescriptors, true);
+        return updateIndexState(tableMetaData,parentTableName);
+    }
+
+    @Override
     public long createSequence(String tenantId, String schemaName, String sequenceName,
             long startWith, long incrementBy, long cacheSize, long minValue, long maxValue,
             boolean cycle, long timestamp) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index f15e0b1..3154f86 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -383,6 +383,13 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     }
 
     @Override
+    public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata,
+            String parentTableName, Map<String, List<Pair<String, Object>>> stmtProperties,
+            PTable table) throws SQLException {
+        return updateIndexState(tableMetadata,parentTableName);
+    }
+
+    @Override
     public HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 6c464eb..05d1af6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -135,7 +135,13 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, String parentTableName) throws SQLException {
         return getDelegate().updateIndexState(tableMetadata, parentTableName);
     }
-    
+
+    @Override
+    public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, String parentTableName,
+            Map<String, List<Pair<String, Object>>> stmtProperties, PTable table) throws SQLException {
+        return getDelegate().updateIndexState(tableMetadata, parentTableName, stmtProperties, table);
+    }
+
     @Override
     public void init(String url, Properties props) throws SQLException {
         getDelegate().init(url, props);