You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/09/14 21:43:23 UTC

[25/50] [abbrv] phoenix git commit: PHOENIX-3148 Reduce size of PTable so that more tables can be cached in the metadata cache

PHOENIX-3148 Reduce size of PTable so that more tables can be cached in the metadata cache


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a0ae8025
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a0ae8025
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a0ae8025

Branch: refs/heads/calcite
Commit: a0ae8025eccbd76b4277f74e57081ddbb5a7babe
Parents: b36bb31
Author: Thomas D'Silva <td...@salesforce.com>
Authored: Fri Aug 19 16:39:28 2016 -0700
Committer: Thomas D'Silva <td...@salesforce.com>
Committed: Tue Aug 23 23:00:45 2016 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/IndexToolIT.java |   1 -
 .../apache/phoenix/execute/MutationState.java   |   6 +-
 .../query/ConnectionQueryServicesImpl.java      |   5 +-
 .../query/ConnectionlessQueryServicesImpl.java  |   4 +-
 .../org/apache/phoenix/query/QueryServices.java |   2 +
 .../phoenix/query/QueryServicesOptions.java     |   3 +
 .../apache/phoenix/schema/MetaDataClient.java   |  25 +-
 .../apache/phoenix/schema/PMetaDataCache.java   | 221 +++++++++++++++
 .../apache/phoenix/schema/PMetaDataImpl.java    | 268 +++----------------
 .../org/apache/phoenix/schema/PTableImpl.java   |  29 +-
 .../org/apache/phoenix/schema/PTableRef.java    |  56 ++--
 .../apache/phoenix/schema/PTableRefFactory.java |  52 ++++
 .../apache/phoenix/schema/PTableRefImpl.java    |  39 +++
 .../phoenix/schema/SerializedPTableRef.java     |  47 ++++
 .../schema/SerializedPTableRefFactory.java      |  37 +++
 .../phoenix/schema/PMetaDataImplTest.java       |  34 ++-
 16 files changed, 532 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index c66fea3..16db876 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -37,7 +37,6 @@ import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.PropertiesUtil;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 7a9282c..e7e6aa7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -680,7 +680,7 @@ public class MutationState implements SQLCloseable {
             return Iterators.emptyIterator();
         }
         Long scn = connection.getSCN();
-        final long timestamp = (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn);
+        final long timestamp = getMutationTimestamp(tableTimestamp, scn);
         return new Iterator<Pair<byte[],List<Mutation>>>() {
             private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
             private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
@@ -726,6 +726,10 @@ public class MutationState implements SQLCloseable {
             
         };
     }
