You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:35 UTC

[34/51] [partial] Initial commit

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
new file mode 100644
index 0000000..8946c9f
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.*;
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.*;
+import org.apache.phoenix.schema.TableRef;
+
+public class DegenerateQueryPlan extends BasicQueryPlan {
+
+    public DegenerateQueryPlan(StatementContext context, FilterableStatement statement, TableRef table) {
+        super(context, statement, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null);
+        context.setScanRanges(ScanRanges.NOTHING);
+    }
+
+    @Override
+    public List<KeyRange> getSplits() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    protected Scanner newScanner(ConnectionQueryServices services) throws SQLException {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/execute/MutationState.java b/src/main/java/org/apache/phoenix/execute/MutationState.java
new file mode 100644
index 0000000..3fa26e4
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -0,0 +1,435 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.IndexMetaDataCacheClient;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.ServerUtil;
+
+/**
+ * 
+ * Tracks the uncommitted state
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class MutationState implements SQLCloseable {
+    private static final Logger logger = LoggerFactory.getLogger(MutationState.class);
+
+    private PhoenixConnection connection;
+    private final long maxSize;
+    private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
+    private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing?
+    private final long sizeOffset;
+    private int numEntries = 0;
+
+    public MutationState(int maxSize, PhoenixConnection connection) {
+        this(maxSize,connection,0);
+    }
+    
+    public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) {
+        this.maxSize = maxSize;
+        this.connection = connection;
+        this.sizeOffset = sizeOffset;
+    }
+    
+    public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
+        this.maxSize = maxSize;
+        this.connection = connection;
+        this.mutations.put(table, mutations);
+        this.sizeOffset = sizeOffset;
+        this.numEntries = mutations.size();
+        throwIfTooBig();
+    }
+    
+    private MutationState(List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long maxSize, PhoenixConnection connection) {
+        this.maxSize = maxSize;
+        this.connection = connection;
+        this.sizeOffset = sizeOffset;
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) {
+            numEntries += entry.getValue().size();
+            this.mutations.put(entry.getKey(), entry.getValue());
+        }
+        throwIfTooBig();
+    }
+    
+    private void throwIfTooBig() {
+        if (numEntries > maxSize) {
+            // TODO: throw SQLException ?
+            throw new IllegalArgumentException("MutationState size of " + numEntries + " is bigger than max allowed size of " + maxSize);
+        }
+    }
+    
+    public long getUpdateCount() {
+        return sizeOffset + numEntries;
+    }
+    /**
+     * Combine a newer mutation with this one, where in the event of overlaps,
+     * the newer one will take precedence.
+     * @param newMutation the newer mutation
+     */
+    public void join(MutationState newMutation) {
+        if (this == newMutation) { // Doesn't make sense
+            return;
+        }
+        // Merge newMutation with this one, keeping state from newMutation for any overlaps
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : newMutation.mutations.entrySet()) {
+            // Replace existing entries for the table with new entries
+            Map<ImmutableBytesPtr,Map<PColumn,byte[]>> existingRows = this.mutations.put(entry.getKey(), entry.getValue());
+            if (existingRows != null) { // Rows for that table already exist
+                // Loop through new rows and replace existing with new
+                for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
+                    // Replace existing row with new row
+                    Map<PColumn,byte[]> existingValues = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
+                    if (existingValues != null) {
+                        Map<PColumn,byte[]> newRow = rowEntry.getValue();
+                        // if new row is null, it means delete, and we don't need to merge it with existing row. 
+                        if (newRow != null) {
+                            // Replace existing column values with new column values
+                            for (Map.Entry<PColumn,byte[]> valueEntry : newRow.entrySet()) {
+                                existingValues.put(valueEntry.getKey(), valueEntry.getValue());
+                            }
+                            // Now that the existing row has been merged with the new row, replace it back
+                            // again (since it was replaced with the new one above).
+                            existingRows.put(rowEntry.getKey(), existingValues);
+                        }
+                    } else {
+                        numEntries++;
+                    }
+                }
+                // Put the existing one back now that it's merged
+                this.mutations.put(entry.getKey(), existingRows);
+            } else {
+                numEntries += entry.getValue().size();
+            }
+        }
+        throwIfTooBig();
+    }
+    
+    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, long timestamp, boolean includeMutableIndexes) {
+        final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size());
+        Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next();
+            ImmutableBytesPtr key = rowEntry.getKey();
+            PRow row = tableRef.getTable().newRow(timestamp, key);
+            if (rowEntry.getValue() == null) { // means delete
+                row.delete();
+            } else {
+                for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().entrySet()) {
+                    row.setValue(valueEntry.getKey(), valueEntry.getValue());
+                }
+            }
+            mutations.addAll(row.toRowMutations());
+        }
+        final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
+                (tableRef.getTable().isImmutableRows() || includeMutableIndexes) ? 
+                        IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator()) : 
+                        Iterators.<PTable>emptyIterator();
+        return new Iterator<Pair<byte[],List<Mutation>>>() {
+            boolean isFirst = true;
+
+            @Override
+            public boolean hasNext() {
+                return isFirst || indexes.hasNext();
+            }
+
+            @Override
+            public Pair<byte[], List<Mutation>> next() {
+                if (isFirst) {
+                    isFirst = false;
+                    return new Pair<byte[],List<Mutation>>(tableRef.getTable().getName().getBytes(),mutations);
+                }
+                PTable index = indexes.next();
+                List<Mutation> indexMutations;
+                try {
+                    indexMutations = IndexUtil.generateIndexData(tableRef.getTable(), index, mutations, tempPtr);
+                } catch (SQLException e) {
+                    throw new IllegalDataException(e);
+                }
+                return new Pair<byte[],List<Mutation>>(index.getName().getBytes(),indexMutations);
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+            
+        };
+    }
+    
+    /**
+     * Get the unsorted list of HBase mutations for the tables with uncommitted data.
+     * @return list of HBase mutations for uncommitted data.
+     */
+    public Iterator<Pair<byte[],List<Mutation>>> toMutations() {
+        return toMutations(false);
+    }
+    
+    public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) {
+        final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
+        if (!iterator.hasNext()) {
+            return Iterators.emptyIterator();
+        }
+        Long scn = connection.getSCN();
+        final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+        return new Iterator<Pair<byte[],List<Mutation>>>() {
+            private Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> current = iterator.next();
+            private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
+                    
+            private Iterator<Pair<byte[],List<Mutation>>> init() {
+                return addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes);
+            }
+            
+            @Override
+            public boolean hasNext() {
+                return innerIterator.hasNext() || iterator.hasNext();
+            }
+
+            @Override
+            public Pair<byte[], List<Mutation>> next() {
+                if (!innerIterator.hasNext()) {
+                    current = iterator.next();
+                }
+                return innerIterator.next();
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+            
+        };
+    }
+        
+    /**
+     * Validates that the meta data is still valid based on the current server time
+     * and returns the server time to use for the upsert for each table.
+     * @param connection
+     * @return the server time to use for the upsert
+     * @throws SQLException if the table or any columns no longer exist
+     */
+    private long[] validate() throws SQLException {
+        int i = 0;
+        Long scn = connection.getSCN();
+        MetaDataClient client = new MetaDataClient(connection);
+        long[] timeStamps = new long[this.mutations.size()];
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) {
+            TableRef tableRef = entry.getKey();
+            long serverTimeStamp = tableRef.getTimeStamp();
+            PTable table = tableRef.getTable();
+            if (!connection.getAutoCommit()) {
+                serverTimeStamp = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
+                if (serverTimeStamp < 0) {
+                    serverTimeStamp *= -1;
+                    // TODO: use bitset?
+                    PColumn[] columns = new PColumn[table.getColumns().size()];
+                    for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
+                        Map<PColumn,byte[]> valueEntry = rowEntry.getValue();
+                        if (valueEntry != null) {
+                            for (PColumn column : valueEntry.keySet()) {
+                                columns[column.getPosition()] = column;
+                            }
+                        }
+                    }
+                    table = connection.getPMetaData().getTable(tableRef.getTable().getName().getString());
+                    for (PColumn column : columns) {
+                        if (column != null) {
+                            table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
+                        }
+                    }
+                }
+            }
+            timeStamps[i++] = scn == null ? serverTimeStamp : scn;
+        }
+        return timeStamps;
+    }
+    
+    private static void logMutationSize(HTableInterface htable, List<Mutation> mutations) {
+        long byteSize = 0;
+        int keyValueCount = 0;
+        for (Mutation mutation : mutations) {
+            if (mutation.getFamilyMap() != null) { // Not a Delete of the row
+                for (Entry<byte[], List<KeyValue>> entry : mutation.getFamilyMap().entrySet()) {
+                    if (entry.getValue() != null) {
+                        for (KeyValue kv : entry.getValue()) {
+                            byteSize += kv.getBuffer().length;
+                            keyValueCount++;
+                        }
+                    }
+                }
+            }
+        }
+        logger.debug("Sending " + mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + " with " + keyValueCount + " key values of total size " + byteSize + " bytes");
+    }
+    
+    public void commit() throws SQLException {
+        int i = 0;
+        byte[] tenantId = connection.getTenantId();
+        long[] serverTimeStamps = validate();
+        Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
+        List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> committedList = Lists.newArrayListWithCapacity(this.mutations.size());
+        while (iterator.hasNext()) {
+            Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next();
+            Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue();
+            TableRef tableRef = entry.getKey();
+            PTable table = tableRef.getTable();
+            table.getIndexMaintainers(tempPtr);
+            boolean hasIndexMaintainers = tempPtr.getLength() > 0;
+            boolean isDataTable = true;
+            long serverTimestamp = serverTimeStamps[i++];
+            Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
+            while (mutationsIterator.hasNext()) {
+                Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
+                byte[] htableName = pair.getFirst();
+                List<Mutation> mutations = pair.getSecond();
+                
+                int retryCount = 0;
+                boolean shouldRetry = false;
+                do {
+                    ServerCache cache = null;
+                    if (hasIndexMaintainers && isDataTable) {
+                        byte[] attribValue = null;
+                        byte[] uuidValue;
+                        if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
+                            IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+                            cache = client.addIndexMetadataCache(mutations, tempPtr);
+                            uuidValue = cache.getId();
+                            // If we haven't retried yet, retry for this case only, as it's possible that
+                            // a split will occur after we send the index metadata cache to all known
+                            // region servers.
+                            shouldRetry = true;
+                        } else {
+                            attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr);
+                            uuidValue = ServerCacheClient.generateId();
+                        }
+                        // Either set the UUID to be able to access the index metadata from the cache
+                        // or set the index metadata directly on the Mutation
+                        for (Mutation mutation : mutations) {
+                            if (tenantId != null) {
+                                mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+                            }
+                            mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                            if (attribValue != null) {
+                                mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                            }
+                        }
+                    }
+                    
+                    SQLException sqlE = null;
+                    HTableInterface hTable = connection.getQueryServices().getTable(htableName);
+                    try {
+                        if (logger.isDebugEnabled()) logMutationSize(hTable, mutations);
+                        long startTime = System.currentTimeMillis();
+                        hTable.batch(mutations);
+                        shouldRetry = false;
+                        if (logger.isDebugEnabled()) logger.debug("Total time for batch call of  " + mutations.size() + " mutations into " + table.getName().getString() + ": " + (System.currentTimeMillis() - startTime) + " ms");
+                        committedList.add(entry);
+                    } catch (Exception e) {
+                        SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
+                        if (inferredE != null) {
+                            if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
+                                // Swallow this exception once, as it's possible that we split after sending the index metadata
+                                // and one of the region servers doesn't have it. This will cause it to have it the next go around.
+                                // If it fails again, we don't retry.
+                                logger.warn("Swallowing exception and retrying after clearing meta cache on connection. " + inferredE);
+                                connection.getQueryServices().clearTableRegionCache(htableName);
+                                continue;
+                            }
+                            e = inferredE;
+                        }
+                        // Throw to client with both what was committed so far and what is left to be committed.
+                        // That way, client can either undo what was done or try again with what was not done.
+                        sqlE = new CommitException(e, this, new MutationState(committedList, this.sizeOffset, this.maxSize, this.connection));
+                    } finally {
+                        try {
+                            hTable.close();
+                        } catch (IOException e) {
+                            if (sqlE != null) {
+                                sqlE.setNextException(ServerUtil.parseServerException(e));
+                            } else {
+                                sqlE = ServerUtil.parseServerException(e);
+                            }
+                        } finally {
+                            try {
+                                if (cache != null) {
+                                    cache.close();
+                                }
+                            } finally {
+                                if (sqlE != null) {
+                                    throw sqlE;
+                                }
+                            }
+                        }
+                    }
+                } while (shouldRetry && retryCount++ < 1);
+                isDataTable = false;
+            }
+            numEntries -= entry.getValue().size();
+            iterator.remove(); // Remove batches as we process them
+        }
+        assert(numEntries==0);
+        assert(this.mutations.isEmpty());
+    }
+    
+    public void rollback(PhoenixConnection connection) throws SQLException {
+        this.mutations.clear();
+        numEntries = 0;
+    }
+    
+    @Override
+    public void close() throws SQLException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/src/main/java/org/apache/phoenix/execute/ScanPlan.java
new file mode 100644
index 0000000..2913b80
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.ScanRegionObserver;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LimitingResultIterator;
+import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
+import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
+import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SpoolingResultIterator;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.query.Scanner;
+import org.apache.phoenix.query.WrappedScanner;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.TableRef;
+
+
+
+/**
+ * 
+ * Query plan for a basic table scan
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ScanPlan extends BasicQueryPlan {
+    private List<KeyRange> splits;
+    
+    public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory) {
+        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, null, parallelIteratorFactory == null ? new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()) : parallelIteratorFactory);
+        if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
+            int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
+                    QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+            ScanRegionObserver.serializeIntoScan(context.getScan(), thresholdBytes, limit == null ? -1 : limit, orderBy.getOrderByExpressions(), projector.getEstimatedRowByteSize());
+        }
+    }
+    
+    @Override
+    public List<KeyRange> getSplits() {
+        return splits;
+    }
+    
+    @Override
+    protected Scanner newScanner(ConnectionQueryServices services) throws SQLException {
+        // Set any scan attributes before creating the scanner, as it will be too late afterwards
+        context.getScan().setAttribute(ScanRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
+        ResultIterator scanner;
+        TableRef tableRef = this.getTableRef();
+        PTable table = tableRef.getTable();
+        boolean isSalted = table.getBucketNum() != null;
+        /* If no limit or topN, use parallel iterator so that we get results faster. Otherwise, if
+         * limit is provided, run query serially.
+         */
+        boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
+        ParallelIterators iterators = new ParallelIterators(context, tableRef, statement, projection, GroupBy.EMPTY_GROUP_BY, isOrdered ? null : limit, parallelIteratorFactory);
+        splits = iterators.getSplits();
+        if (isOrdered) {
+            scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());
+        } else {
+            if (isSalted && 
+                    (services.getProps().getBoolean(
+                            QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, 
+                            QueryServicesOptions.DEFAULT_ROW_KEY_ORDER_SALTED_TABLE) ||
+                     orderBy == OrderBy.ROW_KEY_ORDER_BY)) { // ORDER BY was optimized out b/c query is in row key order
+                scanner = new MergeSortRowKeyResultIterator(iterators, SaltingUtil.NUM_SALTING_BYTES);
+            } else {
+                scanner = new ConcatResultIterator(iterators);
+            }
+            if (limit != null) {
+                scanner = new LimitingResultIterator(scanner, limit);
+            }
+        }
+
+        return new WrappedScanner(scanner, getProjector());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/AddExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/AddExpression.java b/src/main/java/org/apache/phoenix/expression/AddExpression.java
new file mode 100644
index 0000000..fd906c5
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/AddExpression.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+
+
+/**
+ * 
+ * Subtract expression implementation
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class AddExpression extends BaseAddSubtractExpression {
+    public AddExpression() {
+    }
+
+    public AddExpression(List<Expression> children) {
+        super(children);
+    }
+
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> l = acceptChildren(visitor, visitor.visitEnter(this));
+        T t = visitor.visitLeave(this, l);
+        if (t == null) {
+            t = visitor.defaultReturn(this, l);
+        }
+        return t;
+    }
+
+    @Override
+    public String getOperatorString() {
+        return " + ";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/AndExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/AndExpression.java b/src/main/java/org/apache/phoenix/expression/AndExpression.java
new file mode 100644
index 0000000..9a8e933
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/AndExpression.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+
+
+/**
+ * 
+ * AND expression implementation
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class AndExpression extends AndOrExpression {
+    public AndExpression() {
+    }
+
+    public AndExpression(List<Expression> children) {
+        super(children);
+    }
+
+    @Override
+    protected boolean getStopValue() {
+        return Boolean.FALSE;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder("(");
+        for (int i = 0; i < children.size() - 1; i++) {
+            buf.append(children.get(i) + " AND ");
+        }
+        buf.append(children.get(children.size()-1));
+        buf.append(')');
+        return buf.toString();
+    }
+    
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> l = acceptChildren(visitor, visitor.visitEnter(this));
+        T t = visitor.visitLeave(this, l);
+        if (t == null) {
+            t = visitor.defaultReturn(this, l);
+        }
+        return t;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/AndOrExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/AndOrExpression.java b/src/main/java/org/apache/phoenix/expression/AndOrExpression.java
new file mode 100644
index 0000000..aebd63a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/AndOrExpression.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * 
+ * Abstract expression implementation for compound AND and OR expressions
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class AndOrExpression extends BaseCompoundExpression {
+    // Remember evaluation of child expression for partial evaluation
+    private BitSet partialEvalState;
+   
+    public AndOrExpression() {
+    }
+    
+    public AndOrExpression(List<Expression> children) {
+        super(children);
+    }
+    
+    @Override
+    public int hashCode() {
+        return 31 * super.hashCode() + Boolean.valueOf(this.getStopValue()).hashCode();
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.BOOLEAN;
+    }
+
+    @Override
+    public void reset() {
+        if (partialEvalState == null) {
+            partialEvalState = new BitSet(children.size());
+        } else {
+            partialEvalState.clear();
+        }
+        super.reset();
+    }
+    
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        boolean isNull = false;
+        boolean stopValue = getStopValue();
+        for (int i = 0; i < children.size(); i++) {
+            Expression child = children.get(i);
+            // If partial state is available, then use that to know we've already evaluated this
+            // child expression and do not need to do so again.
+            if (partialEvalState == null || !partialEvalState.get(i)) {
+                // Call through to child evaluate method matching parent call to allow child to optimize
+                // evaluate versus getValue code path.
+                if (child.evaluate(tuple, ptr)) {
+                    // Short circuit if we see our stop value
+                    if (Boolean.valueOf(stopValue).equals(PDataType.BOOLEAN.toObject(ptr, child.getDataType()))) {
+                        return true;
+                    } else if (partialEvalState != null) {
+                        partialEvalState.set(i);
+                    }
+                } else {
+                    isNull = true;
+                }
+            }
+        }
+        if (isNull) {
+            return false;
+        }
+        return true;
+    }
+
+    protected abstract boolean getStopValue();
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/ArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/ArithmeticExpression.java b/src/main/java/org/apache/phoenix/expression/ArithmeticExpression.java
new file mode 100644
index 0000000..622d709
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/ArithmeticExpression.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.util.List;
+
+public abstract class ArithmeticExpression extends BaseCompoundExpression {
+
+    public ArithmeticExpression() {
+    }
+
+    public ArithmeticExpression(List<Expression> children) {
+        super(children);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder("(");
+        for (int i = 0; i < children.size() - 1; i++) {
+            buf.append(children.get(i) + getOperatorString());
+        }
+        buf.append(children.get(children.size()-1));
+        buf.append(')');
+        return buf.toString();
+    }
+    
+    abstract protected String getOperatorString();
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/BaseAddSubtractExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/BaseAddSubtractExpression.java b/src/main/java/org/apache/phoenix/expression/BaseAddSubtractExpression.java
new file mode 100644
index 0000000..1b9e4e5
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/BaseAddSubtractExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.util.List;
+
+import org.apache.phoenix.schema.PDataType;
+
+
+abstract public class BaseAddSubtractExpression extends ArithmeticExpression {
+    public BaseAddSubtractExpression() {
+    }
+
+    public BaseAddSubtractExpression(List<Expression> children) {
+        super(children);
+    }
+
+    protected static Integer getPrecision(Integer lp, Integer rp, Integer ls, Integer rs) {
+    	if (ls == null || rs == null) {
+    		return PDataType.MAX_PRECISION;
+    	}
+        int val = getScale(lp, rp, ls, rs) + Math.max(lp - ls, rp - rs) + 1;
+        return Math.min(PDataType.MAX_PRECISION, val);
+    }
+
+    protected static Integer getScale(Integer lp, Integer rp, Integer ls, Integer rs) {
+    	// If we are adding a decimal with scale and precision to a decimal
+    	// with no precision nor scale, the scale system does not apply.
+    	if (ls == null || rs == null) {
+    		return null;
+    	}
+        int val = Math.max(ls, rs);
+        return Math.min(PDataType.MAX_PRECISION, val);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/BaseCompoundExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/BaseCompoundExpression.java b/src/main/java/org/apache/phoenix/expression/BaseCompoundExpression.java
new file mode 100644
index 0000000..814dcaf
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/BaseCompoundExpression.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+
+
+public abstract class BaseCompoundExpression extends BaseExpression {
+    protected List<Expression> children;
+    private boolean isNullable;
+   
+    public BaseCompoundExpression() {
+    }
+    
+    public BaseCompoundExpression(List<Expression> children) {
+        this.children = ImmutableList.copyOf(children);
+        for (int i = 0; i < children.size(); i++) {
+            Expression child = children.get(i);
+            if (child.isNullable()) {
+                isNullable = true;
+            }
+        }
+    }
+    
+    @Override
+    public List<Expression> getChildren() {
+        return children;
+    }
+    
+    @Override
+    public boolean isNullable() {
+        return isNullable;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + children.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        BaseCompoundExpression other = (BaseCompoundExpression)obj;
+        if (!children.equals(other.children)) return false;
+        return true;
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int len = WritableUtils.readVInt(input);
+        List<Expression>children = new ArrayList<Expression>(len);
+        for (int i = 0; i < len; i++) {
+            Expression child = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+            child.readFields(input);
+            isNullable |= child.isNullable();
+            children.add(child);
+        }
+        this.children = ImmutableList.copyOf(children);
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        WritableUtils.writeVInt(output, children.size());
+        for (int i = 0; i < children.size(); i++) {
+            Expression child = children.get(i);
+            WritableUtils.writeVInt(output, ExpressionType.valueOf(child).ordinal());
+            child.write(output);
+        }
+    }
+
+    @Override
+    public void reset() {
+        for (int i = 0; i < children.size(); i++) {
+            children.get(i).reset();
+        }
+    }
+    
+    @Override
+    public <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> l = acceptChildren(visitor, visitor.visitEnter(this));
+        T t = visitor.visitLeave(this, l);
+        if (t == null) {
+            t = visitor.defaultReturn(this, l);
+        }
+        return t;
+    }
+    
+    @Override
+    public String toString() {
+        return this.getClass().getName() + " [children=" + children + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/BaseDecimalAddSubtractExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/BaseDecimalAddSubtractExpression.java b/src/main/java/org/apache/phoenix/expression/BaseDecimalAddSubtractExpression.java
new file mode 100644
index 0000000..4f641fe
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/BaseDecimalAddSubtractExpression.java
@@ -0,0 +1,5 @@
+package org.apache.phoenix.expression;
+
+public class BaseDecimalAddSubtractExpression {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/BaseExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/BaseExpression.java b/src/main/java/org/apache/phoenix/expression/BaseExpression.java
new file mode 100644
index 0000000..3636edb
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/BaseExpression.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.ColumnModifier;
+
+
+
+/**
+ * 
+ * Base class for Expression hierarchy that provides common
+ * default implementations for most methods
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class BaseExpression implements Expression {
+    @Override
+    public boolean isNullable() {
+        return false;
+    }
+
+    @Override
+    public Integer getByteSize() {
+        return getDataType().isFixedWidth() ? getDataType().getByteSize() : null;
+    }
+
+    @Override
+    public Integer getMaxLength() {
+        return null;
+    }
+
+    @Override
+    public Integer getScale() {
+        return null;
+    }
+    
+    @Override
+    public ColumnModifier getColumnModifier() {
+    	    return null;
+    }    
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+    }
+
+    @Override
+    public void reset() {
+    }
+    
+    protected final <T> List<T> acceptChildren(ExpressionVisitor<T> visitor, Iterator<Expression> iterator) {
+        if (iterator == null) {
+            iterator = visitor.defaultIterator(this);
+        }
+        List<T> l = Collections.emptyList();
+        while (iterator.hasNext()) {
+            Expression child = iterator.next();
+            T t = child.accept(visitor);
+            if (t != null) {
+                if (l.isEmpty()) {
+                    l = new ArrayList<T>(getChildren().size());
+                }
+                l.add(t);
+            }
+        }
+        return l;
+    }
+    
+    @Override
+    public boolean isConstant() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/BaseSingleExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/BaseSingleExpression.java b/src/main/java/org/apache/phoenix/expression/BaseSingleExpression.java
new file mode 100644
index 0000000..1758438
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/BaseSingleExpression.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+
+
+/**
+ * 
+ * Base class for expressions which have a single child expression
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class BaseSingleExpression extends BaseExpression {
+
+    protected List<Expression> children;
+    
+    public BaseSingleExpression() {
+    }
+
+    public BaseSingleExpression(Expression expression) {
+        this.children = ImmutableList.of(expression);
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+        return children;
+    }
+    
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+        expression.readFields(input);
+        children = ImmutableList.of(expression);
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        WritableUtils.writeVInt(output, ExpressionType.valueOf(children.get(0)).ordinal());
+        children.get(0).write(output);
+    }
+
+    @Override
+    public boolean isNullable() {
+        return children.get(0).isNullable();
+    }
+
+    @Override
+    public void reset() {
+        children.get(0).reset();
+    }
+
+    @Override
+    public <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> l = acceptChildren(visitor, null);
+        if (l.isEmpty()) {
+            return visitor.defaultReturn(this, l);
+        }
+        return l.get(0);
+    }
+    
+    public Expression getChild() {
+        return children.get(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/BaseTerminalExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/BaseTerminalExpression.java b/src/main/java/org/apache/phoenix/expression/BaseTerminalExpression.java
new file mode 100644
index 0000000..aaa4371
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/BaseTerminalExpression.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+
+
+
+/**
+ * 
+ * Grouping class for expression that have no expression children
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class BaseTerminalExpression extends BaseExpression {
+    @Override
+    public List<Expression> getChildren() {
+        return Collections.emptyList();
+    }
+    
+    @Override
+    public <T> T accept(ExpressionVisitor<T> visitor) {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/CaseExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/CaseExpression.java b/src/main/java/org/apache/phoenix/expression/CaseExpression.java
new file mode 100644
index 0000000..f36f6a7
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/CaseExpression.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.*;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * 
+ * CASE/WHEN expression implementation
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class CaseExpression extends BaseCompoundExpression {
+    private static final int FULLY_EVALUATE = -1;
+    
+    private short evalIndex = FULLY_EVALUATE;
+    private boolean foundIndex;
+    private PDataType returnType;
+   
+    public CaseExpression() {
+    }
+    
+    private static List<Expression> coerceIfNecessary(List<Expression> children) throws SQLException {
+        boolean isChildTypeUnknown = false;
+        PDataType returnType = children.get(0).getDataType();
+        for (int i = 2; i < children.size(); i+=2) {
+            Expression child = children.get(i);
+            PDataType childType = child.getDataType();
+            if (childType == null) {
+                isChildTypeUnknown = true;
+            } else if (returnType == null) {
+                returnType = childType;
+                isChildTypeUnknown = true;
+            } else if (returnType == childType || childType.isCoercibleTo(returnType)) {
+                continue;
+            } else if (returnType.isCoercibleTo(childType)) {
+                returnType = childType;
+            } else {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CONVERT_TYPE)
+                    .setMessage("Case expressions must have common type: " + returnType + " cannot be coerced to " + childType)
+                    .build().buildException();
+            }
+        }
+        // If we found an "unknown" child type and the return type is a number
+        // make the return type be the most general number type of DECIMAL.
+        if (isChildTypeUnknown && returnType.isCoercibleTo(PDataType.DECIMAL)) {
+            returnType = PDataType.DECIMAL;
+        }
+        List<Expression> newChildren = children;
+        for (int i = 0; i < children.size(); i+=2) {
+            Expression child = children.get(i);
+            PDataType childType = child.getDataType();
+            if (childType != returnType) {
+                if (newChildren == children) {
+                    newChildren = new ArrayList<Expression>(children);
+                }
+                newChildren.set(i, CoerceExpression.create(child, returnType));
+            }
+        }
+        return newChildren;
+    }
+    /**
+     * Construct CASE/WHEN expression
+     * @param expressions list of expressions in the form of:
+     *  ((<result expression>, <boolean expression>)+, [<optional else result expression>])
+     * @throws SQLException if return type of case expressions do not match and cannot
+     *  be coerced to a common type
+     */
+    public CaseExpression(List<Expression> expressions) throws SQLException {
+        super(coerceIfNecessary(expressions));
+        returnType = children.get(0).getDataType();
+    }
+    
+    private boolean isPartiallyEvaluating() {
+        return evalIndex != FULLY_EVALUATE;
+    }
+    
+    public boolean hasElse() {
+        return children.size() % 2 != 0;
+    }
+    
+    @Override
+    public boolean isNullable() {
+        // If any expression is nullable or there's no else clause
+        // return true since null may be returned.
+        if (super.isNullable() || !hasElse()) {
+            return true;
+        }
+        return children.get(children.size()-1).isNullable();
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return returnType;
+    }
+
+    @Override
+    public void reset() {
+        foundIndex = false;
+        evalIndex = 0;
+    }
+    
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        this.returnType = PDataType.values()[WritableUtils.readVInt(input)];
+    }
+    
+    @Override
+    public void write(DataOutput output) throws IOException {
+        super.write(output);
+        WritableUtils.writeVInt(output, this.returnType.ordinal());
+    }
+    
+    public int evaluateIndexOf(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (foundIndex) {
+            return evalIndex;
+        }
+        int size = children.size();
+        // If we're doing partial evaluation, start where we left off
+        for (int i = isPartiallyEvaluating() ? evalIndex : 0; i < size; i+=2) {
+            // Short circuit if we see our stop value
+            if (i+1 == size) {
+                return i;
+            }
+            // If we get null, we have to re-evaluate from that point (special case this in filter, like is null)
+            // We may only run this when we're done/have all values
+            boolean evaluated = children.get(i+1).evaluate(tuple, ptr);
+            if (evaluated && Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(ptr))) {
+                if (isPartiallyEvaluating()) {
+                    foundIndex = true;
+                }
+                return i;
+            }
+            if (isPartiallyEvaluating()) {
+                if (evaluated || tuple.isImmutable()) {
+                    evalIndex+=2;
+                } else {
+                    /*
+                     * Return early here if incrementally evaluating and we don't
+                     * have all the key values yet. We can't continue because we'd
+                     * potentially be bypassing cases which we could later evaluate
+                     * once we have more column values.
+                     */
+                    return -1;
+                }
+            }
+        }
+        // No conditions matched, return size to indicate that we were able
+        // to evaluate all cases, but didn't find any matches.
+        return size;
+    }
+    
+    /**
+     * Only expression that currently uses the isPartial flag. The IS NULL
+     * expression will use it too. TODO: We could alternatively have a non interface
+     * method, like setIsPartial in which we set to false prior to calling
+     * evaluate.
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        int index = evaluateIndexOf(tuple, ptr);
+        if (index < 0) {
+            return false;
+        } else if (index == children.size()) {
+            ptr.set(PDataType.NULL_BYTES);
+            return true;
+        }
+        if (children.get(index).evaluate(tuple, ptr)) {
+            return true;
+        }
+        return false;
+    }
+    
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> l = acceptChildren(visitor, visitor.visitEnter(this));
+        T t = visitor.visitLeave(this, l);
+        if (t == null) {
+            t = visitor.defaultReturn(this, l);
+        }
+        return t;
+    }
+    
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder("CASE ");
+        for (int i = 0; i < children.size() - 1; i+=2) {
+            buf.append("WHEN ");
+            buf.append(children.get(i+1));
+            buf.append(" THEN ");
+            buf.append(children.get(i));
+        }
+        if (hasElse()) {
+            buf.append(" ELSE " + children.get(children.size()-1));
+        }
+        buf.append(" END");
+        return buf.toString();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/CeilingDecimalExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/CeilingDecimalExpression.java b/src/main/java/org/apache/phoenix/expression/CeilingDecimalExpression.java
new file mode 100644
index 0000000..54758f2
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/CeilingDecimalExpression.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+public class CeilingDecimalExpression extends BaseSingleExpression {
+    private static final MathContext CEILING_CONTEXT = new MathContext(1, RoundingMode.CEILING);
+    
+    public CeilingDecimalExpression() {
+    }
+    
+    public CeilingDecimalExpression(Expression child)  {
+        super(child);
+    }
+    
+    protected MathContext getMathContext() {
+        return CEILING_CONTEXT;
+    }
+    
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        Expression child =  getChild();
+        if (child.evaluate(tuple, ptr)) {
+            PDataType childType = child.getDataType();
+            childType.coerceBytes(ptr, childType, child.getColumnModifier(), null);
+            BigDecimal value = (BigDecimal) childType.toObject(ptr);
+            value = value.round(getMathContext());
+            byte[] b = childType.toBytes(value, child.getColumnModifier());
+            ptr.set(b);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public ColumnModifier getColumnModifier() {
+            return getChild().getColumnModifier();
+    }    
+
+    @Override
+    public final PDataType getDataType() {
+        return  getChild().getDataType();
+    }
+    
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        return getChild().accept(visitor);
+    }
+    
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder("CEIL(");
+        for (int i = 0; i < children.size() - 1; i++) {
+            buf.append(getChild().toString());
+        }
+        buf.append(")");
+        return buf.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/CeilingTimestampExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/CeilingTimestampExpression.java b/src/main/java/org/apache/phoenix/expression/CeilingTimestampExpression.java
new file mode 100644
index 0000000..05ce968
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/CeilingTimestampExpression.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.sql.Timestamp;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+public class CeilingTimestampExpression extends BaseSingleExpression {
+    private static final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
+    
+    public CeilingTimestampExpression() {
+    }
+    
+    public CeilingTimestampExpression(Expression child) {
+        super(child);
+    }
+    
+    protected int getRoundUpAmount() {
+        return 1;
+    }
+    
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        Expression child = children.get(0);
+        if (child.evaluate(tuple, ptr)) {
+            PDataType childType = child.getDataType();
+            tempPtr.set(ptr.get(), ptr.getOffset(), ptr.getLength());
+            childType.coerceBytes(tempPtr, childType, child.getColumnModifier(), null);
+            Timestamp value = (Timestamp) childType.toObject(tempPtr);
+            if (value.getNanos() > 0) {
+                value = new Timestamp(value.getTime()+getRoundUpAmount());
+                byte[] b = childType.toBytes(value, child.getColumnModifier());
+                ptr.set(b);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public final PDataType getDataType() {
+        return children.get(0).getDataType();
+    }
+    
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        return getChild().accept(visitor);
+    }
+    
+    
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder("CEIL(");
+        for (int i = 0; i < children.size() - 1; i++) {
+            buf.append(getChild().toString());
+        }
+        buf.append(")");
+        return buf.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/CoerceExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/CoerceExpression.java b/src/main/java/org/apache/phoenix/expression/CoerceExpression.java
new file mode 100644
index 0000000..bbb54a0
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/CoerceExpression.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+public class CoerceExpression extends BaseSingleExpression {
+    private PDataType toType;
+    private ColumnModifier toMod;
+    private Integer byteSize;
+    
+    public CoerceExpression() {
+    }
+
+    public static Expression create(Expression expression, PDataType toType) throws SQLException {
+        return toType == expression.getDataType() ? expression : expression instanceof LiteralExpression ? LiteralExpression.newConstant(((LiteralExpression)expression).getValue(), toType) : new CoerceExpression(expression, toType);
+    }
+    
+    //Package protected for tests
+    CoerceExpression(Expression expression, PDataType toType) {
+        this(expression, toType, null, null);
+    }
+    
+    CoerceExpression(Expression expression, PDataType toType, ColumnModifier toMod, Integer byteSize) {
+        super(expression);
+        this.toType = toType;
+        this.toMod = toMod;
+        this.byteSize = byteSize;
+    }
+
+    @Override
+    public Integer getByteSize() {
+        return byteSize;
+    }
+    
+    @Override
+    public Integer getMaxLength() {
+        return byteSize;
+    }
+    
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((byteSize == null) ? 0 : byteSize.hashCode());
+        result = prime * result + ((toMod == null) ? 0 : toMod.hashCode());
+        result = prime * result + ((toType == null) ? 0 : toType.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        CoerceExpression other = (CoerceExpression)obj;
+        if (byteSize == null) {
+            if (other.byteSize != null) return false;
+        } else if (!byteSize.equals(other.byteSize)) return false;
+        if (toMod != other.toMod) return false;
+        if (toType != other.toType) return false;
+        return true;
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        toType = PDataType.values()[WritableUtils.readVInt(input)];
+        toMod = ColumnModifier.fromSystemValue(WritableUtils.readVInt(input));
+        int byteSize = WritableUtils.readVInt(input);
+        this.byteSize = byteSize == -1 ? null : byteSize;
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        super.write(output);
+        WritableUtils.writeVInt(output, toType.ordinal());
+        WritableUtils.writeVInt(output, ColumnModifier.toSystemValue(toMod));
+        WritableUtils.writeVInt(output, byteSize == null ? -1 : byteSize);
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (getChild().evaluate(tuple, ptr)) {
+            getDataType().coerceBytes(ptr, getChild().getDataType(), getChild().getColumnModifier(), getColumnModifier());
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return toType;
+    }
+    
+    @Override
+    public ColumnModifier getColumnModifier() {
+            return toMod;
+    }    
+
+    @Override
+    public <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> l = acceptChildren(visitor, visitor.visitEnter(this));
+        T t = visitor.visitLeave(this, l);
+        if (t == null) {
+            t = visitor.defaultReturn(this, l);
+        }
+        return t;
+    }
+    
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder("TO_" + toType.toString() + "(");
+        for (int i = 0; i < children.size() - 1; i++) {
+            buf.append(children.get(i) + ", ");
+        }
+        buf.append(children.get(children.size()-1) + ")");
+        return buf.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/expression/ColumnExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/expression/ColumnExpression.java b/src/main/java/org/apache/phoenix/expression/ColumnExpression.java
new file mode 100644
index 0000000..bfb0d70
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/expression/ColumnExpression.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.base.Objects;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PDatum;
+
+/**
+ * 
+ * Common base class for column value accessors
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+abstract public class ColumnExpression extends BaseTerminalExpression {
+    protected PDataType type;
+    private Integer byteSize;
+    private boolean isNullable;
+    private Integer maxLength;
+    private Integer scale;
+    private ColumnModifier columnModifier;
+
+    public ColumnExpression() {
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (isNullable() ? 1231 : 1237);
+        Integer maxLength = this.getByteSize();
+        result = prime * result + ((maxLength == null) ? 0 : maxLength.hashCode());
+        PDataType type = this.getDataType();
+        result = prime * result + ((type == null) ? 0 : type.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        ColumnExpression other = (ColumnExpression)obj;
+        if (this.isNullable() != other.isNullable()) return false;
+        if (!Objects.equal(this.getByteSize(),other.getByteSize())) return false;
+        if (this.getDataType() != other.getDataType()) return false;
+        return true;
+    }
+
+    public ColumnExpression(PDatum datum) {
+        this.type = datum.getDataType();
+        this.isNullable = datum.isNullable();
+        if (type.isFixedWidth() && type.getByteSize() == null) {
+            this.byteSize = datum.getByteSize();
+        }
+        this.maxLength = datum.getMaxLength();
+        this.scale = datum.getScale();
+        this.columnModifier = datum.getColumnModifier();
+    }
+
+    @Override
+    public boolean isNullable() {
+       return isNullable;
+    }
+    
+    @Override
+    public PDataType getDataType() {
+        return type;
+    }
+    
+    @Override
+    public ColumnModifier getColumnModifier() {
+    	return columnModifier;
+    }
+
+    @Override
+    public Integer getByteSize() {
+        if (byteSize != null) {
+            return byteSize;
+        }
+        return super.getByteSize();
+    }
+
+    @Override
+    public Integer getMaxLength() {
+        return maxLength;
+    }
+
+    @Override
+    public Integer getScale() {
+        return scale;
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        // read/write type ordinal, maxLength presence, scale presence and isNullable bit together to save space
+        int typeAndFlag = WritableUtils.readVInt(input);
+        isNullable = (typeAndFlag & 0x01) != 0;
+        if ((typeAndFlag & 0x02) != 0) {
+            scale = WritableUtils.readVInt(input);
+        }
+        if ((typeAndFlag & 0x04) != 0) {
+            maxLength = WritableUtils.readVInt(input);
+        }
+        type = PDataType.values()[typeAndFlag >>> 3];
+        if (type.isFixedWidth() && type.getByteSize() == null) {
+            byteSize = WritableUtils.readVInt(input);
+        }
+        columnModifier = ColumnModifier.fromSystemValue(WritableUtils.readVInt(input));
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        // read/write type ordinal, maxLength presence, scale presence and isNullable bit together to save space
+        int typeAndFlag = (isNullable ? 1 : 0) | ((scale != null ? 1 : 0) << 1) | ((maxLength != null ? 1 : 0) << 2)
+                | (type.ordinal() << 3);
+        WritableUtils.writeVInt(output,typeAndFlag);
+        if (scale != null) {
+            WritableUtils.writeVInt(output, scale);
+        }
+        if (maxLength != null) {
+            WritableUtils.writeVInt(output, maxLength);
+        }
+        if (type.isFixedWidth() && type.getByteSize() == null) {
+            WritableUtils.writeVInt(output, byteSize);
+        }
+        WritableUtils.writeVInt(output, ColumnModifier.toSystemValue(columnModifier));
+    }
+}