Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:15:51 UTC

[35/51] [partial] Initial commit of master branch from github

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
new file mode 100644
index 0000000..9f01fff
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+
+
+/**
+ * 
+ * Coprocessor protocol for Phoenix DDL. Phoenix stores the table metadata in
+ * an HBase table named SYSTEM.TABLE. Each table is represented by:
+ * - one row for the table
+ * - one row per column in the table
+ * Up to {@link #DEFAULT_MAX_META_DATA_VERSIONS} versions are kept. The
+ * timestamp of the metadata must always be increasing. The timestamp of the key
+ * values in the data row corresponds to the schema that it's using.
+ *
+ * TODO: dynamically prune number of schema version kept based on whether or
+ * not the data table still uses it (based on the min time stamp of the data
+ * table).
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface MetaDataProtocol extends CoprocessorProtocol {
+    public static final int PHOENIX_MAJOR_VERSION = 3;
+    public static final int PHOENIX_MINOR_VERSION = 0;
+    public static final int PHOENIX_PATCH_NUMBER = 0;
+    public static final int PHOENIX_VERSION = 
+            MetaDataUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
+    
+    public static final long MIN_TABLE_TIMESTAMP = 0;
+    // Increase MIN_SYSTEM_TABLE_TIMESTAMP by one for each SYSTEM.TABLE schema change.
+    // For 1.0, 1.1, 1.2, and 1.2.1 we used MetaDataProtocol.MIN_TABLE_TIMESTAMP+1
+    // For 2.0 and above, we use MetaDataProtocol.MIN_TABLE_TIMESTAMP+7 so that we can add the five new
+    // columns to the existing system table (three new columns in 1.2.1 and two new columns in 1.2)
+    // For 3.0 and above, we use MIN_TABLE_TIMESTAMP + 8 so that we can add the tenant_id column
+    // as the first column to the existing system table.
+    // For 3.1 (SNAPSHOT) and above, we use MIN_TABLE_TIMESTAMP + 9 so that we can add
+    // the multi_tenant and multi_type columns for multi-tenancy.
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 9;
+    public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000;
+
+    // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need
+    // a different code for every type of error.
+    // ENTITY_ALREADY_EXISTS, ENTITY_NOT_FOUND, NEWER_ENTITY_FOUND, ENTITY_NOT_IN_REGION, CONCURRENT_MODIFICATION
+    // ILLEGAL_MUTATION (+ sql code)
+    public enum MutationCode {
+        TABLE_ALREADY_EXISTS,
+        TABLE_NOT_FOUND, 
+        COLUMN_NOT_FOUND, 
+        COLUMN_ALREADY_EXISTS,
+        CONCURRENT_TABLE_MUTATION,
+        TABLE_NOT_IN_REGION,
+        NEWER_TABLE_FOUND,
+        UNALLOWED_TABLE_MUTATION,
+        NO_PK_COLUMNS,
+        PARENT_TABLE_NOT_FOUND
+    };
+    
+    public static class MetaDataMutationResult implements Writable {
+        private MutationCode returnCode;
+        private long mutationTime;
+        private PTable table;
+        private List<byte[]> tableNamesToDelete;
+        private byte[] columnName;
+        private byte[] familyName;
+        private boolean wasUpdated;
+        
+        public MetaDataMutationResult() {
+        }
+
+        public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table, PColumn column) {
+            this(returnCode, currentTime, table);
+            if (column != null) {
+                this.columnName = column.getName().getBytes();
+                this.familyName = column.getFamilyName().getBytes();
+            }
+            
+        }
+        
+        public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table) {
+           this(returnCode, currentTime, table, Collections.<byte[]> emptyList());
+        }
+        
+        public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table, List<byte[]> tableNamesToDelete) {
+            this.returnCode = returnCode;
+            this.mutationTime = currentTime;
+            this.table = table;
+            this.tableNamesToDelete = tableNamesToDelete;
+        }
+        
+        public MutationCode getMutationCode() {
+            return returnCode;
+        }
+        
+        public long getMutationTime() {
+            return mutationTime;
+        }
+        
+        public boolean wasUpdated() {
+            return wasUpdated;
+        }
+        
+        public PTable getTable() {
+            return table;
+        }
+ 
+        public void setTable(PTable table) {
+            this.table = table;
+        }
+ 
+        public List<byte[]> getTableNamesToDelete() {
+            return tableNamesToDelete;
+        }
+        
+        public byte[] getColumnName() {
+            return columnName;
+        }
+        
+        public byte[] getFamilyName() {
+            return familyName;
+        }        
+        
+        @Override
+        public void readFields(DataInput input) throws IOException {
+            this.returnCode = MutationCode.values()[WritableUtils.readVInt(input)];
+            this.mutationTime = input.readLong();
+            wasUpdated = input.readBoolean();
+            if (wasUpdated) {
+                this.table = new PTableImpl();
+                this.table.readFields(input);
+            }
+            columnName = Bytes.readByteArray(input);
+            if (columnName.length > 0) {
+                familyName = Bytes.readByteArray(input);
+            }
+            boolean hasTablesToDelete = input.readBoolean();
+            if (hasTablesToDelete) {
+                int count = input.readInt();
+                tableNamesToDelete = Lists.newArrayListWithExpectedSize(count);
+                for (int i = 0; i < count; i++) {
+                    byte[] tableName = Bytes.readByteArray(input);
+                    tableNamesToDelete.add(tableName);
+                }
+            }
+        }
+
+        @Override
+        public void write(DataOutput output) throws IOException {
+            WritableUtils.writeVInt(output, returnCode.ordinal());
+            output.writeLong(mutationTime);
+            output.writeBoolean(table != null);
+            if (table != null) {
+                table.write(output);
+            }
+            Bytes.writeByteArray(output, columnName == null ? ByteUtil.EMPTY_BYTE_ARRAY : columnName);
+            if (columnName != null) {
+                 Bytes.writeByteArray(output, familyName == null ? ByteUtil.EMPTY_BYTE_ARRAY : familyName);
+            }
+            if (tableNamesToDelete != null && tableNamesToDelete.size() > 0) {
+                output.writeBoolean(true);
+                output.writeInt(tableNamesToDelete.size());
+                for (byte[] tableName : tableNamesToDelete) {
+                    Bytes.writeByteArray(output, tableName);
+                }
+                
+            } else {
+                output.writeBoolean(false);
+            }
+            
+        }
+    }
+    
+    /**
+     * Returns the latest Phoenix table at or before the given clientTimestamp. If the
+     * client already has the latest (based on the tableTimestamp), then no table
+     * is returned.
+     * @param tenantId
+     * @param schemaName
+     * @param tableName
+     * @param tableTimestamp
+     * @param clientTimestamp
+     * @return MetaDataMutationResult
+     * @throws IOException
+     */
+    MetaDataMutationResult getTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long tableTimestamp, long clientTimestamp) throws IOException;
+
+    /**
+     * Create a new Phoenix table
+     * @param tableMetadata
+     * @return MetaDataMutationResult
+     * @throws IOException
+     */
+    MetaDataMutationResult createTable(List<Mutation> tableMetadata) throws IOException;
+
+    /**
+     * Drop an existing Phoenix table
+     * @param tableMetadata
+     * @param tableType
+     * @return MetaDataMutationResult
+     * @throws IOException
+     */
+    MetaDataMutationResult dropTable(List<Mutation> tableMetadata, String tableType) throws IOException;
+
+    /**
+     * Add a column to an existing Phoenix table
+     * @param tableMetadata
+     * @return MetaDataMutationResult
+     * @throws IOException
+     */
+    MetaDataMutationResult addColumn(List<Mutation> tableMetadata) throws IOException;
+    
+    /**
+     * Drop a column from an existing Phoenix table
+     * @param tableMetadata
+     * @return MetaDataMutationResult
+     * @throws IOException
+     */
+    MetaDataMutationResult dropColumn(List<Mutation> tableMetadata) throws IOException;
+    
+    MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata) throws IOException;
+
+    /**
+     * Clears the server-side cache of table meta data. Used between test runs to
+     * ensure no side effects.
+     */
+    void clearCache();
+    
+    /**
+     * Get the version of the server-side HBase and phoenix.jar. Used when initially connecting
+     * to a cluster to ensure that the client and server jars are compatible.
+     */
+    long getVersion();
+}
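
For reference, a minimal client-side sketch of how a CoprocessorProtocol such as MetaDataProtocol is typically invoked with HBase 0.94-era APIs, via HTableInterface.coprocessorProxy. The table name, row-key construction, and timestamps below are illustrative assumptions for the sketch, not the exact layout Phoenix uses.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.util.Bytes;

    import org.apache.phoenix.coprocessor.MetaDataProtocol;
    import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
    import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;

    public class MetaDataProtocolClientSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // "SYSTEM.TABLE" and the key construction below are illustrative; Phoenix
            // derives the metadata row key from (tenant id, schema name, table name).
            HTableInterface htable = new HTable(conf, "SYSTEM.TABLE");
            try {
                byte[] tenantId = new byte[0];
                byte[] schemaName = new byte[0];
                byte[] tableName = Bytes.toBytes("MY_TABLE");
                byte[] key = Bytes.add(tenantId, schemaName, tableName); // assumed key shape
                // Bind the dynamic RPC to the region hosting 'key' and call getTable.
                MetaDataProtocol proxy = htable.coprocessorProxy(MetaDataProtocol.class, key);
                MetaDataMutationResult result =
                        proxy.getTable(tenantId, schemaName, tableName, 0L, Long.MAX_VALUE);
                if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
                    System.out.println("No table at or before the client timestamp");
                } else if (result.getTable() != null) {
                    System.out.println("Got metadata as of " + result.getMutationTime());
                }
            } finally {
                htable.close();
            }
        }
    }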

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
new file mode 100644
index 0000000..d0bd7c5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import org.apache.phoenix.cache.GlobalCache;
+
+
+/**
+ * Coprocessor for metadata related operations. This coprocessor would only be registered
+ * to SYSTEM.TABLE.
+ */
+public class MetaDataRegionObserver extends BaseRegionObserver {
+
+    @Override
+    public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
+            boolean abortRequested) {
+        GlobalCache.getInstance(c.getEnvironment()).getMetaDataCache().clear();
+    }
+}
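
A RegionObserver like this one is attached to a table through its descriptor (or hbase-site.xml). The sketch below shows the generic HBase 0.94 registration path; the SYSTEM.TABLE name and "0" column family are placeholders, since this commit does not show how Phoenix itself creates and wires up the table.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public class RegisterMetaDataObserverSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            HBaseAdmin admin = new HBaseAdmin(conf);
            try {
                // Placeholder table/family names; Phoenix manages SYSTEM.TABLE itself.
                HTableDescriptor desc = new HTableDescriptor("SYSTEM.TABLE");
                desc.addFamily(new HColumnDescriptor("0"));
                // Attach the observer so its preClose hook clears the metadata cache
                // whenever a region of this table is closed.
                desc.addCoprocessor("org.apache.phoenix.coprocessor.MetaDataRegionObserver");
                admin.createTable(desc);
            } finally {
                admin.close();
            }
        }
    }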

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanProjector.java
new file mode 100644
index 0000000..ee9e6e2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanProjector.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+
+public class ScanProjector {
+    
+    public enum ProjectionType {TABLE, CF, CQ};
+    
+    private static final String SCAN_PROJECTOR = "scanProjector";
+    private static final byte[] SEPERATOR = Bytes.toBytes(":");
+    
+    private final ProjectionType type;
+    private final byte[] tablePrefix;
+    private final Map<ImmutableBytesPtr, byte[]> cfProjectionMap;
+    private final Map<ImmutableBytesPtr, Map<ImmutableBytesPtr, Pair<byte[], byte[]>>> cqProjectionMap;
+    
+    private ScanProjector(ProjectionType type, byte[] tablePrefix, 
+            Map<ImmutableBytesPtr, byte[]> cfProjectionMap, Map<ImmutableBytesPtr, 
+            Map<ImmutableBytesPtr, Pair<byte[], byte[]>>> cqProjectionMap) {
+        this.type = type;
+        this.tablePrefix = tablePrefix;
+        this.cfProjectionMap = cfProjectionMap;
+        this.cqProjectionMap = cqProjectionMap;
+    }
+    
+    public static void serializeProjectorIntoScan(Scan scan, ScanProjector projector) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            WritableUtils.writeVInt(output, projector.type.ordinal());
+            switch (projector.type) {
+            case TABLE:
+                WritableUtils.writeCompressedByteArray(output, projector.tablePrefix);
+                break;
+            case CF:
+                WritableUtils.writeVInt(output, projector.cfProjectionMap.size());
+                for (Map.Entry<ImmutableBytesPtr, byte[]> entry : projector.cfProjectionMap.entrySet()) {
+                    WritableUtils.writeCompressedByteArray(output, entry.getKey().get());
+                    WritableUtils.writeCompressedByteArray(output, entry.getValue());
+                }
+                break;
+            case CQ:
+                WritableUtils.writeVInt(output, projector.cqProjectionMap.size());
+                for (Map.Entry<ImmutableBytesPtr, Map<ImmutableBytesPtr, Pair<byte[], byte[]>>> entry : 
+                    projector.cqProjectionMap.entrySet()) {
+                    WritableUtils.writeCompressedByteArray(output, entry.getKey().get());
+                    Map<ImmutableBytesPtr, Pair<byte[], byte[]>> map = entry.getValue();
+                    WritableUtils.writeVInt(output, map.size());
+                    for (Map.Entry<ImmutableBytesPtr, Pair<byte[], byte[]>> e : map.entrySet()) {
+                        WritableUtils.writeCompressedByteArray(output, e.getKey().get());
+                        WritableUtils.writeCompressedByteArray(output, e.getValue().getFirst());
+                        WritableUtils.writeCompressedByteArray(output, e.getValue().getSecond());
+                    }
+                }
+                break;
+            default:
+                throw new IOException("Unrecognized projection type '" + projector.type + "'");    
+            }
+            scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        
+    }
+    
+    public static ScanProjector deserializeProjectorFromScan(Scan scan) {
+        byte[] proj = scan.getAttribute(SCAN_PROJECTOR);
+        if (proj == null) {
+            return null;
+        }
+        ByteArrayInputStream stream = new ByteArrayInputStream(proj);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            int t = WritableUtils.readVInt(input);
+            ProjectionType type = ProjectionType.values()[t];
+            if (type == ProjectionType.TABLE) {
+                byte[] tablePrefix = WritableUtils.readCompressedByteArray(input);
+                return new ScanProjector(type, tablePrefix, null, null);
+            }
+            if (type == ProjectionType.CF) {
+                int count = WritableUtils.readVInt(input);
+                Map<ImmutableBytesPtr, byte[]> cfMap = new HashMap<ImmutableBytesPtr, byte[]>();
+                for (int i = 0; i < count; i++) {
+                    byte[] cf = WritableUtils.readCompressedByteArray(input);
+                    byte[] renamed = WritableUtils.readCompressedByteArray(input);
+                    cfMap.put(new ImmutableBytesPtr(cf), renamed);
+                }
+                return new ScanProjector(type, null, cfMap, null);
+            }
+            
+            int count = WritableUtils.readVInt(input);
+            Map<ImmutableBytesPtr, Map<ImmutableBytesPtr, Pair<byte[], byte[]>>> cqMap = 
+                new HashMap<ImmutableBytesPtr, Map<ImmutableBytesPtr, Pair<byte[], byte[]>>>();
+            for (int i = 0; i < count; i++) {
+                byte[] cf = WritableUtils.readCompressedByteArray(input);
+                int nQuals = WritableUtils.readVInt(input);
+                Map<ImmutableBytesPtr, Pair<byte[], byte[]>> map = 
+                    new HashMap<ImmutableBytesPtr, Pair<byte[], byte[]>>();
+                for (int j = 0; j < nQuals; j++) {
+                    byte[] cq = WritableUtils.readCompressedByteArray(input);
+                    byte[] renamedCf = WritableUtils.readCompressedByteArray(input);
+                    byte[] renamedCq = WritableUtils.readCompressedByteArray(input);
+                    map.put(new ImmutableBytesPtr(cq), new Pair<byte[], byte[]>(renamedCf, renamedCq));
+                }
+                cqMap.put(new ImmutableBytesPtr(cf), map);
+            }
+            return new ScanProjector(type, null, null, cqMap);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
+    public ProjectionType getType() {
+        return this.type;
+    }
+    
+    public byte[] getTablePrefix() {
+        return this.tablePrefix;
+    }
+    
+    public Map<ImmutableBytesPtr, byte[]> getCfProjectionMap() {
+        return this.cfProjectionMap;
+    }
+    
+    public Map<ImmutableBytesPtr, Map<ImmutableBytesPtr, Pair<byte[], byte[]>>> getCqProjectionMap() {
+        return this.cqProjectionMap;
+    }
+    
+    public KeyValue getProjectedKeyValue(KeyValue kv) {
+        if (type == ProjectionType.TABLE) {
+            byte[] cf = ByteUtil.concat(tablePrefix, SEPERATOR, kv.getFamily());
+            return KeyValueUtil.newKeyValue(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), 
+                    cf, kv.getQualifier(), kv.getTimestamp(), kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+        }
+        
+        if (type == ProjectionType.CF) {
+            byte[] cf = cfProjectionMap.get(new ImmutableBytesPtr(kv.getFamily()));
+            if (cf == null)
+                return kv;
+            return KeyValueUtil.newKeyValue(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), 
+                    cf, kv.getQualifier(), kv.getTimestamp(), kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+        }
+        
+        Map<ImmutableBytesPtr, Pair<byte[], byte[]>> map = cqProjectionMap.get(new ImmutableBytesPtr(kv.getFamily()));
+        if (map == null)
+            return kv;
+        
+        Pair<byte[], byte[]> col = map.get(new ImmutableBytesPtr(kv.getQualifier()));
+        if (col == null)
+            return kv;
+        
+        return KeyValueUtil.newKeyValue(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), 
+                col.getFirst(), col.getSecond(), kv.getTimestamp(), kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+    }
+}
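
As a rough sketch of the server-side flow, a region-side scanner can recover the projector carried by a Scan and rewrite the family/qualifier of each KeyValue it returns. Only the public methods shown above are used; how Phoenix actually threads this through its join scanner is not part of this file.

    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Scan;

    import org.apache.phoenix.coprocessor.ScanProjector;

    public class ScanProjectorUsageSketch {
        /** Applies the projector (if any) carried in the Scan to a batch of KeyValues. */
        public static void project(Scan scan, List<KeyValue> results) {
            ScanProjector projector = ScanProjector.deserializeProjectorFromScan(scan);
            if (projector == null) {
                return; // no projection was serialized into this scan
            }
            for (int i = 0; i < results.size(); i++) {
                // TABLE projections prefix the family, CF projections rename it,
                // and CQ projections remap individual family/qualifier pairs.
                results.set(i, projector.getProjectedKeyValue(results.get(i)));
            }
        }
    }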

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
new file mode 100644
index 0000000..cb76b70
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -0,0 +1,313 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.RegionScannerResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.join.ScanProjector;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+
+/**
+ * 
+ * Wraps the scan for a non-aggregate query to prevent needless retries
+ * if a Phoenix bug is encountered during our custom filter expression evaluation.
+ * Unfortunately, until HBASE-7481 gets fixed, there's no way to do this from our
+ * custom filters.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ScanRegionObserver extends BaseScannerRegionObserver {
+    public static final String NON_AGGREGATE_QUERY = "NonAggregateQuery";
+    private static final String TOPN = "TopN";
+
+    public static void serializeIntoScan(Scan scan, int thresholdBytes, int limit, List<OrderByExpression> orderByExpressions, int estimatedRowSize) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream(); // TODO: size?
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            WritableUtils.writeVInt(output, thresholdBytes);
+            WritableUtils.writeVInt(output, limit);
+            WritableUtils.writeVInt(output, estimatedRowSize);
+            WritableUtils.writeVInt(output, orderByExpressions.size());
+            for (OrderByExpression orderingCol : orderByExpressions) {
+                orderingCol.write(output);
+            }
+            scan.setAttribute(TOPN, stream.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
+    public static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) {
+        byte[] topN = scan.getAttribute(TOPN);
+        if (topN == null) {
+            return null;
+        }
+        ByteArrayInputStream stream = new ByteArrayInputStream(topN); // TODO: size?
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            int thresholdBytes = WritableUtils.readVInt(input);
+            int limit = WritableUtils.readVInt(input);
+            int estimatedRowSize = WritableUtils.readVInt(input);
+            int size = WritableUtils.readVInt(input);
+            List<OrderByExpression> orderByExpressions = Lists.newArrayListWithExpectedSize(size);           
+            for (int i = 0; i < size; i++) {
+                OrderByExpression orderByExpression = new OrderByExpression();
+                orderByExpression.readFields(input);
+                orderByExpressions.add(orderByExpression);
+            }
+            ResultIterator inner = new RegionScannerResultIterator(s);
+            return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, estimatedRowSize);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable {
+        byte[] isScanQuery = scan.getAttribute(NON_AGGREGATE_QUERY);
+
+        if (isScanQuery == null || Bytes.compareTo(PDataType.FALSE_BYTES, isScanQuery) == 0) {
+            return s;
+        }
+        
+        final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
+        final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+        final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
+        
+        RegionScanner innerScanner = s;
+        if (p != null || j != null) {
+            innerScanner = new HashJoinRegionScanner(s, p, j, tenantId, c.getEnvironment());
+        }
+        
+        final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner);
+        if (iterator == null) {
+            return getWrappedScanner(c, innerScanner);
+        }
+        
+        return getTopNScanner(c, innerScanner, iterator, tenantId);
+    }
+    
+    /**
+     *  Return region scanner that does TopN.
+     *  We only need to call startRegionOperation and closeRegionOperation when
+     *  getting the first Tuple (which forces running through the entire region)
+     *  since after this everything is held in memory.
+     */
+    private RegionScanner getTopNScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final OrderedResultIterator iterator, ImmutableBytesWritable tenantId) throws Throwable {
+        final Tuple firstTuple;
+        TenantCache tenantCache = GlobalCache.getTenantCache(c.getEnvironment(), tenantId);
+        long estSize = iterator.getEstimatedByteSize();
+        final MemoryChunk chunk = tenantCache.getMemoryManager().allocate(estSize);
+        final HRegion region = c.getEnvironment().getRegion();
+        region.startRegionOperation();
+        try {
+            // Once we return from the first call to next, we've run through and cached
+            // the topN rows, so we no longer need to start/stop a region operation.
+            firstTuple = iterator.next();
+            // Now that the topN are cached, we can resize based on the real size
+            long actualSize = iterator.getByteSize();
+            chunk.resize(actualSize);
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(region.getRegionNameAsString(), t);
+            return null;
+        } finally {
+            region.closeRegionOperation();
+        }
+        return new BaseRegionScanner() {
+            private Tuple tuple = firstTuple;
+            
+            @Override
+            public boolean isFilterDone() {
+                return tuple == null; 
+            }
+
+            @Override
+            public HRegionInfo getRegionInfo() {
+                return s.getRegionInfo();
+            }
+
+            @Override
+            public boolean next(List<KeyValue> results) throws IOException {
+                try {
+                    if (isFilterDone()) {
+                        return false;
+                    }
+                    
+                    for (int i = 0; i < tuple.size(); i++) {
+                        results.add(tuple.getValue(i));
+                    }
+                    
+                    tuple = iterator.next();
+                    return !isFilterDone();
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(region.getRegionNameAsString(), t);
+                    return false;
+                }
+            }
+
+            @Override
+            public void close() throws IOException {
+                try {
+                    s.close();
+                } finally {
+                    chunk.close();
+                }
+            }
+        };
+    }
+        
+    /**
+     * Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and
+     * re-throws them as a DoNotRetryIOException to prevent needless retrying from hanging the query
+     * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
+     * the same from a custom filter.
+     */
+    private RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s) {
+        return new RegionScanner() {
+
+            @Override
+            public boolean next(List<KeyValue> results) throws IOException {
+                try {
+                    return s.next(results);
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+                    return false; // impossible
+                }
+            }
+
+            @Override
+            public boolean next(List<KeyValue> results, String metric) throws IOException {
+                try {
+                    return s.next(results, metric);
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+                    return false; // impossible
+                }
+            }
+
+            @Override
+            public boolean next(List<KeyValue> result, int limit) throws IOException {
+                try {
+                    return s.next(result, limit);
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+                    return false; // impossible
+                }
+            }
+
+            @Override
+            public boolean next(List<KeyValue> result, int limit, String metric) throws IOException {
+                try {
+                    return s.next(result, limit, metric);
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+                    return false; // impossible
+                }
+            }
+
+            @Override
+            public void close() throws IOException {
+                s.close();
+            }
+
+            @Override
+            public HRegionInfo getRegionInfo() {
+                return s.getRegionInfo();
+            }
+
+            @Override
+            public boolean isFilterDone() {
+                return s.isFilterDone();
+            }
+
+            @Override
+            public boolean reseek(byte[] row) throws IOException {
+                return s.reseek(row);
+            }
+            
+            @Override
+            public long getMvccReadPoint() {
+                return s.getMvccReadPoint();
+            }
+
+            @Override
+            public boolean nextRaw(List<KeyValue> result, String metric) throws IOException {
+                try {
+                    return s.nextRaw(result, metric);
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+                    return false; // impossible
+                }
+            }
+
+            @Override
+            public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException {
+                try {
+                    return s.nextRaw(result, limit, metric);
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+                    return false; // impossible
+                }
+            }
+        };
+    }
+
+}
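
A hedged client-side sketch of how a TopN scan might be set up so that doPostScannerOpen wraps it: the scan is flagged as a non-aggregate query and the TopN parameters are serialized into it. PDataType.TRUE_BYTES is assumed to exist as the counterpart of the FALSE_BYTES constant checked above, and the empty order-by list stands in for the expressions a compiled ORDER BY clause would supply.

    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.hbase.client.Scan;

    import org.apache.phoenix.coprocessor.ScanRegionObserver;
    import org.apache.phoenix.expression.OrderByExpression;
    import org.apache.phoenix.schema.PDataType;

    public class TopNScanSketch {
        public static Scan buildTopNScan(int thresholdBytes, int limit, int estimatedRowSize,
                List<OrderByExpression> orderBy) {
            Scan scan = new Scan();
            // Flag the scan as a non-aggregate query so doPostScannerOpen wraps the scanner.
            // TRUE_BYTES is assumed to mirror the FALSE_BYTES constant checked above.
            scan.setAttribute(ScanRegionObserver.NON_AGGREGATE_QUERY, PDataType.TRUE_BYTES);
            // Serialize the TopN spec (memory threshold, limit, row-size estimate, order by)
            // into the attribute that deserializeFromScan reads back on the server.
            ScanRegionObserver.serializeIntoScan(scan, thresholdBytes, limit, orderBy, estimatedRowSize);
            return scan;
        }

        public static void main(String[] args) {
            // In practice the order-by expressions come from the compiled ORDER BY clause.
            Scan scan = buildTopNScan(1024 * 1024, 100, 256,
                    Collections.<OrderByExpression>emptyList());
            System.out.println("TopN spec attached: " + (scan.getAttribute("TopN") != null));
        }
    }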

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
new file mode 100644
index 0000000..ed04cc3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -0,0 +1,258 @@
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+/**
+ * 
+ * Region observer coprocessor for sequence operations:
+ * 1) For creating a sequence, as checkAndPut does not allow us to scope the
+ * Get done for the check with a TimeRange.
+ * 2) For incrementing a sequence, as increment does not a) allow us to set the
+ * timestamp of the key value being incremented and b) recognize when the key
+ * value being incremented does not exist.
+ * 3) For deleting a sequence, as checkAndDelete does not allow us to scope
+ * the Get done for the check with a TimeRange.
+ *
+ * @author jtaylor
+ * @since 3.0.0
+ */
+public class SequenceRegionObserver extends BaseRegionObserver {
+    public enum Op {CREATE_SEQUENCE, DROP_SEQUENCE, RETURN_SEQUENCE};
+    public static final String OPERATION_ATTRIB = "SEQUENCE_OPERATION";
+    public static final String MAX_TIMERANGE_ATTRIB = "MAX_TIMERANGE";
+    public static final String CURRENT_VALUE_ATTRIB = "CURRENT_VALUE";
+    private static final byte[] SUCCESS_VALUE = PDataType.INTEGER.toBytes(Integer.valueOf(Sequence.SUCCESS));
+    
+    private static Result getErrorResult(byte[] row, long timestamp, int errorCode) {
+        byte[] errorCodeBuf = new byte[PDataType.INTEGER.getByteSize()];
+        PDataType.INTEGER.getCodec().encodeInt(errorCode, errorCodeBuf, 0);
+        return new Result(Collections.singletonList(
+                KeyValueUtil.newKeyValue(row, 
+                        PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, 
+                        QueryConstants.EMPTY_COLUMN_BYTES, timestamp, errorCodeBuf)));
+    }
+    /**
+     * 
+     * Use PreIncrement hook of BaseRegionObserver to overcome deficiencies in Increment
+     * implementation (HBASE-10254):
+     * 1) Lack of recognition of when the key value to increment doesn't exist
+     * 2) Lack of the ability to set the timestamp of the updated key value.
+     * Works the same as existing region.increment(), except assumes there is a single column to
+     * increment and uses Phoenix LONG encoding.
+     * @author jtaylor
+     * @since 3.0.0
+     */
+    @Override
+    public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Increment increment) throws IOException {
+        RegionCoprocessorEnvironment env = e.getEnvironment();
+        // We need to set this to prevent region.increment from being called
+        e.bypass();
+        e.complete();
+        HRegion region = env.getRegion();
+        byte[] row = increment.getRow();
+        TimeRange tr = increment.getTimeRange();
+        region.startRegionOperation();
+        try {
+            Integer lid = region.getLock(null, row, true);
+            try {
+                long maxTimestamp = tr.getMax();
+                if (maxTimestamp == HConstants.LATEST_TIMESTAMP) {
+                    maxTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+                    tr = new TimeRange(tr.getMin(), maxTimestamp);
+                }
+                Get get = new Get(row);
+                get.setTimeRange(tr.getMin(), tr.getMax());
+                for (Map.Entry<byte[],NavigableMap<byte[], Long>> entry : increment.getFamilyMap().entrySet()) {
+                    byte[] cf = entry.getKey();
+                    for (byte[] cq : entry.getValue().keySet()) {
+                        get.addColumn(cf, cq);
+                    }
+                }
+                Result result = region.get(get);
+                if (result.isEmpty()) {
+                    return getErrorResult(row, maxTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode());
+                }
+                KeyValue currentValueKV = Sequence.getCurrentValueKV(result);
+                KeyValue incrementByKV = Sequence.getIncrementByKV(result);
+                KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result);
+                long value = PDataType.LONG.getCodec().decodeLong(currentValueKV.getBuffer(), currentValueKV.getValueOffset(), null);
+                long incrementBy = PDataType.LONG.getCodec().decodeLong(incrementByKV.getBuffer(), incrementByKV.getValueOffset(), null);
+                int cacheSize = PDataType.INTEGER.getCodec().decodeInt(cacheSizeKV.getBuffer(), cacheSizeKV.getValueOffset(), null);
+                value += incrementBy * cacheSize;
+                byte[] valueBuffer = new byte[PDataType.LONG.getByteSize()];
+                PDataType.LONG.getCodec().encodeLong(value, valueBuffer, 0);
+                Put put = new Put(row, currentValueKV.getTimestamp());
+                // Hold timestamp constant for sequences, so that clients always only see the latest value
+                // regardless of when they connect.
+                KeyValue newCurrentValueKV = KeyValueUtil.newKeyValue(row, currentValueKV.getFamily(), currentValueKV.getQualifier(), currentValueKV.getTimestamp(), valueBuffer);
+                put.add(newCurrentValueKV);
+                @SuppressWarnings("unchecked")
+                Pair<Mutation,Integer>[] mutations = new Pair[1];
+                mutations[0] = new Pair<Mutation,Integer>(put, lid);
+                region.batchMutate(mutations);
+                return Sequence.replaceCurrentValueKV(result, newCurrentValueKV);
+            } finally {
+                region.releaseRowLock(lid);
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException("Increment of sequence " + Bytes.toStringBinary(row), t);
+            return null; // Impossible
+        } finally {
+            region.closeRegionOperation();
+        }
+    }
+
+    /**
+     * Override the preAppend for checkAndPut and checkAndDelete, as we need the ability to
+     * a) set the TimeRange for the Get being done and
+     * b) return something back to the client to indicate success/failure
+     */
+    @SuppressWarnings("deprecation")
+    @Override
+    public Result preAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
+            final Append append) throws IOException {
+        byte[] opBuf = append.getAttribute(OPERATION_ATTRIB);
+        if (opBuf == null) {
+            return null;
+        }
+        Op op = Op.values()[opBuf[0]];
+        KeyValue keyValue = append.getFamilyMap().values().iterator().next().iterator().next();
+
+        long clientTimestamp = HConstants.LATEST_TIMESTAMP;
+        long minGetTimestamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
+        long maxGetTimestamp = HConstants.LATEST_TIMESTAMP;
+        boolean hadClientTimestamp;
+        byte[] clientTimestampBuf = null;
+        if (op == Op.RETURN_SEQUENCE) {
+            // When returning sequences, this allows us to send the expected timestamp
+            // of the sequence to make sure we don't reset any other sequence
+            hadClientTimestamp = true;
+            clientTimestamp = minGetTimestamp = keyValue.getTimestamp();
+            maxGetTimestamp = minGetTimestamp + 1;
+        } else {
+            clientTimestampBuf = append.getAttribute(MAX_TIMERANGE_ATTRIB);
+            if (clientTimestampBuf != null) {
+                clientTimestamp = maxGetTimestamp = Bytes.toLong(clientTimestampBuf);
+            }
+            hadClientTimestamp = (clientTimestamp != HConstants.LATEST_TIMESTAMP);
+            if (hadClientTimestamp) {
+                // Prevent race condition of creating two sequences at the same timestamp
+                // by looking for a sequence at or after the timestamp at which it'll be
+                // created.
+                if (op == Op.CREATE_SEQUENCE) {
+                    maxGetTimestamp = clientTimestamp + 1;
+                }            
+            } else {
+                clientTimestamp = maxGetTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+                clientTimestampBuf = Bytes.toBytes(clientTimestamp);
+            }
+        }
+
+        RegionCoprocessorEnvironment env = e.getEnvironment();
+        // We need to set this to prevent region.append from being called
+        e.bypass();
+        e.complete();
+        HRegion region = env.getRegion();
+        byte[] row = append.getRow();
+        region.startRegionOperation();
+        try {
+            Integer lid = region.getLock(null, row, true);
+            try {
+                byte[] family = keyValue.getFamily();
+                byte[] qualifier = keyValue.getQualifier();
+
+                Get get = new Get(row);
+                get.setTimeRange(minGetTimestamp, maxGetTimestamp);
+                get.addColumn(family, qualifier);
+                Result result = region.get(get);
+                if (result.isEmpty()) {
+                    if (op == Op.DROP_SEQUENCE || op == Op.RETURN_SEQUENCE) {
+                        return getErrorResult(row, clientTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode());
+                    }
+                } else {
+                    if (op == Op.CREATE_SEQUENCE) {
+                        return getErrorResult(row, clientTimestamp, SQLExceptionCode.SEQUENCE_ALREADY_EXIST.getErrorCode());
+                    }
+                }
+                Mutation m = null;
+                switch (op) {
+                case RETURN_SEQUENCE:
+                    KeyValue currentValueKV = result.raw()[0];
+                    long expectedValue = PDataType.LONG.getCodec().decodeLong(append.getAttribute(CURRENT_VALUE_ATTRIB), 0, null);
+                    long value = PDataType.LONG.getCodec().decodeLong(currentValueKV.getBuffer(), currentValueKV.getValueOffset(), null);
+                    // Timestamp should match exactly, or we may have the wrong sequence
+                    if (expectedValue != value || currentValueKV.getTimestamp() != clientTimestamp) {
+                        return new Result(Collections.singletonList(KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, currentValueKV.getTimestamp(), ByteUtil.EMPTY_BYTE_ARRAY)));
+                    }
+                    m = new Put(row, currentValueKV.getTimestamp());
+                    m.getFamilyMap().putAll(append.getFamilyMap());
+                    break;
+                case DROP_SEQUENCE:
+                    m = new Delete(row, clientTimestamp, null);
+                    break;
+                case CREATE_SEQUENCE:
+                    m = new Put(row, clientTimestamp);
+                    m.getFamilyMap().putAll(append.getFamilyMap());
+                    break;
+                }
+                if (!hadClientTimestamp) {
+                    for (List<KeyValue> kvs : m.getFamilyMap().values()) {
+                        for (KeyValue kv : kvs) {
+                            kv.updateLatestStamp(clientTimestampBuf);
+                        }
+                    }
+                }
+                @SuppressWarnings("unchecked")
+                Pair<Mutation,Integer>[] mutations = new Pair[1];
+                mutations[0] = new Pair<Mutation,Integer>(m, lid);
+                region.batchMutate(mutations);
+                long serverTimestamp = MetaDataUtil.getClientTimeStamp(m);
+                // Return result with single KeyValue. The only piece of information
+                // the client cares about is the timestamp, which is the timestamp of
+                // when the mutation was actually performed.
+                return new Result(Collections.singletonList(KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, serverTimestamp, SUCCESS_VALUE)));
+            } finally {
+                region.releaseRowLock(lid);
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException("Increment of sequence " + Bytes.toStringBinary(row), t);
+            return null; // Impossible
+        } finally {
+            region.closeRegionOperation();
+        }
+    }
+
+}
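
A sketch of the Append a client could send to drive the CREATE_SEQUENCE branch of preAppend above. The attribute encodings follow directly from how preAppend decodes them (a single ordinal byte for the op, Bytes.toLong for the max time range); the row key, family, qualifier, and initial cell are placeholders for whatever the Phoenix sequence schema actually uses.

    import org.apache.hadoop.hbase.client.Append;
    import org.apache.hadoop.hbase.util.Bytes;

    import org.apache.phoenix.coprocessor.SequenceRegionObserver;

    public class CreateSequenceAppendSketch {
        /**
         * Builds an Append that the preAppend hook above interprets as CREATE_SEQUENCE.
         * Row key, family, qualifier, and the initial cell value are placeholders.
         */
        public static Append buildCreateSequence(byte[] row, byte[] family, byte[] qualifier,
                long clientTimestamp) {
            Append append = new Append(row);
            // preAppend reads the op as Op.values()[opBuf[0]], so one ordinal byte suffices.
            append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB,
                    new byte[] { (byte) SequenceRegionObserver.Op.CREATE_SEQUENCE.ordinal() });
            // MAX_TIMERANGE_ATTRIB is decoded with Bytes.toLong and bounds the existence check.
            append.setAttribute(SequenceRegionObserver.MAX_TIMERANGE_ATTRIB,
                    Bytes.toBytes(clientTimestamp));
            // preAppend copies the Append's family map into the Put it writes, so the
            // cells packed here become the new sequence row (a single cell for brevity).
            append.add(family, qualifier, Bytes.toBytes(0L));
            return append;
        }
    }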

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
new file mode 100644
index 0000000..b27c14c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+
+
+
+
+/**
+ * 
+ * Server-side implementation of {@link ServerCachingProtocol}
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ServerCachingEndpointImpl extends BaseEndpointCoprocessor implements ServerCachingProtocol {
+
+    @Override
+    public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException {
+        TenantCache tenantCache = GlobalCache.getTenantCache((RegionCoprocessorEnvironment)this.getEnvironment(), tenantId == null ? null : new ImmutableBytesPtr(tenantId));
+        tenantCache.addServerCache(new ImmutableBytesPtr(cacheId), cachePtr, cacheFactory);
+        return true;
+    }
+
+    @Override
+    public boolean removeServerCache(byte[] tenantId, byte[] cacheId) throws SQLException {
+        TenantCache tenantCache = GlobalCache.getTenantCache((RegionCoprocessorEnvironment)this.getEnvironment(), tenantId == null ? null : new ImmutableBytesPtr(tenantId));
+        tenantCache.removeServerCache(new ImmutableBytesPtr(cacheId));
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
new file mode 100644
index 0000000..abda834
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.Closeable;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+
+/**
+ * 
+ * EndPoint coprocessor to send a cache to a region server.
+ * Used for:
+ * a) hash joins, to send the smaller side of the join to each region server
+ * b) secondary indexes, to send the necessary meta data to each region server
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface ServerCachingProtocol extends CoprocessorProtocol {
+    public static interface ServerCacheFactory extends Writable {
+        public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException;
+    }
+    /**
+     * Add the cache to the region server cache.  
+     * @param tenantId the tenantId or null if not applicable
+     * @param cacheId unique identifier of the cache
+     * @param cachePtr pointer to the byte array of the cache
+     * @param cacheFactory factory that converts from byte array to object representation on the server side
+     * @return true on success; otherwise an exception is thrown
+     * @throws SQLException 
+     */
+    public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+    /**
+     * Remove the cache from the region server cache.  Called upon completion of
+     * the operation when cache is no longer needed.
+     * @param tenantId the tenantId or null if not applicable
+     * @param cacheId unique identifier of the cache
+     * @return true on success; otherwise an exception is thrown
+     * @throws SQLException 
+     */
+    public boolean removeServerCache(byte[] tenantId, byte[] cacheId) throws SQLException;
+}
\ No newline at end of file

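A note on the ServerCacheFactory contract above: the factory is a Writable, so it travels with the RPC and is invoked on the region server to turn the raw cache bytes into whatever object the server-side code needs, backed by a tracked MemoryChunk. Below is a minimal sketch of such a factory, not one of Phoenix's own implementations: the class name RawBytesCacheFactory is hypothetical, real factories (for example the one used for the hash-join cache) deserialize cachePtr into richer structures, and the sketch assumes MemoryChunk exposes a close() that releases its tracked allocation.

    import java.io.Closeable;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.SQLException;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

    import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
    import org.apache.phoenix.memory.MemoryManager.MemoryChunk;

    // Hypothetical example factory: it keeps the cache bytes as-is and only
    // ensures the tracked memory is released when the cache entry is closed.
    public class RawBytesCacheFactory implements ServerCacheFactory {

        @Override
        public void write(DataOutput out) throws IOException {
            // Stateless factory: nothing to serialize for the trip to the server.
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // Stateless factory: nothing to deserialize on the server side.
        }

        @Override
        public Closeable newCache(ImmutableBytesWritable cachePtr, final MemoryChunk chunk) throws SQLException {
            // A real factory would parse cachePtr here (e.g. build a hash table
            // for joins); this sketch just returns a handle that frees the memory.
            return new Closeable() {
                @Override
                public void close() throws IOException {
                    chunk.close();
                }
            };
        }
    }
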
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
new file mode 100644
index 0000000..c7c4575
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -0,0 +1,423 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.phoenix.client.KeyValueBuilder;
+import org.apache.phoenix.exception.ValueTypeIncompatibleException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.join.ScanProjector;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.ConstraintViolationException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+
+/**
+ * Region observer that aggregates ungrouped rows (i.e. a SQL query with an aggregation function and no GROUP BY).
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver {
+    private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
+    // TODO: move all constants into a single class
+    public static final String UNGROUPED_AGG = "UngroupedAgg";
+    public static final String DELETE_AGG = "DeleteAgg";
+    public static final String UPSERT_SELECT_TABLE = "UpsertSelectTable";
+    public static final String UPSERT_SELECT_EXPRS = "UpsertSelectExprs";
+    public static final String DELETE_CQ = "DeleteCQ";
+    public static final String DELETE_CF = "DeleteCF";
+    public static final String EMPTY_CF = "EmptyCF";
+    private KeyValueBuilder kvBuilder;
+    
+    @Override
+    public void start(CoprocessorEnvironment e) throws IOException {
+        super.start(e);
+        this.kvBuilder = KeyValueBuilder.get(e.getHBaseVersion());
+    }
+
+    private static void commitBatch(HRegion region, List<Pair<Mutation,Integer>> mutations, byte[] indexUUID) throws IOException {
+        if (indexUUID != null) {
+            for (Pair<Mutation,Integer> pair : mutations) {
+                pair.getFirst().setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+            }
+        }
+        @SuppressWarnings("unchecked")
+        Pair<Mutation,Integer>[] mutationArray = new Pair[mutations.size()];
+        // TODO: should we use the one that is all or none?
+        region.batchMutate(mutations.toArray(mutationArray));
+    }
+    
+    public static void serializeIntoScan(Scan scan) {
+        scan.setAttribute(UNGROUPED_AGG, QueryConstants.TRUE);
+    }
+
+    @Override
+    protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
+        byte[] isUngroupedAgg = scan.getAttribute(UNGROUPED_AGG);
+        if (isUngroupedAgg == null) {
+            return s;
+        }
+        
+        final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
+        final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+        RegionScanner theScanner = s;
+        if (p != null || j != null)  {
+            theScanner = new HashJoinRegionScanner(s, p, j, ScanUtil.getTenantId(scan), c.getEnvironment());
+        }
+        final RegionScanner innerScanner = theScanner;
+        
+        byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
+        PTable projectedTable = null;
+        List<Expression> selectExpressions = null;
+        byte[] upsertSelectTable = scan.getAttribute(UPSERT_SELECT_TABLE);
+        boolean isUpsert = false;
+        boolean isDelete = false;
+        byte[] deleteCQ = null;
+        byte[] deleteCF = null;
+        byte[][] values = null;
+        byte[] emptyCF = null;
+        ImmutableBytesWritable ptr = null;
+        if (upsertSelectTable != null) {
+            isUpsert = true;
+            projectedTable = deserializeTable(upsertSelectTable);
+            selectExpressions = deserializeExpressions(scan.getAttribute(UPSERT_SELECT_EXPRS));
+            values = new byte[projectedTable.getPKColumns().size()][];
+            ptr = new ImmutableBytesWritable();
+        } else {
+            byte[] isDeleteAgg = scan.getAttribute(DELETE_AGG);
+            isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
+            if (!isDelete) {
+                deleteCF = scan.getAttribute(DELETE_CF);
+                deleteCQ = scan.getAttribute(DELETE_CQ);
+            }
+            emptyCF = scan.getAttribute(EMPTY_CF);
+        }
+        
+        int batchSize = 0;
+        long ts = scan.getTimeRange().getMax();
+        HRegion region = c.getEnvironment().getRegion();
+        List<Pair<Mutation,Integer>> mutations = Collections.emptyList();
+        if (isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null) {
+            // TODO: size better
+            mutations = Lists.newArrayListWithExpectedSize(1024);
+            batchSize = c.getEnvironment().getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+        }
+        Aggregators aggregators = ServerAggregators.deserialize(
+                scan.getAttribute(GroupedAggregateRegionObserver.AGGREGATORS), c.getEnvironment().getConfiguration());
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        boolean hasMore;
+        boolean hasAny = false;
+        MultiKeyValueTuple result = new MultiKeyValueTuple();
+        if (logger.isInfoEnabled()) {
+        	logger.info("Starting ungrouped coprocessor scan " + scan);
+        }
+        long rowCount = 0;
+        MultiVersionConsistencyControl.setThreadReadPoint(innerScanner.getMvccReadPoint());
+        region.startRegionOperation();
+        try {
+            do {
+                List<KeyValue> results = new ArrayList<KeyValue>();
+                // Results are potentially returned even when the return value of nextRaw() is
+                // false, since that return value only indicates whether there are more values
+                // after the ones returned
+                hasMore = innerScanner.nextRaw(results, null);
+                if (!results.isEmpty()) {
+                	rowCount++;
+                    result.setKeyValues(results);
+                    try {
+                        if (isDelete) {
+                            @SuppressWarnings("deprecation") // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
+                            // FIXME: the version of the Delete constructor without the lock args was introduced
+                            // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
+                            // of the client.
+                            Delete delete = new Delete(results.get(0).getRow(),ts,null);
+                            mutations.add(new Pair<Mutation,Integer>(delete,null));
+                        } else if (isUpsert) {
+                            Arrays.fill(values, null);
+                            int i = 0;
+                            List<PColumn> projectedColumns = projectedTable.getColumns();
+                            for (; i < projectedTable.getPKColumns().size(); i++) {
+                                Expression expression = selectExpressions.get(i);
+                                if (expression.evaluate(result, ptr)) {
+                                    values[i] = ptr.copyBytes();
+                                    // If ColumnModifier from expression in SELECT doesn't match the
+                                    // column being projected into then invert the bits.
+                                    if (expression.getColumnModifier() != projectedColumns.get(i).getColumnModifier()) {
+                                        ColumnModifier.SORT_DESC.apply(values[i], 0, values[i], 0, values[i].length);
+                                    }
+                                }
+                            }
+                            projectedTable.newKey(ptr, values);
+                            PRow row = projectedTable.newRow(kvBuilder, ts, ptr);
+                            for (; i < projectedColumns.size(); i++) {
+                                Expression expression = selectExpressions.get(i);
+                                if (expression.evaluate(result, ptr)) {
+                                    PColumn column = projectedColumns.get(i);
+                                    byte[] bytes = ptr.copyBytes();
+                                    Object value = expression.getDataType().toObject(bytes, column.getColumnModifier());
+                                    // If ColumnModifier from expression in SELECT doesn't match the
+                                    // column being projected into then invert the bits.
+                                    if (expression.getColumnModifier() != column.getColumnModifier()) {
+                                        ColumnModifier.SORT_DESC.apply(bytes, 0, bytes, 0, bytes.length);
+                                    }
+                                    // We are guaranteed that the two columns will have the same type.
+                                    if (!column.getDataType().isSizeCompatible(column.getDataType(),
+                                            value, bytes,
+                                            expression.getMaxLength(), column.getMaxLength(), 
+                                            expression.getScale(), column.getScale())) {
+                                        throw new ValueTypeIncompatibleException(column.getDataType(),
+                                                column.getMaxLength(), column.getScale());
+                                    }
+                                    bytes = column.getDataType().coerceBytes(bytes, value, expression.getDataType(),
+                                            expression.getMaxLength(), expression.getScale(), column.getMaxLength(), column.getScale());
+                                    row.setValue(column, bytes);
+                                }
+                            }
+                            for (Mutation mutation : row.toRowMutations()) {
+                                mutations.add(new Pair<Mutation,Integer>(mutation,null));
+                            }
+                        } else if (deleteCF != null && deleteCQ != null) {
+                            // No need to check for the delete column, since it is the only
+                            // column projected when no empty key value is being set
+                            if (emptyCF == null || result.getValue(deleteCF, deleteCQ) != null) {
+                                Delete delete = new Delete(results.get(0).getRow());
+                                delete.deleteColumns(deleteCF,  deleteCQ, ts);
+                                mutations.add(new Pair<Mutation,Integer>(delete,null));
+                            }
+                        }
+                        if (emptyCF != null) {
+                            /*
+                             * If we've specified an emptyCF, then we need to insert an empty
+                             * key value "retroactively" for any key value that is visible at
+                             * the timestamp that the DDL was issued. Key values that are not
+                             * visible at this timestamp will not ever be projected up to
+                             * scans past this timestamp, so don't need to be considered.
+                             * We insert one empty key value per row per timestamp.
+                             */
+                            Set<Long> timeStamps = Sets.newHashSetWithExpectedSize(results.size());
+                            for (KeyValue kv : results) {
+                                long kvts = kv.getTimestamp();
+                                if (!timeStamps.contains(kvts)) {
+                                    Put put = new Put(kv.getRow());
+                                    put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts, ByteUtil.EMPTY_BYTE_ARRAY);
+                                    mutations.add(new Pair<Mutation,Integer>(put,null));
+                                }
+                            }
+                        }
+                        // Commit in batches based on MUTATE_BATCH_SIZE_ATTRIB in config
+                        if (!mutations.isEmpty() && batchSize > 0 && mutations.size() % batchSize == 0) {
+                            commitBatch(region,mutations, indexUUID);
+                            mutations.clear();
+                        }
+                    } catch (ConstraintViolationException e) {
+                        // Log and ignore in count
+                        logger.error("Failed to create row in " + region.getRegionNameAsString() + " with values " + SchemaUtil.toString(values), e);
+                        continue;
+                    }
+                    aggregators.aggregate(rowAggregators, result);
+                    hasAny = true;
+                }
+            } while (hasMore);
+        } finally {
+            innerScanner.close();
+            region.closeRegionOperation();
+        }
+        
+        if (logger.isInfoEnabled()) {
+        	logger.info("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan);
+        }
+
+        if (!mutations.isEmpty()) {
+            commitBatch(region,mutations, indexUUID);
+        }
+
+        final boolean hadAny = hasAny;
+        KeyValue keyValue = null;
+        if (hadAny) {
+            byte[] value = aggregators.toBytes(rowAggregators);
+            keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+        }
+        final KeyValue aggKeyValue = keyValue;
+        
+        RegionScanner scanner = new BaseRegionScanner() {
+            private boolean done = !hadAny;
+
+            @Override
+            public HRegionInfo getRegionInfo() {
+                return innerScanner.getRegionInfo();
+            }
+
+            @Override
+            public boolean isFilterDone() {
+                return done;
+            }
+
+            @Override
+            public void close() throws IOException {
+                innerScanner.close();
+            }
+
+            @Override
+            public boolean next(List<KeyValue> results) throws IOException {
+                if (done) return false;
+                done = true;
+                results.add(aggKeyValue);
+                return false;
+            }
+        };
+        return scanner;
+    }
+    
+    private static PTable deserializeTable(byte[] b) {
+        ByteArrayInputStream stream = new ByteArrayInputStream(b);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            PTable table = new PTableImpl();
+            table.readFields(input);
+            return table;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private static List<Expression> deserializeExpressions(byte[] b) {
+        ByteArrayInputStream stream = new ByteArrayInputStream(b);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            int size = WritableUtils.readVInt(input);
+            List<Expression> selectExpressions = Lists.newArrayListWithExpectedSize(size);
+            for (int i = 0; i < size; i++) {
+                ExpressionType type = ExpressionType.values()[WritableUtils.readVInt(input)];
+                Expression selectExpression = type.newInstance();
+                selectExpression.readFields(input);
+                selectExpressions.add(selectExpression);
+            }
+            return selectExpressions;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static byte[] serialize(PTable projectedTable) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            projectedTable.write(output);
+            return stream.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static byte[] serialize(List<Expression> selectExpressions) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            WritableUtils.writeVInt(output, selectExpressions.size());
+            for (int i = 0; i < selectExpressions.size(); i++) {
+                Expression expression = selectExpressions.get(i);
+                WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+                expression.write(output);
+            }
+            return stream.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}

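For context on when this observer's code path runs: a minimal client-side sketch follows, using a hypothetical jdbc:phoenix:localhost connection and a hypothetical METRICS table (VAL assumed to be an INTEGER column). With auto-commit enabled, the aggregate query, the DELETE, and the UPSERT SELECT should all be pushed down to the region servers and flow through doPostScannerOpen above, with each region returning the single aggregated KeyValue built at the end of that method.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class UngroupedAggExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical ZooKeeper quorum; adjust for the local cluster.
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            conn.setAutoCommit(true); // lets DELETE / UPSERT SELECT run server-side
            Statement stmt = conn.createStatement();

            // Ungrouped aggregation: no GROUP BY, so each region returns one
            // aggregated row keyed by UNGROUPED_AGG_ROW_KEY.
            ResultSet rs = stmt.executeQuery("SELECT COUNT(*), MAX(VAL) FROM METRICS");
            while (rs.next()) {
                System.out.println(rs.getLong(1) + " rows, max=" + rs.getInt(2));
            }
            rs.close();

            // Server-side delete: rows are deleted region by region as they are scanned.
            stmt.executeUpdate("DELETE FROM METRICS WHERE VAL < 0");

            // UPSERT SELECT into another (hypothetical) table, also executed server-side.
            stmt.executeUpdate("UPSERT INTO METRICS_COPY SELECT * FROM METRICS");

            stmt.close();
            conn.close();
        }
    }
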
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java
new file mode 100644
index 0000000..d1b0b18
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixIOException.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.sql.SQLException;
+
+
+public class PhoenixIOException extends SQLException {
+    private static final long serialVersionUID = 1L;
+    private static SQLExceptionCode code = SQLExceptionCode.IO_EXCEPTION;
+
+    public PhoenixIOException(Throwable e) {
+        super(e.getMessage(), code.getSQLState(), code.getErrorCode(), e);
+    }
+
+}

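PhoenixIOException is the thin bridge from HBase's checked IOExceptions into the JDBC SQLException hierarchy. A minimal usage sketch follows; the fetchRow()/doHBaseGet() methods are hypothetical placeholders for real HBase calls.

    import java.io.IOException;
    import java.sql.SQLException;

    import org.apache.phoenix.exception.PhoenixIOException;

    public class IOExceptionWrappingSketch {
        static byte[] fetchRow() throws SQLException {
            try {
                return doHBaseGet(); // placeholder for an HBase read
            } catch (IOException e) {
                // Surface the HBase failure to JDBC callers with the
                // IO_EXCEPTION SQLState/error code attached.
                throw new PhoenixIOException(e);
            }
        }

        private static byte[] doHBaseGet() throws IOException {
            throw new IOException("region server unreachable"); // stand-in failure
        }
    }
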
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java
new file mode 100644
index 0000000..fc96923
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/PhoenixParserException.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.sql.SQLSyntaxErrorException;
+
+import org.antlr.runtime.MismatchedTokenException;
+import org.antlr.runtime.MissingTokenException;
+import org.antlr.runtime.RecognitionException;
+import org.antlr.runtime.Token;
+import org.antlr.runtime.UnwantedTokenException;
+
+
+public class PhoenixParserException extends SQLSyntaxErrorException {
+    private static final long serialVersionUID = 1L;
+
+    public static final PhoenixParserException newException(Throwable cause, String[] tokens) {
+        return new PhoenixParserException(getErrorMessage(cause, tokens), cause);
+    }
+    
+    public PhoenixParserException(String msg, Throwable throwable) {
+        super(new SQLExceptionInfo.Builder(getErrorCode(throwable)).setRootCause(throwable)
+                .setMessage(msg).build().toString(),
+                getErrorCode(throwable).getSQLState(), getErrorCode(throwable).getErrorCode(), throwable);
+    }
+
+    public static String getLine(RecognitionException e) {
+        return Integer.toString(e.token.getLine());
+    }
+
+    public static String getColumn(RecognitionException e) {
+        return Integer.toString(e.token.getCharPositionInLine() + 1);
+    }
+
+    public static String getTokenLocation(RecognitionException e) {
+        return "line " + getLine(e) + ", column " + getColumn(e) + ".";
+    }
+
+    public static String getErrorMessage(Throwable e, String[] tokenNames) {
+        String msg;
+        if (e instanceof MissingTokenException) {
+            MissingTokenException mte = (MissingTokenException)e;
+            String tokenName;
+            if (mte.expecting == Token.EOF) {
+                tokenName = "EOF";
+            } else {
+                tokenName = tokenNames[mte.expecting];
+            }
+            msg = "Missing \""+ tokenName +"\" at "+ getTokenLocation(mte);
+        } else if (e instanceof UnwantedTokenException) {
+            UnwantedTokenException ute = (UnwantedTokenException)e;
+            String tokenName;
+            if (ute.expecting == Token.EOF) {
+                tokenName = "EOF";
+            } else {
+                tokenName = tokenNames[ute.expecting];
+            }
+            msg = "Unexpected input. Expecting \"" + tokenName + "\", got \"" + ute.getUnexpectedToken().getText() 
+                    + "\" at " + getTokenLocation(ute);
+        } else if (e instanceof MismatchedTokenException) {
+            MismatchedTokenException mte = (MismatchedTokenException)e;
+            String tokenName;
+            if (mte.expecting == Token.EOF) {
+                tokenName = "EOF";
+            } else {
+                tokenName = tokenNames[mte.expecting];
+            }
+            msg = "Mismatched input. Expecting \"" + tokenName + "\", got \"" + mte.token.getText()
+                    + "\" at " + getTokenLocation(mte);
+        } else if (e instanceof RecognitionException) {
+            RecognitionException re = (RecognitionException) e;
+            msg = "Encountered \"" + re.token.getText() + "\" at " + getTokenLocation(re);
+        } else if (e instanceof UnknownFunctionException) {
+            UnknownFunctionException ufe = (UnknownFunctionException) e;
+            msg = "Unknown function: \"" + ufe.getFuncName() + "\".";
+        } else {
+            msg = e.getMessage();
+        }
+        return msg;
+    }
+
+    public static SQLExceptionCode getErrorCode(Throwable e) {
+        if (e instanceof MissingTokenException) {
+            return SQLExceptionCode.MISSING_TOKEN;
+        } else if (e instanceof UnwantedTokenException) {
+            return SQLExceptionCode.UNWANTED_TOKEN;
+        } else if (e instanceof MismatchedTokenException) {
+            return SQLExceptionCode.MISMATCHED_TOKEN;
+        } else if (e instanceof UnknownFunctionException) {
+            return SQLExceptionCode.UNKNOWN_FUNCTION;
+        } else {
+            return SQLExceptionCode.PARSER_ERROR;
+        }
+    }
+}
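
From a client's perspective, a syntax error surfaces as this exception (a SQLSyntaxErrorException) with the token and line/column information built by getErrorMessage() above. A minimal sketch, assuming a hypothetical jdbc:phoenix:localhost connection and METRICS table:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLSyntaxErrorException;
    import java.sql.Statement;

    public class ParserErrorExample {
        public static void main(String[] args) throws Exception {
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            Statement stmt = conn.createStatement();
            try {
                // "SELCT" is a typo, so parsing fails before execution.
                stmt.executeQuery("SELCT * FROM METRICS");
            } catch (SQLSyntaxErrorException e) {
                // The message names the offending token and its location,
                // along the lines of: ... "SELCT" at line 1, column 1.
                System.err.println(e.getSQLState() + ": " + e.getMessage());
            } finally {
                stmt.close();
                conn.close();
            }
        }
    }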