+
+    public static long getMutationTimestamp(final Long tableTimestamp, Long scn) {
+        return (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn);
+    }
         
     /**
      * Validates that the meta data is valid against the server meta data if we haven't yet done so.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 7a57103..524067d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -32,6 +32,7 @@ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
+import static org.apache.phoenix.util.UpgradeUtil.getUpgradeSnapshotName;
 import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
 
 import java.io.IOException;
@@ -288,9 +289,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             });
     
     private PMetaData newEmptyMetaData() {
-        long maxSizeBytes = props.getLong(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB,
-                QueryServicesOptions.DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE);
-        return new PSynchronizedMetaData(new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, maxSizeBytes));
+        return new PSynchronizedMetaData(new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, getProps()));
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 25aca74..560b5d9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -144,9 +144,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     }
 
     private PMetaData newEmptyMetaData() {
-        long maxSizeBytes = getProps().getLong(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB,
-                QueryServicesOptions.DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE);
-        return new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, maxSizeBytes);
+        return new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, getProps());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 22fa45a..d7c7c62 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -223,6 +223,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String LIMITED_QUERY_SERIAL_THRESHOLD = "phoenix.limited.query.serial.threshold";
 
     public static final String INDEX_ASYNC_BUILD_ENABLED = "phoenix.index.async.build.enabled";
+    
+    public static final String CLIENT_CACHE_ENCODING = "phoenix.table.client.cache.encoding";
     /**
      * Get executor service used for parallel scans
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 83347c8..d874860 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -90,6 +90,7 @@ import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.phoenix.schema.PTableRefFactory;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -255,6 +256,8 @@ public class QueryServicesOptions {
     public static final float DEFAULT_LIMITED_QUERY_SERIAL_THRESHOLD = 0.2f;
     
     public static final boolean DEFAULT_INDEX_ASYNC_BUILD_ENABLED = true;
+    
+    public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString();
 
     @SuppressWarnings("serial")
     public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 7f97f4a..efe60ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -3147,7 +3147,7 @@ public class MetaDataClient {
         }
     }
 
-    private String dropColumnMutations(PTable table, List<PColumn> columnsToDrop, List<Mutation> tableMetaData) throws SQLException {
+    private String dropColumnMutations(PTable table, List<PColumn> columnsToDrop) throws SQLException {
         String tenantId = connection.getTenantId() == null ? "" : connection.getTenantId().getString();
         String schemaName = table.getSchemaName().getString();
         String tableName = table.getTableName().getString();
@@ -3263,7 +3263,9 @@ public class MetaDataClient {
                     columnsToDrop.add(new ColumnRef(columnRef.getTableRef(), columnToDrop.getPosition()));
                 }
 
-                dropColumnMutations(table, tableColumnsToDrop, tableMetaData);
+                dropColumnMutations(table, tableColumnsToDrop);
+                boolean removedIndexTableOrColumn=false;
+                Long timeStamp = table.isTransactional() ? tableRef.getTimeStamp() : null;
                 for (PTable index : table.getIndexes()) {
                     IndexMaintainer indexMaintainer = index.getIndexMaintainer(table, connection);
                     // get the columns required for the index pk
@@ -3278,6 +3280,7 @@ public class MetaDataClient {
                             if (index.getViewIndexId()==null) 
                                 indexesToDrop.add(new TableRef(index));
                             connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, index.getName().getString()), index.getParentName() == null ? null : index.getParentName().getString(), index.getTimeStamp());
+                            removedIndexTableOrColumn = true;
                         } 
                         else if (coveredColumns.contains(columnToDropRef)) {
                             String indexColumnName = IndexUtil.getIndexColumnName(columnToDrop);
@@ -3285,15 +3288,18 @@ public class MetaDataClient {
                             indexColumnsToDrop.add(indexColumn);
                             // add the index column to be dropped so that we actually delete the column values
                             columnsToDrop.add(new ColumnRef(new TableRef(index), indexColumn.getPosition()));
+                            removedIndexTableOrColumn = true;
                         }
                     }
                     if(!indexColumnsToDrop.isEmpty()) {
-                        incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size(), null, null);
-                        dropColumnMutations(index, indexColumnsToDrop, tableMetaData);
+                        long indexTableSeqNum = incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size(), null, null);
+                        dropColumnMutations(index, indexColumnsToDrop);
+                        long clientTimestamp = MutationState.getMutationTimestamp(timeStamp, connection.getSCN());
+                        connection.removeColumn(tenantId, index.getName().getString(),
+                            indexColumnsToDrop, clientTimestamp, indexTableSeqNum,
+                            TransactionUtil.getResolvedTimestamp(connection, index.isTransactional(), clientTimestamp));
                     }
-
                 }
-                Long timeStamp = table.isTransactional() ? tableRef.getTimeStamp() : null;
                 tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                 connection.rollback();
 
@@ -3348,8 +3354,11 @@ public class MetaDataClient {
                     // If we've done any index metadata updates, don't bother trying to update
                     // client-side cache as it would be too painful. Just let it pull it over from
                     // the server when needed.
-                    if (tableColumnsToDrop.size() > 0 && indexesToDrop.isEmpty()) {
-                        connection.removeColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, result.getMutationTime(), seqNum, TransactionUtil.getResolvedTime(connection, result));
+                    if (tableColumnsToDrop.size() > 0) {
+                        if (removedIndexTableOrColumn)
+                            connection.removeTable(tenantId, tableName, table.getParentName() == null ? null : table.getParentName().getString(), table.getTimeStamp());
+                        else  
+                            connection.removeColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, result.getMutationTime(), seqNum, TransactionUtil.getResolvedTime(connection, result));
                     }
                     // If we have a VIEW, then only delete the metadata, and leave the table data alone
                     if (table.getType() != PTableType.VIEW) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataCache.java
new file mode 100644
index 0000000..9992adb
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataCache.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.phoenix.parse.PFunction;
+import org.apache.phoenix.parse.PSchema;
+import org.apache.phoenix.util.TimeKeeper;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.primitives.Longs;
+
+class PMetaDataCache implements Cloneable {
+    private static final int MIN_REMOVAL_SIZE = 3;
+    private static final Comparator<PTableRef> COMPARATOR = new Comparator<PTableRef>() {
+        @Override
+        public int compare(PTableRef tableRef1, PTableRef tableRef2) {
+            return Longs.compare(tableRef1.getLastAccessTime(), tableRef2.getLastAccessTime());
+        }
+    };
+    private static final MinMaxPriorityQueue.Builder<PTableRef> BUILDER = MinMaxPriorityQueue.orderedBy(COMPARATOR);
+    
+    private long currentByteSize;
+    private final long maxByteSize;
+    private final int expectedCapacity;
+    private final TimeKeeper timeKeeper;
+    private final PTableRefFactory tableRefFactory;
+
+    private final Map<PTableKey,PTableRef> tables;
+    final Map<PTableKey,PFunction> functions;
+    final Map<PTableKey,PSchema> schemas;
+    
+    private static Map<PTableKey,PTableRef> newMap(int expectedCapacity) {
+        // Use regular HashMap, as we cannot use a LinkedHashMap that orders by access time
+        // safely across multiple threads (as the underlying collection is not thread safe).
+        // Instead, we track access time and prune it based on the copy we've made.
+        return Maps.newHashMapWithExpectedSize(expectedCapacity);
+    }
+
+    private static Map<PTableKey,PFunction> newFunctionMap(int expectedCapacity) {
+        // Use regular HashMap, as we cannot use a LinkedHashMap that orders by access time
+        // safely across multiple threads (as the underlying collection is not thread safe).
+        // Instead, we track access time and prune it based on the copy we've made.
+        return Maps.newHashMapWithExpectedSize(expectedCapacity);
+    }
+
+    private static Map<PTableKey,PSchema> newSchemaMap(int expectedCapacity) {
+        // Use regular HashMap, as we cannot use a LinkedHashMap that orders by access time
+        // safely across multiple threads (as the underlying collection is not thread safe).
+        // Instead, we track access time and prune it based on the copy we've made.
+        return Maps.newHashMapWithExpectedSize(expectedCapacity);
+    }
+
+    private Map<PTableKey,PTableRef> cloneMap(Map<PTableKey,PTableRef> tables, int expectedCapacity) {
+        Map<PTableKey,PTableRef> newTables = newMap(Math.max(tables.size(),expectedCapacity));
+        // Copy value so that access time isn't changing anymore
+        for (PTableRef tableAccess : tables.values()) {
+            newTables.put(tableAccess.getTable().getKey(), tableRefFactory.makePTableRef(tableAccess));
+        }
+        return newTables;
+    }
+
+    private static Map<PTableKey, PSchema> cloneSchemaMap(Map<PTableKey, PSchema> schemas, int expectedCapacity) {
+        Map<PTableKey, PSchema> newSchemas = newSchemaMap(Math.max(schemas.size(), expectedCapacity));
+        // Copy value so that access time isn't changing anymore
+        for (PSchema schema : schemas.values()) {
+            newSchemas.put(schema.getSchemaKey(), new PSchema(schema));
+        }
+        return newSchemas;
+    }
+
+    private static Map<PTableKey,PFunction> cloneFunctionsMap(Map<PTableKey,PFunction> functions, int expectedCapacity) {
+        Map<PTableKey,PFunction> newFunctions = newFunctionMap(Math.max(functions.size(),expectedCapacity));
+        for (PFunction functionAccess : functions.values()) {
+            newFunctions.put(functionAccess.getKey(), new PFunction(functionAccess));
+        }
+        return newFunctions;
+    }
+
+    PMetaDataCache(PMetaDataCache toClone) {
+        this.tableRefFactory = toClone.tableRefFactory;
+        this.timeKeeper = toClone.timeKeeper;
+        this.maxByteSize = toClone.maxByteSize;
+        this.currentByteSize = toClone.currentByteSize;
+        this.expectedCapacity = toClone.expectedCapacity;
+        this.tables = cloneMap(toClone.tables, expectedCapacity);
+        this.functions = cloneFunctionsMap(toClone.functions, expectedCapacity);
+        this.schemas = cloneSchemaMap(toClone.schemas, expectedCapacity);
+    }
+    
+    public PMetaDataCache(int initialCapacity, long maxByteSize, TimeKeeper timeKeeper, PTableRefFactory tableRefFactory) {
+        this.currentByteSize = 0;
+        this.maxByteSize = maxByteSize;
+        this.expectedCapacity = initialCapacity;
+        this.tables = newMap(this.expectedCapacity);
+        this.functions = newFunctionMap(this.expectedCapacity);
+        this.timeKeeper = timeKeeper;
+        this.schemas = newSchemaMap(this.expectedCapacity);
+        this.tableRefFactory = tableRefFactory;
+    }
+    
+    public PTableRef get(PTableKey key) {
+        PTableRef tableAccess = this.tables.get(key);
+        if (tableAccess == null) {
+            return null;
+        }
+        tableAccess.setLastAccessTime(timeKeeper.getCurrentTime());
+        return tableAccess;
+    }
+    
+    @Override
+    public PMetaDataCache clone() {
+        return new PMetaDataCache(this);
+    }
+    
+    /**
+     * Used when the cache is growing past its max size to clone in a single pass.
+     * Removes least recently used tables to get size of cache below its max size by
+     * the overage amount.
+     */
+    public PMetaDataCache cloneMinusOverage(long overage) {
+        assert(overage > 0);
+        int nToRemove = Math.max(MIN_REMOVAL_SIZE, (int)Math.ceil((currentByteSize-maxByteSize) / ((double)currentByteSize / size())) + 1);
+        MinMaxPriorityQueue<PTableRef> toRemove = BUILDER.expectedSize(nToRemove).create();
+        PMetaDataCache newCache = new PMetaDataCache(this.size(), this.maxByteSize, this.timeKeeper, this.tableRefFactory);
+        
+        long toRemoveBytes = 0;
+        // Add to new cache, but track references to remove when done
+        // to bring cache at least overage amount below its max size.
+        for (PTableRef tableRef : this.tables.values()) {
+            newCache.put(tableRef.getTable().getKey(), tableRefFactory.makePTableRef(tableRef));
+            toRemove.add(tableRef);
+            toRemoveBytes += tableRef.getEstimatedSize();
+            while (toRemoveBytes - toRemove.peekLast().getEstimatedSize() >= overage) {
+                PTableRef removedRef = toRemove.removeLast();
+                toRemoveBytes -= removedRef.getEstimatedSize();
+            }
+        }
+        for (PTableRef toRemoveRef : toRemove) {
+            newCache.remove(toRemoveRef.getTable().getKey());
+        }
+        return newCache;
+    }
+
+    PTable put(PTableKey key, PTableRef ref) {
+        currentByteSize += ref.getEstimatedSize();
+        PTableRef oldTableAccess = this.tables.put(key, ref);
+        PTable oldTable = null;
+        if (oldTableAccess != null) {
+            currentByteSize -= oldTableAccess.getEstimatedSize();
+            oldTable = oldTableAccess.getTable();
+        }
+        return oldTable;
+    }
+
+    public long getAge(PTableRef ref) {
+        return timeKeeper.getCurrentTime() - ref.getCreateTime();
+    }
+    
+    public PTable remove(PTableKey key) {
+        PTableRef value = this.tables.remove(key);
+        if (value == null) {
+            return null;
+        }
+        currentByteSize -= value.getEstimatedSize();
+        return value.getTable();
+    }
+    
+    public Iterator<PTable> iterator() {
+        final Iterator<PTableRef> iterator = this.tables.values().iterator();
+        return new Iterator<PTable>() {
+
+            @Override
+            public boolean hasNext() {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public PTable next() {
+                return iterator.next().getTable();
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+            
+        };
+    }
+
+    public int size() {
+        return this.tables.size();
+    }
+
+    public long getCurrentSize() {
+        return this.currentByteSize;
+    }
+
+    public long getMaxSize() {
+        return this.maxByteSize;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index 5ffacca..7a78006 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -18,253 +18,50 @@
 package org.apache.phoenix.schema;
 
 import java.sql.SQLException;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TimeKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.MinMaxPriorityQueue;
-import com.google.common.primitives.Longs;
 
 /**
- * 
- * Client-side cache of MetaData. Not thread safe, but meant to be used
- * in a copy-on-write fashion. Internally uses a LinkedHashMap that evicts
- * the oldest entries when size grows beyond the maxSize specified at
- * create time.
- *
+ * Client-side cache of MetaData, not thread safe. Internally uses a LinkedHashMap that evicts the
+ * oldest entries when size grows beyond the maxSize specified at create time.
  */
 public class PMetaDataImpl implements PMetaData {
-    private static final Logger logger = LoggerFactory.getLogger(PMetaDataImpl.class);
-        static class PMetaDataCache implements Cloneable {
-            private static final int MIN_REMOVAL_SIZE = 3;
-            private static final Comparator<PTableRef> COMPARATOR = new Comparator<PTableRef>() {
-                @Override
-                public int compare(PTableRef tableRef1, PTableRef tableRef2) {
-                    return Longs.compare(tableRef1.getLastAccessTime(), tableRef2.getLastAccessTime());
-                }
-            };
-            private static final MinMaxPriorityQueue.Builder<PTableRef> BUILDER = MinMaxPriorityQueue.orderedBy(COMPARATOR);
-            
-            private long currentByteSize;
-            private final long maxByteSize;
-            private final int expectedCapacity;
-            private final TimeKeeper timeKeeper;
-
-            private final Map<PTableKey,PTableRef> tables;
-            private final Map<PTableKey,PFunction> functions;
-            private final Map<PTableKey,PSchema> schemas;
-            
-            private static Map<PTableKey,PTableRef> newMap(int expectedCapacity) {
-                // Use regular HashMap, as we cannot use a LinkedHashMap that orders by access time
-                // safely across multiple threads (as the underlying collection is not thread safe).
-                // Instead, we track access time and prune it based on the copy we've made.
-                return Maps.newHashMapWithExpectedSize(expectedCapacity);
-            }
-
-            private static Map<PTableKey,PFunction> newFunctionMap(int expectedCapacity) {
-                // Use regular HashMap, as we cannot use a LinkedHashMap that orders by access time
-                // safely across multiple threads (as the underlying collection is not thread safe).
-                // Instead, we track access time and prune it based on the copy we've made.
-                return Maps.newHashMapWithExpectedSize(expectedCapacity);
-            }
-
-            private static Map<PTableKey,PSchema> newSchemaMap(int expectedCapacity) {
-                // Use regular HashMap, as we cannot use a LinkedHashMap that orders by access time
-                // safely across multiple threads (as the underlying collection is not thread safe).
-                // Instead, we track access time and prune it based on the copy we've made.
-                return Maps.newHashMapWithExpectedSize(expectedCapacity);
-            }
-
-            private static Map<PTableKey,PTableRef> cloneMap(Map<PTableKey,PTableRef> tables, int expectedCapacity) {
-                Map<PTableKey,PTableRef> newTables = newMap(Math.max(tables.size(),expectedCapacity));
-                // Copy value so that access time isn't changing anymore
-                for (PTableRef tableAccess : tables.values()) {
-                    newTables.put(tableAccess.getTable().getKey(), new PTableRef(tableAccess));
-                }
-                return newTables;
-            }
-
-            private static Map<PTableKey, PSchema> cloneSchemaMap(Map<PTableKey, PSchema> schemas, int expectedCapacity) {
-                Map<PTableKey, PSchema> newSchemas = newSchemaMap(Math.max(schemas.size(), expectedCapacity));
-                // Copy value so that access time isn't changing anymore
-                for (PSchema schema : schemas.values()) {
-                    newSchemas.put(schema.getSchemaKey(), new PSchema(schema));
-                }
-                return newSchemas;
-            }
-
-            private static Map<PTableKey,PFunction> cloneFunctionsMap(Map<PTableKey,PFunction> functions, int expectedCapacity) {
-                Map<PTableKey,PFunction> newFunctions = newFunctionMap(Math.max(functions.size(),expectedCapacity));
-                for (PFunction functionAccess : functions.values()) {
-                    newFunctions.put(functionAccess.getKey(), new PFunction(functionAccess));
-                }
-                return newFunctions;
-            }
-
-            private PMetaDataCache(PMetaDataCache toClone) {
-                this.timeKeeper = toClone.timeKeeper;
-                this.maxByteSize = toClone.maxByteSize;
-                this.currentByteSize = toClone.currentByteSize;
-                this.expectedCapacity = toClone.expectedCapacity;
-                this.tables = cloneMap(toClone.tables, expectedCapacity);
-                this.functions = cloneFunctionsMap(toClone.functions, expectedCapacity);
-                this.schemas = cloneSchemaMap(toClone.schemas, expectedCapacity);
-            }
-            
-            public PMetaDataCache(int initialCapacity, long maxByteSize, TimeKeeper timeKeeper) {
-                this.currentByteSize = 0;
-                this.maxByteSize = maxByteSize;
-                this.expectedCapacity = initialCapacity;
-                this.tables = newMap(this.expectedCapacity);
-                this.functions = newFunctionMap(this.expectedCapacity);
-                this.timeKeeper = timeKeeper;
-                this.schemas = newSchemaMap(this.expectedCapacity);
-            }
-            
-            public PTableRef get(PTableKey key) {
-                PTableRef tableAccess = this.tables.get(key);
-                if (tableAccess == null) {
-                    return null;
-                }
-                tableAccess.setLastAccessTime(timeKeeper.getCurrentTime());
-                return tableAccess;
-            }
-            
-            @Override
-            public PMetaDataCache clone() {
-                return new PMetaDataCache(this);
-            }
-
-            /**
-             * Used when the cache is growing past its max size to clone in a single pass.
-             * Removes least recently used tables to get size of cache below its max size by
-             * the overage amount.
-             */
-            public PMetaDataCache cloneMinusOverage(long overage) {
-                assert(overage > 0);
-                int nToRemove = Math.max(MIN_REMOVAL_SIZE, (int)Math.ceil((currentByteSize-maxByteSize) / ((double)currentByteSize / size())) + 1);
-                MinMaxPriorityQueue<PTableRef> toRemove = BUILDER.expectedSize(nToRemove).create();
-                PMetaDataCache newCache = new PMetaDataCache(this.size(), this.maxByteSize, this.timeKeeper);
-                
-                long toRemoveBytes = 0;
-                // Add to new cache, but track references to remove when done
-                // to bring cache at least overage amount below it's max size.
-                for (PTableRef tableRef : this.tables.values()) {
-                    newCache.put(tableRef.getTable().getKey(), new PTableRef(tableRef));
-                    toRemove.add(tableRef);
-                    toRemoveBytes += tableRef.getEstSize();
-                    while (toRemoveBytes - toRemove.peekLast().getEstSize() >= overage) {
-                        PTableRef removedRef = toRemove.removeLast();
-                        toRemoveBytes -= removedRef.getEstSize();
-                    }
-                }
-                for (PTableRef toRemoveRef : toRemove) {
-                    newCache.remove(toRemoveRef.getTable().getKey());
-                }
-                return newCache;
-            }
-
-            private PTable put(PTableKey key, PTableRef ref) {
-                currentByteSize += ref.getEstSize();
-                PTableRef oldTableAccess = this.tables.put(key, ref);
-                PTable oldTable = null;
-                if (oldTableAccess != null) {
-                    currentByteSize -= oldTableAccess.getEstSize();
-                    oldTable = oldTableAccess.getTable();
-                }
-                return oldTable;
-            }
-
-            public PTable put(PTableKey key, PTable value, long resolvedTime) {
-                return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), resolvedTime));
-            }
-            
-            public PTable putDuplicate(PTableKey key, PTable value, long resolvedTime) {
-                return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), 0, resolvedTime));
-            }
-            
-            public long getAge(PTableRef ref) {
-                return timeKeeper.getCurrentTime() - ref.getCreateTime();
-            }
-            
-            public PTable remove(PTableKey key) {
-                PTableRef value = this.tables.remove(key);
-                if (value == null) {
-                    return null;
-                }
-                currentByteSize -= value.getEstSize();
-                return value.getTable();
-            }
-            
-            public Iterator<PTable> iterator() {
-                final Iterator<PTableRef> iterator = this.tables.values().iterator();
-                return new Iterator<PTable>() {
-
-                    @Override
-                    public boolean hasNext() {
-                        return iterator.hasNext();
-                    }
-
-                    @Override
-                    public PTable next() {
-                        return iterator.next().getTable();
-                    }
-
-                    @Override
-                    public void remove() {
-                        throw new UnsupportedOperationException();
-                    }
-                    
-                };
-            }
-
-            public int size() {
-                return this.tables.size();
-            }
-
-            public long getCurrentSize() {
-                return this.currentByteSize;
-            }
-
-            public long getMaxSize() {
-                return this.maxByteSize;
-            }
-        }
-            
-    private PMetaDataCache metaData;
     
-    @VisibleForTesting
-    public PMetaDataCache getMetaData() {
-        return metaData;
-    }
+    private PMetaDataCache metaData;
+    private final TimeKeeper timeKeeper;
+    private final PTableRefFactory tableRefFactory;
     
-    public PMetaDataImpl(int initialCapacity, long maxByteSize) {
-        this.metaData = new PMetaDataCache(initialCapacity, maxByteSize, TimeKeeper.SYSTEM);
+    public PMetaDataImpl(int initialCapacity, ReadOnlyProps props) {
+        this(initialCapacity, TimeKeeper.SYSTEM, props);
     }
 
-    public PMetaDataImpl(int initialCapacity, long maxByteSize, TimeKeeper timeKeeper) {
-        this.metaData = new PMetaDataCache(initialCapacity, maxByteSize, timeKeeper);
+    public PMetaDataImpl(int initialCapacity, TimeKeeper timeKeeper, ReadOnlyProps props) {
+        this(new PMetaDataCache(initialCapacity, props.getLong(
+            QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB,
+            QueryServicesOptions.DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE), timeKeeper,
+                PTableRefFactory.getFactory(props)), timeKeeper, PTableRefFactory.getFactory(props));
     }
 
-    private PMetaDataImpl(PMetaDataCache metaData) {
+    private PMetaDataImpl(PMetaDataCache metaData, TimeKeeper timeKeeper, PTableRefFactory tableRefFactory) {
+        this.timeKeeper = timeKeeper;
         this.metaData = metaData;
+        this.tableRefFactory = tableRefFactory;
     }
-    
+
     @Override
     public PMetaDataImpl clone() {
-        return new PMetaDataImpl(new PMetaDataCache(this.metaData));
+        return new PMetaDataImpl(new PMetaDataCache(this.metaData), this.timeKeeper, this.tableRefFactory);
     }
     
     @Override
@@ -292,18 +89,20 @@ public class PMetaDataImpl implements PMetaData {
 
     @Override
     public void updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
-    	metaData.putDuplicate(table.getKey(), table, resolvedTimestamp);
+    	metaData.put(table.getKey(), tableRefFactory.makePTableRef(table, this.timeKeeper.getCurrentTime(), resolvedTimestamp));
     }
 
     @Override
     public void addTable(PTable table, long resolvedTime) throws SQLException {
+        PTableRef tableRef = tableRefFactory.makePTableRef(table, this.timeKeeper.getCurrentTime(), resolvedTime);
         int netGain = 0;
         PTableKey key = table.getKey();
         PTableRef oldTableRef = metaData.get(key);
         if (oldTableRef != null) {
-            netGain -= oldTableRef.getEstSize();
+            netGain -= oldTableRef.getEstimatedSize();
         }
         PTable newParentTable = null;
+        PTableRef newParentTableRef = null;
         long parentResolvedTimestamp = resolvedTime;
         if (table.getParentName() != null) { // Upsert new index table into parent data table list
             String parentName = table.getParentName().getString();
@@ -321,25 +120,26 @@ public class PMetaDataImpl implements PMetaData {
                     }
                 }
                 newIndexes.add(table);
-                netGain -= oldParentRef.getEstSize();
+                netGain -= oldParentRef.getEstimatedSize();
                 newParentTable = PTableImpl.makePTable(oldParentRef.getTable(), table.getTimeStamp(), newIndexes);
-                netGain += newParentTable.getEstimatedSize();
+                newParentTableRef = tableRefFactory.makePTableRef(newParentTable, this.timeKeeper.getCurrentTime(), parentResolvedTimestamp);
+                netGain += newParentTableRef.getEstimatedSize();
             }
         }
         if (newParentTable == null) { // Don't count in gain if we found a parent table, as its accounted for in newParentTable
-            netGain += table.getEstimatedSize();
+            netGain += tableRef.getEstimatedSize();
         }
         long overage = metaData.getCurrentSize() + netGain - metaData.getMaxSize();
         metaData = overage <= 0 ? metaData : metaData.cloneMinusOverage(overage);
         
         if (newParentTable != null) { // Upsert new index table into parent data table list
-            metaData.put(newParentTable.getKey(), newParentTable, parentResolvedTimestamp);
-            metaData.putDuplicate(table.getKey(), table, resolvedTime);
+            metaData.put(newParentTable.getKey(), newParentTableRef);
+            metaData.put(table.getKey(), tableRef);
         } else {
-            metaData.put(table.getKey(), table, resolvedTime);
+            metaData.put(table.getKey(), tableRef);
         }
         for (PTable index : table.getIndexes()) {
-            metaData.putDuplicate(index.getKey(), index, resolvedTime);
+            metaData.put(index.getKey(), tableRefFactory.makePTableRef(index, this.timeKeeper.getCurrentTime(), resolvedTime));
         }
     }
 
@@ -401,7 +201,7 @@ public class PMetaDataImpl implements PMetaData {
                                 parentTableRef.getTable(),
                                 tableTimeStamp == HConstants.LATEST_TIMESTAMP ? parentTableRef.getTable().getTimeStamp() : tableTimeStamp,
                                 newIndexes);
-                        metaData.put(parentTable.getKey(), parentTable, parentTableRef.getResolvedTimeStamp());
+                        metaData.put(parentTable.getKey(), tableRefFactory.makePTableRef(parentTable, this.timeKeeper.getCurrentTime(), parentTableRef.getResolvedTimeStamp()));
                         break;
                     }
                 }
@@ -444,7 +244,7 @@ public class PMetaDataImpl implements PMetaData {
             
             table = PTableImpl.makePTable(table, tableTimeStamp, tableSeqNum, columns);
         }
-        tables.put(table.getKey(), table, resolvedTime);
+        tables.put(table.getKey(), tableRefFactory.makePTableRef(table, this.timeKeeper.getCurrentTime(), resolvedTime));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 92c49f9..c485a30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -77,7 +77,6 @@ import com.google.common.collect.Maps;
  * storing data in a single column (ColumnLayout.SINGLE) or in
  * multiple columns (ColumnLayout.MULTI).
  *
- * TODO add hashCode and equal methods to check equality of two PTableImpl objects.
  * @since 0.1
  */
 public class PTableImpl implements PTable {
@@ -1073,9 +1072,9 @@ public class PTableImpl implements PTable {
       List<PName> physicalNames = Collections.emptyList();
       if (tableType == PTableType.VIEW) {
         viewType = ViewType.fromSerializedValue(table.getViewType().toByteArray()[0]);
-        if(table.hasViewStatement()){
-          viewStatement = (String) PVarchar.INSTANCE.toObject(table.getViewStatement().toByteArray());
-        }
+      }
+      if(table.hasViewStatement()){
+        viewStatement = (String) PVarchar.INSTANCE.toObject(table.getViewStatement().toByteArray());
       }
       if (tableType == PTableType.VIEW || viewIndexId != null) {
         physicalNames = Lists.newArrayListWithExpectedSize(table.getPhysicalNamesCount());
@@ -1181,6 +1180,8 @@ public class PTableImpl implements PTable {
       builder.setTransactional(table.isTransactional());
       if(table.getType() == PTableType.VIEW){
         builder.setViewType(ByteStringer.wrap(new byte[]{table.getViewType().getSerializedValue()}));
+      }
+      if(table.getViewStatement()!=null){
         builder.setViewStatement(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getViewStatement())));
       }
       if(table.getType() == PTableType.VIEW || table.getViewIndexId() != null){
@@ -1244,4 +1245,24 @@ public class PTableImpl implements PTable {
     public boolean isAppendOnlySchema() {
         return isAppendOnlySchema;
     }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((key == null) ? 0 : key.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        PTableImpl other = (PTableImpl) obj;
+        if (key == null) {
+            if (other.key != null) return false;
+        } else if (!key.equals(other.key)) return false;
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java
index c4bc510..0a601b0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java
@@ -17,28 +17,19 @@
  */
 package org.apache.phoenix.schema;
 
-public class PTableRef {
-    private final PTable table;
-    private final int estSize;
-    private final long createTime;
-    private final long resolvedTimeStamp;
-	private volatile long lastAccessTime;
+public abstract class PTableRef {
     
-    public PTableRef(PTable table, long lastAccessTime, int estSize, long resolvedTime) {
-        this.table = table;
+    protected final int estSize;
+    protected final long createTime;
+    protected final long resolvedTimeStamp;
+    protected volatile long lastAccessTime;
+    
+    public PTableRef(long lastAccessTime, long resolvedTime, int estimatedSize) {
         this.lastAccessTime = lastAccessTime;
-        this.estSize = estSize;
+        this.estSize = estimatedSize;
         this.resolvedTimeStamp = resolvedTime;
         this.createTime = lastAccessTime;
     }
-
-    public PTableRef(PTable table, long lastAccessTime, long resolvedTime) {
-        this (table, lastAccessTime, table.getEstimatedSize(), resolvedTime);
-    }
-
-    public PTableRef(PTableRef tableRef) {
-        this (tableRef.table, tableRef.lastAccessTime, tableRef.estSize, tableRef.resolvedTimeStamp);
-    }
     
     /**
      * Tracks how long this entry has been in the cache
@@ -48,23 +39,22 @@ public class PTableRef {
         return createTime;
     }
     
-    public PTable getTable() {
-		return table;
-	}
+    public abstract PTable getTable();
 
-	public long getResolvedTimeStamp() {
-		return resolvedTimeStamp;
-	}
-	
-    public int getEstSize() {
-		return estSize;
-	}
+    public long getResolvedTimeStamp() {
+        return resolvedTimeStamp;
+    }
+    
+    public int getEstimatedSize() {
+        return estSize;
+    }
 
-	public long getLastAccessTime() {
-		return lastAccessTime;
-	}
+    public long getLastAccessTime() {
+        return lastAccessTime;
+    }
 
-	public void setLastAccessTime(long lastAccessTime) {
-		this.lastAccessTime = lastAccessTime;
-	}
+    public void setLastAccessTime(long lastAccessTime) {
+        this.lastAccessTime = lastAccessTime;
+    }
+	
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefFactory.java
new file mode 100644
index 0000000..14eb235
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.util.Locale;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+/**
+ * Factory for the {@link PTableRef} entries held by the client-side metadata cache. This base
+ * implementation produces {@link PTableRefImpl} instances; {@link #getFactory(ReadOnlyProps)}
+ * chooses between it and {@link SerializedPTableRefFactory} according to
+ * {@link QueryServices#CLIENT_CACHE_ENCODING}.
+ */
+public class PTableRefFactory {
+
+    /** Values accepted for {@link QueryServices#CLIENT_CACHE_ENCODING}; matching ignores case. */
+    public enum Encoding {
+        OBJECT, PROTOBUF
+    }
+
+    private static final PTableRefFactory INSTANCE = new PTableRefFactory();
+
+    /**
+     * Creates a reference wrapping {@code table}.
+     *
+     * @param table the table to reference
+     * @param lastAccessTime access time recorded on the new reference
+     * @param resolvedTime resolved timestamp stored on the new reference
+     * @return a {@link PTableRefImpl} sized with {@link PTable#getEstimatedSize()}
+     */
+    public PTableRef makePTableRef(PTable table, long lastAccessTime, long resolvedTime) {
+        return new PTableRefImpl(table, lastAccessTime, resolvedTime, table.getEstimatedSize());
+    }
+
+    /**
+     * Creates a copy of an existing reference.
+     *
+     * @param tableRef the reference to copy
+     * @return a new {@link PTableRefImpl} built from {@code tableRef}
+     */
+    public PTableRef makePTableRef(PTableRef tableRef) {
+        return new PTableRefImpl(tableRef);
+    }
+
+    /**
+     * Returns the factory matching the configured client cache encoding.
+     *
+     * @param props properties from which {@link QueryServices#CLIENT_CACHE_ENCODING} is read, with
+     *            {@link QueryServicesOptions#DEFAULT_CLIENT_CACHE_ENCODING} as the default
+     * @return the {@link SerializedPTableRefFactory} for {@link Encoding#PROTOBUF}, otherwise the
+     *         shared instance of this class
+     * @throws IllegalArgumentException if the configured value does not name an {@link Encoding}
+     */
+    public static PTableRefFactory getFactory(ReadOnlyProps props) {
+        String encodingEnumString =
+                props.get(QueryServices.CLIENT_CACHE_ENCODING,
+                    QueryServicesOptions.DEFAULT_CLIENT_CACHE_ENCODING);
+        Encoding encoding;
+        try {
+            // Locale.ROOT keeps the case mapping independent of the JVM default locale.
+            encoding = Encoding.valueOf(encodingEnumString.toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Unsupported value '" + encodingEnumString
+                    + "' for " + QueryServices.CLIENT_CACHE_ENCODING
+                    + "; expected OBJECT or PROTOBUF", e);
+        }
+        switch (encoding) {
+        case PROTOBUF:
+            return SerializedPTableRefFactory.getFactory();
+        case OBJECT:
+        default:
+            return INSTANCE;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefImpl.java
new file mode 100644
index 0000000..ffc5c2b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRefImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+/**
+ * {@link PTableRef} that keeps a direct reference to the {@link PTable} object.
+ */
+public class PTableRefImpl extends PTableRef {
+
+    private final PTable table;
+
+    /**
+     * @param table the table this reference returns from {@link #getTable()}
+     * @param lastAccessTime initial last access time, passed to {@link PTableRef}
+     * @param resolvedTime resolved timestamp, passed to {@link PTableRef}
+     * @param estimatedSize estimated size of the table, passed to {@link PTableRef}
+     */
+    public PTableRefImpl(PTable table, long lastAccessTime, long resolvedTime, int estimatedSize) {
+        super(lastAccessTime, resolvedTime, estimatedSize);
+        this.table = table;
+    }
+
+    /**
+     * Copies {@code tableRef}, taking its table, last access time, resolved timestamp and
+     * estimated size.
+     *
+     * @param tableRef the reference to copy
+     */
+    public PTableRefImpl(PTableRef tableRef) {
+        this(tableRef.getTable(), tableRef.getLastAccessTime(), tableRef.getResolvedTimeStamp(),
+                tableRef.getEstimatedSize());
+    }
+
+    @Override
+    public PTable getTable() {
+        return table;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRef.java
new file mode 100644
index 0000000..a57fc72
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRef.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.io.IOException;
+
+import org.apache.phoenix.coprocessor.generated.PTableProtos;
+
+/**
+ * {@link PTableRef} that stores the table as protobuf-serialized bytes instead of a live
+ * {@link PTable} object. {@link #getTable()} deserializes the bytes on every call.
+ */
+public class SerializedPTableRef extends PTableRef {
+
+    private final byte[] tableBytes;
+
+    /**
+     * @param tableBytes serialized {@code PTableProtos.PTable} bytes; the array is stored as-is,
+     *            not copied
+     * @param lastAccessTime initial last access time, passed to {@link PTableRef}
+     * @param resolvedTime resolved timestamp, passed to {@link PTableRef}
+     * @param estimatedSize ignored; the length of {@code tableBytes} is used as the estimated size
+     */
+    public SerializedPTableRef(byte[] tableBytes, long lastAccessTime, long resolvedTime, int estimatedSize) {
+        super(lastAccessTime, resolvedTime, tableBytes.length);
+        this.tableBytes = tableBytes;
+    }
+
+    /**
+     * Copies {@code tableRef}. The serialized bytes are shared when {@code tableRef} is itself a
+     * {@link SerializedPTableRef}; any other reference has its table serialized here rather than
+     * failing with a {@link ClassCastException}.
+     *
+     * @param tableRef the reference to copy
+     */
+    public SerializedPTableRef(PTableRef tableRef) {
+        this(serializedBytesOf(tableRef), tableRef.getLastAccessTime(),
+                tableRef.getResolvedTimeStamp(), tableRef.getEstimatedSize());
+    }
+
+    /** Returns the serialized form of the table behind {@code tableRef}, reusing it if present. */
+    private static byte[] serializedBytesOf(PTableRef tableRef) {
+        if (tableRef instanceof SerializedPTableRef) {
+            return ((SerializedPTableRef) tableRef).tableBytes;
+        }
+        return PTableImpl.toProto(tableRef.getTable()).toByteArray();
+    }
+
+    /**
+     * Deserializes and returns the table.
+     *
+     * @return a {@link PTable} built from the stored bytes
+     * @throws RuntimeException wrapping the {@link IOException} if the bytes cannot be parsed
+     */
+    @Override
+    public PTable getTable() {
+        try {
+            return PTableImpl.createFromProto(PTableProtos.PTable.parseFrom(tableBytes));
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to deserialize cached table", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRefFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRefFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRefFactory.java
new file mode 100644
index 0000000..5da1fd6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SerializedPTableRefFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+/**
+ * {@link PTableRefFactory} whose references are {@link SerializedPTableRef} instances holding the
+ * protobuf-serialized form of the table.
+ */
+class SerializedPTableRefFactory extends PTableRefFactory {
+
+    private static final SerializedPTableRefFactory INSTANCE = new SerializedPTableRefFactory();
+
+    /** Returns the shared factory instance. */
+    public static PTableRefFactory getFactory() {
+        return INSTANCE;
+    }
+
+    /**
+     * Serializes {@code table} with {@link PTableImpl#toProto(PTable)} and wraps the bytes.
+     *
+     * @param table the table to serialize
+     * @param lastAccessTime access time recorded on the new reference
+     * @param resolvedTime resolved timestamp stored on the new reference
+     * @return a {@link SerializedPTableRef} holding the serialized table
+     */
+    @Override
+    public PTableRef makePTableRef(PTable table, long lastAccessTime, long resolvedTime) {
+        return new SerializedPTableRef(PTableImpl.toProto(table).toByteArray(), lastAccessTime,
+                resolvedTime, table.getEstimatedSize());
+    }
+
+    /**
+     * Creates a copy of an existing reference.
+     *
+     * @param tableRef the reference to copy
+     * @return a new {@link SerializedPTableRef} built from {@code tableRef}
+     */
+    @Override
+    public PTableRef makePTableRef(PTableRef tableRef) {
+        return new SerializedPTableRef(tableRef);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0ae8025/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java
index ef88c8c..a5660db 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java
@@ -21,12 +21,16 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 import java.sql.SQLException;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.TimeKeeper;
 import org.junit.Test;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class PMetaDataImplTest {
@@ -72,9 +76,11 @@ public class PMetaDataImplTest {
     
     @Test
     public void testEviction() throws Exception {
-        long maxSize = 10;
         TestTimeKeeper timeKeeper = new TestTimeKeeper();
-        PMetaData metaData = new PMetaDataImpl(5, maxSize, timeKeeper);
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, "10");
+        props.put(QueryServices.CLIENT_CACHE_ENCODING, "object");
+        PMetaData metaData = new PMetaDataImpl(5, timeKeeper,  new ReadOnlyProps(props));
         addToTable(metaData, "a", 5, timeKeeper);
         assertEquals(1, metaData.size());
         addToTable(metaData, "b", 4, timeKeeper);
@@ -116,9 +122,11 @@ public class PMetaDataImplTest {
 
     @Test
     public void shouldNotEvictMoreEntriesThanNecessary() throws Exception {
-        long maxSize = 5;
         TestTimeKeeper timeKeeper = new TestTimeKeeper();
-        PMetaData metaData = new PMetaDataImpl(5, maxSize, timeKeeper);
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, "5");
+        props.put(QueryServices.CLIENT_CACHE_ENCODING, "object");
+        PMetaData metaData = new PMetaDataImpl(5, timeKeeper,  new ReadOnlyProps(props));
         addToTable(metaData, "a", 1, timeKeeper);
         assertEquals(1, metaData.size());
         addToTable(metaData, "b", 1, timeKeeper);
@@ -136,9 +144,11 @@ public class PMetaDataImplTest {
 
     @Test
     public void shouldAlwaysKeepAtLeastOneEntryEvenIfTooLarge() throws Exception {
-        long maxSize = 5;
         TestTimeKeeper timeKeeper = new TestTimeKeeper();
-        PMetaData metaData = new PMetaDataImpl(5, maxSize, timeKeeper);
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, "5");
+        props.put(QueryServices.CLIENT_CACHE_ENCODING, "object");
+        PMetaData metaData = new PMetaDataImpl(5, timeKeeper,  new ReadOnlyProps(props));
         addToTable(metaData, "a", 1, timeKeeper);
         assertEquals(1, metaData.size());
         addToTable(metaData, "b", 1, timeKeeper);
@@ -157,9 +167,11 @@ public class PMetaDataImplTest {
 
     @Test
     public void shouldAlwaysKeepOneEntryIfMaxSizeIsZero() throws Exception {
-        long maxSize = 0;
         TestTimeKeeper timeKeeper = new TestTimeKeeper();
-        PMetaData metaData = new PMetaDataImpl(0, maxSize, timeKeeper);
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, "0");
+        props.put(QueryServices.CLIENT_CACHE_ENCODING, "object");
+        PMetaData metaData = new PMetaDataImpl(5, timeKeeper,  new ReadOnlyProps(props));
         addToTable(metaData, "a", 1, timeKeeper);
         assertEquals(1, metaData.size());
         addToTable(metaData, "b", 1, timeKeeper);
@@ -178,9 +190,11 @@ public class PMetaDataImplTest {
 
     @Test
     public void testAge() throws Exception {
-        long maxSize = 10;
         TestTimeKeeper timeKeeper = new TestTimeKeeper();
-        PMetaData metaData = new PMetaDataImpl(5, maxSize, timeKeeper);
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB, "10");
+        props.put(QueryServices.CLIENT_CACHE_ENCODING, "object");
+        PMetaData metaData = new PMetaDataImpl(5, timeKeeper,  new ReadOnlyProps(props));
         String tableName = "a";
         addToTable(metaData, tableName, 1, timeKeeper);
         PTableRef aTableRef = metaData.getTableRef(new PTableKey(null,tableName));