You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/03/04 23:40:45 UTC

[10/50] [abbrv] phoenix git commit: PHOENIX-514 Support functional indexes (Thomas D'Silva)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
index 2ea42ce..c8cf28e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
@@ -56,35 +55,37 @@ public class PostIndexDDLCompiler {
         //   that would allow the user to easily monitor the process of index creation.
         StringBuilder indexColumns = new StringBuilder();
         StringBuilder dataColumns = new StringBuilder();
-        List<PColumn> dataPKColumns = dataTableRef.getTable().getPKColumns();
-        PTable dataTable = dataTableRef.getTable();
-        int nPKColumns = dataPKColumns.size();
-        boolean isSalted = dataTable.getBucketNum() != null;
-        boolean isMultiTenant = connection.getTenantId() != null && dataTable.isMultiTenant();
-        int posOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0);
-        for (int i = posOffset; i < nPKColumns; i++) {
-            PColumn col = dataPKColumns.get(i);
-            if (col.getViewConstant() == null) {
-                String indexColName = IndexUtil.getIndexColumnName(col);
-                dataColumns.append('"').append(col.getName()).append("\",");
-                indexColumns.append('"').append(indexColName).append("\",");
-            }
+        
+        // Add the pk index columns
+        List<PColumn> indexPKColumns = indexTable.getPKColumns();
+        int nIndexPKColumns = indexTable.getPKColumns().size();
+        boolean isSalted = indexTable.getBucketNum() != null;
+        boolean isMultiTenant = connection.getTenantId() != null && indexTable.isMultiTenant();
+        boolean isViewIndex = indexTable.getViewIndexId()!=null;
+        int posOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isViewIndex ? 1 : 0);
+        for (int i = posOffset; i < nIndexPKColumns; i++) {
+            PColumn col = indexPKColumns.get(i);
+            String indexColName = col.getName().getString();
+            dataColumns.append(col.getExpressionStr()).append(",");
+            indexColumns.append('"').append(indexColName).append("\",");
         }
-        for (PColumnFamily family : dataTableRef.getTable().getColumnFamilies()) {
+        
+        // Add the covered columns
+        for (PColumnFamily family : indexTable.getColumnFamilies()) {
             for (PColumn col : family.getColumns()) {
                 if (col.getViewConstant() == null) {
-                    String indexColName = IndexUtil.getIndexColumnName(col);
-                    try {
-                        indexTable.getColumn(indexColName);
-                        dataColumns.append('"').append(col.getFamilyName()).append("\".");
-                        dataColumns.append('"').append(col.getName()).append("\",");
-                        indexColumns.append('"').append(indexColName).append("\",");
-                    } catch (ColumnNotFoundException e) {
-                        // Catch and ignore - means that this data column is not in the index
+                    String indexColName = col.getName().getString();
+                    String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColName);
+                    String dataColumnName = IndexUtil.getDataColumnName(indexColName);
+                    if (!dataFamilyName.equals("")) {
+                        dataColumns.append('"').append(dataFamilyName).append("\".");
                     }
+                    dataColumns.append('"').append(dataColumnName).append("\",");
+                    indexColumns.append('"').append(indexColName).append("\",");
                 }
             }
         }
+
         dataColumns.setLength(dataColumns.length()-1);
         indexColumns.setLength(indexColumns.length()-1);
         String schemaName = dataTableRef.getTable().getSchemaName().getString();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index d534d50..2ac075e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -603,7 +603,7 @@ public class UpsertCompiler {
                         @Override
                         public MutationState execute() throws SQLException {
                             ImmutableBytesWritable ptr = context.getTempPtr();
-                            tableRef.getTable().getIndexMaintainers(ptr);
+                            tableRef.getTable().getIndexMaintainers(ptr, context.getConnection());
                             ServerCache cache = null;
                             try {
                                 if (ptr.getLength() > 0) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 1360178..406b567 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -49,12 +49,12 @@ import org.apache.phoenix.parse.SubqueryParseNode;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
-import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.TypeMismatchException;
+import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -173,7 +173,7 @@ public class WhereCompiler {
                 context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), ref.getColumn().getName()
                         .getBytes());
             }
-            return ref.newColumnExpression();
+            return ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive());
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 9d055c3..ce81e1f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.coprocessor;
 
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES;
@@ -60,7 +61,9 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Coprocessor;
@@ -105,25 +108,23 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.metrics.Metrics;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
-import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
-import org.apache.phoenix.schema.types.PBoolean;
-import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PColumnImpl;
-import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
@@ -132,18 +133,23 @@ import org.apache.phoenix.schema.PTable.LinkType;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.types.PVarbinary;
-import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.stats.PTableStats;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.types.PBinary;
+import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
@@ -242,6 +248,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue ARRAY_SIZE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ARRAY_SIZE_BYTES);
     private static final KeyValue VIEW_CONSTANT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_CONSTANT_BYTES);
     private static final KeyValue IS_VIEW_REFERENCED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES);
+    private static final KeyValue COLUMN_DEF_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES);
     private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList(
             DECIMAL_DIGITS_KV,
             COLUMN_SIZE_KV,
@@ -252,7 +259,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             DATA_TABLE_NAME_KV, // included in both column and table row for metadata APIs
             ARRAY_SIZE_KV,
             VIEW_CONSTANT_KV,
-            IS_VIEW_REFERENCED_KV
+            IS_VIEW_REFERENCED_KV,
+            COLUMN_DEF_KV
             );
     static {
         Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -266,7 +274,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int ARRAY_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(ARRAY_SIZE_KV);
     private static final int VIEW_CONSTANT_INDEX = COLUMN_KV_COLUMNS.indexOf(VIEW_CONSTANT_KV);
     private static final int IS_VIEW_REFERENCED_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV);
-
+    private static final int COLUMN_DEF_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV);
+    
     private static final int LINK_TYPE_INDEX = 0;
 
     private static PName newPName(byte[] keyBuffer, int keyOffset, int keyLength) {
@@ -460,7 +469,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         byte[] viewConstant = viewConstantKv == null ? null : viewConstantKv.getValue();
         Cell isViewReferencedKv = colKeyValues[IS_VIEW_REFERENCED_INDEX];
         boolean isViewReferenced = isViewReferencedKv != null && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isViewReferencedKv.getValueArray(), isViewReferencedKv.getValueOffset(), isViewReferencedKv.getValueLength()));
-        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced);
+        Cell columnDefKv = colKeyValues[COLUMN_DEF_INDEX];
+        String expressionStr = columnDefKv==null ? null : (String)PVarchar.INSTANCE.toObject(columnDefKv.getValueArray(), columnDefKv.getValueOffset(), columnDefKv.getValueLength());
+        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr);
         columns.add(column);
     }
 
@@ -1399,15 +1410,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                     // column, get lock and drop the index. If found as covered
                                     // column, delete from index (do this client side?).
                                     // In either case, invalidate index if the column is in it
+                                    PhoenixConnection connection = QueryUtil.getConnection(env.getConfiguration()).unwrap(PhoenixConnection.class);
                                     for (PTable index : table.getIndexes()) {
                                         try {
-                                            String indexColumnName = IndexUtil.getIndexColumnName(columnToDelete);
-                                            PColumn indexColumn = index.getColumn(indexColumnName);
+                                            IndexMaintainer indexMaintainer = index.getIndexMaintainer(table, connection);
+                                            // get the columns required to create the index 
+                                            Set<ColumnReference> indexColumns = indexMaintainer.getIndexedColumns();
                                             byte[] indexKey =
                                                     SchemaUtil.getTableKey(tenantId, index
                                                             .getSchemaName().getBytes(), index.getTableName().getBytes());
-                                            // If index contains the column in it's PK, then drop it
-                                            if (SchemaUtil.isPKColumn(indexColumn)) {
+                                            // If index requires this column, then drop it
+                                            if (indexColumns.contains(new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete.getName().getBytes()))) {
                                                 // Since we're dropping the index, lock it to ensure
                                                 // that a change in index state doesn't
                                                 // occur while we're dropping it.
@@ -1439,6 +1452,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                     return new MetaDataMutationResult(
                                             MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager
                                                     .currentTimeMillis(), table, columnToDelete);
+                                } catch (ClassNotFoundException e1) {
                                 }
                             }
                         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 5bbd5d3..a3b2faa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -66,6 +66,7 @@ import org.apache.phoenix.expression.aggregator.ServerAggregators;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
@@ -270,7 +271,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                             for (IndexMaintainer maintainer : indexMaintainers) {
                                 if (!results.isEmpty()) {
                                     result.getKey(ptr);
-                                    ValueGetter valueGetter = maintainer.createGetterFromKeyValues(results);
+                                    ValueGetter valueGetter = maintainer.createGetterFromKeyValues(ImmutableBytesPtr.copyBytesIfNecessary(ptr),results);
                                     Put put = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, c.getEnvironment().getRegion().getStartKey(), c.getEnvironment().getRegion().getEndKey());
                                     indexMutations.add(put);
                                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index 3ff3dd6..7d389ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -234,6 +234,21 @@ public final class PTableProtos {
      * <code>optional bool viewReferenced = 11;</code>
      */
     boolean getViewReferenced();
+
+    // optional string expression = 12;
+    /**
+     * <code>optional string expression = 12;</code>
+     */
+    boolean hasExpression();
+    /**
+     * <code>optional string expression = 12;</code>
+     */
+    java.lang.String getExpression();
+    /**
+     * <code>optional string expression = 12;</code>
+     */
+    com.google.protobuf.ByteString
+        getExpressionBytes();
   }
   /**
    * Protobuf type {@code PColumn}
@@ -341,6 +356,11 @@ public final class PTableProtos {
               viewReferenced_ = input.readBool();
               break;
             }
+            case 98: {
+              bitField0_ |= 0x00000800;
+              expression_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -584,6 +604,49 @@ public final class PTableProtos {
       return viewReferenced_;
     }
 
+    // optional string expression = 12;
+    public static final int EXPRESSION_FIELD_NUMBER = 12;
+    private java.lang.Object expression_;
+    /**
+     * <code>optional string expression = 12;</code>
+     */
+    public boolean hasExpression() {
+      return ((bitField0_ & 0x00000800) == 0x00000800);
+    }
+    /**
+     * <code>optional string expression = 12;</code>
+     */
+    public java.lang.String getExpression() {
+      java.lang.Object ref = expression_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          expression_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string expression = 12;</code>
+     */
+    public com.google.protobuf.ByteString
+        getExpressionBytes() {
+      java.lang.Object ref = expression_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        expression_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       columnNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       familyNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -596,6 +659,7 @@ public final class PTableProtos {
       arraySize_ = 0;
       viewConstant_ = com.google.protobuf.ByteString.EMPTY;
       viewReferenced_ = false;
+      expression_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -662,6 +726,9 @@ public final class PTableProtos {
       if (((bitField0_ & 0x00000400) == 0x00000400)) {
         output.writeBool(11, viewReferenced_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        output.writeBytes(12, getExpressionBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -715,6 +782,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(11, viewReferenced_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(12, getExpressionBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -793,6 +864,11 @@ public final class PTableProtos {
         result = result && (getViewReferenced()
             == other.getViewReferenced());
       }
+      result = result && (hasExpression() == other.hasExpression());
+      if (hasExpression()) {
+        result = result && getExpression()
+            .equals(other.getExpression());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -850,6 +926,10 @@ public final class PTableProtos {
         hash = (37 * hash) + VIEWREFERENCED_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getViewReferenced());
       }
+      if (hasExpression()) {
+        hash = (37 * hash) + EXPRESSION_FIELD_NUMBER;
+        hash = (53 * hash) + getExpression().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -981,6 +1061,8 @@ public final class PTableProtos {
         bitField0_ = (bitField0_ & ~0x00000200);
         viewReferenced_ = false;
         bitField0_ = (bitField0_ & ~0x00000400);
+        expression_ = "";
+        bitField0_ = (bitField0_ & ~0x00000800);
         return this;
       }
 
@@ -1053,6 +1135,10 @@ public final class PTableProtos {
           to_bitField0_ |= 0x00000400;
         }
         result.viewReferenced_ = viewReferenced_;
+        if (((from_bitField0_ & 0x00000800) == 0x00000800)) {
+          to_bitField0_ |= 0x00000800;
+        }
+        result.expression_ = expression_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1104,6 +1190,11 @@ public final class PTableProtos {
         if (other.hasViewReferenced()) {
           setViewReferenced(other.getViewReferenced());
         }
+        if (other.hasExpression()) {
+          bitField0_ |= 0x00000800;
+          expression_ = other.expression_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1564,6 +1655,80 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional string expression = 12;
+      private java.lang.Object expression_ = "";
+      /**
+       * <code>optional string expression = 12;</code>
+       */
+      public boolean hasExpression() {
+        return ((bitField0_ & 0x00000800) == 0x00000800);
+      }
+      /**
+       * <code>optional string expression = 12;</code>
+       */
+      public java.lang.String getExpression() {
+        java.lang.Object ref = expression_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          expression_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string expression = 12;</code>
+       */
+      public com.google.protobuf.ByteString
+          getExpressionBytes() {
+        java.lang.Object ref = expression_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          expression_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string expression = 12;</code>
+       */
+      public Builder setExpression(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000800;
+        expression_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string expression = 12;</code>
+       */
+      public Builder clearExpression() {
+        bitField0_ = (bitField0_ & ~0x00000800);
+        expression_ = getDefaultInstance().getExpression();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string expression = 12;</code>
+       */
+      public Builder setExpressionBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000800;
+        expression_ = value;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PColumn)
     }
 
@@ -6294,35 +6459,35 @@ public final class PTableProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\014PTable.proto\032\021PGuidePosts.proto\"\347\001\n\007PC" +
+      "\n\014PTable.proto\032\021PGuidePosts.proto\"\373\001\n\007PC" +
       "olumn\022\027\n\017columnNameBytes\030\001 \002(\014\022\027\n\017family" +
       "NameBytes\030\002 \001(\014\022\020\n\010dataType\030\003 \002(\t\022\021\n\tmax" +
       "Length\030\004 \001(\005\022\r\n\005scale\030\005 \001(\005\022\020\n\010nullable\030" +
       "\006 \002(\010\022\020\n\010position\030\007 \002(\005\022\021\n\tsortOrder\030\010 \002" +
       "(\005\022\021\n\tarraySize\030\t \001(\005\022\024\n\014viewConstant\030\n " +
-      "\001(\014\022\026\n\016viewReferenced\030\013 \001(\010\"\232\001\n\013PTableSt" +
-      "ats\022\013\n\003key\030\001 \002(\014\022\016\n\006values\030\002 \003(\014\022\033\n\023guid" +
-      "ePostsByteCount\030\003 \001(\003\022\025\n\rkeyBytesCount\030\004" +
-      " \001(\003\022\027\n\017guidePostsCount\030\005 \001(\005\022!\n\013pGuideP",
-      "osts\030\006 \001(\0132\014.PGuidePosts\"\266\004\n\006PTable\022\027\n\017s" +
-      "chemaNameBytes\030\001 \002(\014\022\026\n\016tableNameBytes\030\002" +
-      " \002(\014\022\036\n\ttableType\030\003 \002(\0162\013.PTableType\022\022\n\n" +
-      "indexState\030\004 \001(\t\022\026\n\016sequenceNumber\030\005 \002(\003" +
-      "\022\021\n\ttimeStamp\030\006 \002(\003\022\023\n\013pkNameBytes\030\007 \001(\014" +
-      "\022\021\n\tbucketNum\030\010 \002(\005\022\031\n\007columns\030\t \003(\0132\010.P" +
-      "Column\022\030\n\007indexes\030\n \003(\0132\007.PTable\022\027\n\017isIm" +
-      "mutableRows\030\013 \002(\010\022 \n\nguidePosts\030\014 \003(\0132\014." +
-      "PTableStats\022\032\n\022dataTableNameBytes\030\r \001(\014\022" +
-      "\031\n\021defaultFamilyName\030\016 \001(\014\022\022\n\ndisableWAL",
-      "\030\017 \002(\010\022\023\n\013multiTenant\030\020 \002(\010\022\020\n\010viewType\030" +
-      "\021 \001(\014\022\025\n\rviewStatement\030\022 \001(\014\022\025\n\rphysical" +
-      "Names\030\023 \003(\014\022\020\n\010tenantId\030\024 \001(\014\022\023\n\013viewInd" +
-      "exId\030\025 \001(\005\022\021\n\tindexType\030\026 \001(\014\022\026\n\016statsTi" +
-      "meStamp\030\027 \001(\003\022\022\n\nstoreNulls\030\030 \001(\010*A\n\nPTa" +
-      "bleType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022" +
-      "\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.phoen" +
-      "ix.coprocessor.generatedB\014PTableProtosH\001" +
-      "\210\001\001\240\001\001"
+      "\001(\014\022\026\n\016viewReferenced\030\013 \001(\010\022\022\n\nexpressio" +
+      "n\030\014 \001(\t\"\232\001\n\013PTableStats\022\013\n\003key\030\001 \002(\014\022\016\n\006" +
+      "values\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003 \001(" +
+      "\003\022\025\n\rkeyBytesCount\030\004 \001(\003\022\027\n\017guidePostsCo",
+      "unt\030\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuideP" +
+      "osts\"\266\004\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014" +
+      "\022\026\n\016tableNameBytes\030\002 \002(\014\022\036\n\ttableType\030\003 " +
+      "\002(\0162\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022\026\n\016" +
+      "sequenceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022" +
+      "\023\n\013pkNameBytes\030\007 \001(\014\022\021\n\tbucketNum\030\010 \002(\005\022" +
+      "\031\n\007columns\030\t \003(\0132\010.PColumn\022\030\n\007indexes\030\n " +
+      "\003(\0132\007.PTable\022\027\n\017isImmutableRows\030\013 \002(\010\022 \n" +
+      "\nguidePosts\030\014 \003(\0132\014.PTableStats\022\032\n\022dataT" +
+      "ableNameBytes\030\r \001(\014\022\031\n\021defaultFamilyName",
+      "\030\016 \001(\014\022\022\n\ndisableWAL\030\017 \002(\010\022\023\n\013multiTenan" +
+      "t\030\020 \002(\010\022\020\n\010viewType\030\021 \001(\014\022\025\n\rviewStateme" +
+      "nt\030\022 \001(\014\022\025\n\rphysicalNames\030\023 \003(\014\022\020\n\010tenan" +
+      "tId\030\024 \001(\014\022\023\n\013viewIndexId\030\025 \001(\005\022\021\n\tindexT" +
+      "ype\030\026 \001(\014\022\026\n\016statsTimeStamp\030\027 \001(\003\022\022\n\nsto" +
+      "reNulls\030\030 \001(\010*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022" +
+      "\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004" +
+      "B@\n(org.apache.phoenix.coprocessor.gener" +
+      "atedB\014PTableProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6334,7 +6499,7 @@ public final class PTableProtos {
           internal_static_PColumn_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PColumn_descriptor,
-              new java.lang.String[] { "ColumnNameBytes", "FamilyNameBytes", "DataType", "MaxLength", "Scale", "Nullable", "Position", "SortOrder", "ArraySize", "ViewConstant", "ViewReferenced", });
+              new java.lang.String[] { "ColumnNameBytes", "FamilyNameBytes", "DataType", "MaxLength", "Scale", "Nullable", "Position", "SortOrder", "ArraySize", "ViewConstant", "ViewReferenced", "Expression", });
           internal_static_PTableStats_descriptor =
             getDescriptor().getMessageTypes().get(1);
           internal_static_PTableStats_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index e3ee9e8..8a6b8d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -150,6 +150,12 @@ public enum SQLExceptionCode {
      READ_ONLY_CONNECTION(518,"25502","Mutations are not permitted for a read-only connection."),
  
      VARBINARY_ARRAY_NOT_SUPPORTED(519, "42896", "VARBINARY ARRAY is not supported"),
+    
+     /**
+      *  Expression Index exceptions.
+      */
+     AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX(520, "42897", "Aggregate expressions are not allowed in an index"),
+     NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX(521, "42898", "Non-deterministic expressions are not allowed in an index"),
 
      /** 
      * HBase and Phoenix specific implementation defined sub-classes.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index b58de50..94233c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -258,7 +258,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
             }
         }
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        IndexMaintainer.serialize(dataTable, ptr, indexes);
+        IndexMaintainer.serialize(dataTable, ptr, indexes, context.getConnection());
         scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr));
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index cfa58fd..04626a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -225,7 +225,7 @@ public class MutationState implements SQLCloseable {
                 try {
                     indexMutations =
                             IndexUtil.generateIndexData(tableRef.getTable(), index, mutationsPertainingToIndex,
-                                tempPtr, connection.getKeyValueBuilder());
+                                tempPtr, connection.getKeyValueBuilder(), connection);
                 } catch (SQLException e) {
                     throw new IllegalDataException(e);
                 }
@@ -368,7 +368,7 @@ public class MutationState implements SQLCloseable {
             Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue();
             TableRef tableRef = entry.getKey();
             PTable table = tableRef.getTable();
-            table.getIndexMaintainers(tempPtr);
+            table.getIndexMaintainers(tempPtr, connection);
             boolean hasIndexMaintainers = tempPtr.getLength() > 0;
             boolean isDataTable = true;
             long serverTimestamp = serverTimeStamps[i++];

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/expression/CoerceExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/CoerceExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/CoerceExpression.java
index 811ed47..b0396e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/CoerceExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/CoerceExpression.java
@@ -85,7 +85,7 @@ public class CoerceExpression extends BaseSingleExpression {
     @Override
     public int hashCode() {
         final int prime = 31;
-        int result = 1;
+        int result = super.hashCode();
         result = prime * result + ((maxLength == null) ? 0 : maxLength.hashCode());
         result = prime * result + ((toSortOrder == null) ? 0 : toSortOrder.hashCode());
         result = prime * result + ((toType == null) ? 0 : toType.hashCode());
@@ -95,14 +95,16 @@ public class CoerceExpression extends BaseSingleExpression {
     @Override
     public boolean equals(Object obj) {
         if (this == obj) return true;
-        if (obj == null) return false;
+        if (!super.equals(obj)) return false;
         if (getClass() != obj.getClass()) return false;
         CoerceExpression other = (CoerceExpression)obj;
         if (maxLength == null) {
             if (other.maxLength != null) return false;
         } else if (!maxLength.equals(other.maxLength)) return false;
         if (toSortOrder != other.toSortOrder) return false;
-        if (toType != other.toType) return false;
+        if (toType == null) {
+            if (other.toType != null) return false;
+        } else if (!toType.equals(other.toType)) return false;
         return true;
     }
 
@@ -122,7 +124,7 @@ public class CoerceExpression extends BaseSingleExpression {
         WritableUtils.writeVInt(output, toSortOrder.getSystemValue());
         WritableUtils.writeVInt(output, maxLength == null ? -1 : maxLength);
     }
-
+    
     @Override
     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
         if (getChild().evaluate(tuple, ptr)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java
index 240d013..e4ec438 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java
@@ -23,10 +23,10 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
-import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.ByteUtil;
 
 
@@ -78,6 +78,10 @@ public class RowKeyColumnExpression  extends ColumnExpression {
     public int getPosition() {
         return accessor.getIndex();
     }
+    
+    public String getName() {
+        return name;
+    }
 
     @Override
     public int hashCode() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
index 0e321a7..a6e36cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
@@ -33,4 +33,6 @@ public interface ValueGetter {
    * @throws IOException if there is an error accessing the underlying data storage
    */
   public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException;
+  
+  public byte[] getRowKey();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
index 43c4028..96a7410 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
@@ -84,4 +84,9 @@ public class LazyValueGetter implements ValueGetter {
     }
     return null;
   }
+
+  @Override
+  public byte[] getRowKey() {
+	return this.row; 
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index b4ba12d..dc72059 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -101,24 +101,6 @@ public class IndexManagementUtil {
 
     }
 
-    public static ValueGetter createGetterFromKeyValues(Collection<Cell> pendingUpdates) {
-        final Map<ReferencingColumn, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates
-                .size());
-        for (Cell kv : pendingUpdates) {
-            // create new pointers to each part of the kv
-            ImmutableBytesPtr family = new ImmutableBytesPtr(kv.getRowArray(),kv.getFamilyOffset(),kv.getFamilyLength());
-            ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getRowArray(), kv.getQualifierOffset(), kv.getQualifierLength());
-            ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
-            valueMap.put(new ReferencingColumn(family, qual), value);
-        }
-        return new ValueGetter() {
-            @Override
-            public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException {
-                return valueMap.get(ReferencingColumn.wrap(ref));
-            }
-        };
-    }
-
     public static class ReferencingColumn {
         ImmutableBytesPtr family;
         ImmutableBytesPtr qual;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 61b6e68..31f6c76 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -42,15 +43,27 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.IndexExpressionCompiler;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil.ReferencingColumn;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
-import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
@@ -59,8 +72,11 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.tuple.ValueGetterTuple;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.BitSet;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
@@ -88,41 +104,15 @@ import com.google.common.collect.Sets;
  * @since 2.1.0
  */
 public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
-    
-    public static IndexMaintainer create(PTable dataTable, PTable index) {
+
+    private static final int EXPRESSION_NOT_PRESENT = -1;
+    private static final int ESTIMATED_EXPRESSION_SIZE = 8;
+
+	public static IndexMaintainer create(PTable dataTable, PTable index, PhoenixConnection connection) {
         if (dataTable.getType() == PTableType.INDEX || index.getType() != PTableType.INDEX || !dataTable.getIndexes().contains(index)) {
             throw new IllegalArgumentException();
         }
-        IndexMaintainer maintainer = new IndexMaintainer(dataTable, index);
-        int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + (maintainer.isMultiTenant ? 1 : 0) + (maintainer.viewIndexId == null ? 0 : 1);
-        RowKeyMetaData rowKeyMetaData = maintainer.getRowKeyMetaData();
-        int indexColByteSize = 0;
-        for (int i = indexPosOffset; i < index.getPKColumns().size(); i++) {
-            PColumn indexColumn = index.getPKColumns().get(i);
-            int indexPos = i - indexPosOffset;
-            PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
-            boolean isPKColumn = SchemaUtil.isPKColumn(column);
-            if (isPKColumn) {
-                int dataPkPos = dataTable.getPKColumns().indexOf(column) - (dataTable.getBucketNum() == null ? 0 : 1) - (maintainer.isMultiTenant ? 1 : 0);
-                rowKeyMetaData.setIndexPkPosition(dataPkPos, indexPos);
-            } else {
-                indexColByteSize += column.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(column) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
-                maintainer.getIndexedColumnTypes().add(column.getDataType());
-                maintainer.getIndexedColumns().add(new ColumnReference(column.getFamilyName().getBytes(), column.getName().getBytes()));
-            }
-            if (indexColumn.getSortOrder() == SortOrder.DESC) {
-                rowKeyMetaData.getDescIndexColumnBitSet().set(indexPos);
-            }
-        }
-        for (int i = 0; i < index.getColumnFamilies().size(); i++) {
-            PColumnFamily family = index.getColumnFamilies().get(i);
-            for (PColumn indexColumn : family.getColumns()) {
-                PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
-                maintainer.getCoverededColumns().add(new ColumnReference(column.getFamilyName().getBytes(), column.getName().getBytes()));
-            }
-        }
-        maintainer.estimatedIndexRowKeyBytes = maintainer.estimateIndexRowKeyByteSize(indexColByteSize);
-        maintainer.initCachedState();
+        IndexMaintainer maintainer = new IndexMaintainer(dataTable, index, connection);
         return maintainer;
     }
     
@@ -158,9 +148,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      * @param dataTable data table
      * @param ptr bytes pointer to hold returned serialized value
      */
-    public static void serialize(PTable dataTable, ImmutableBytesWritable ptr) {
+    public static void serialize(PTable dataTable, ImmutableBytesWritable ptr, PhoenixConnection connection) {
         List<PTable> indexes = dataTable.getIndexes();
-        serialize(dataTable, ptr, indexes);
+        serialize(dataTable, ptr, indexes, connection);
     }
 
     /**
@@ -170,7 +160,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      * @param indexes indexes to serialize
      */
     public static void serialize(PTable dataTable, ImmutableBytesWritable ptr,
-            List<PTable> indexes) {
+            List<PTable> indexes, PhoenixConnection connection) {
         Iterator<PTable> indexesItr = nonDisabledIndexIterator(indexes.iterator());
         if ((dataTable.isImmutableRows()) || !indexesItr.hasNext()) {
             indexesItr = enabledLocalIndexIterator(indexesItr);
@@ -184,7 +174,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         while (indexesItr.hasNext()) {
             nIndexes++;
             PTable index = indexesItr.next();
-            estimatedSize += index.getIndexMaintainer(dataTable).getEstimatedByteSize();
+            estimatedSize += index.getIndexMaintainer(dataTable, connection).getEstimatedByteSize();
         }
         TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedSize + 1);
         DataOutput output = new DataOutputStream(stream);
@@ -197,7 +187,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                     dataTable.isImmutableRows() ? enabledLocalIndexIterator(indexes.iterator())
                             : nonDisabledIndexIterator(indexes.iterator());
             while (indexesItr.hasNext()) {
-                    indexesItr.next().getIndexMaintainer(dataTable).write(output);
+                    indexesItr.next().getIndexMaintainer(dataTable, connection).write(output);
             }
         } catch (IOException e) {
             throw new RuntimeException(e); // Impossible
@@ -238,9 +228,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
 
     private byte[] viewIndexId;
     private boolean isMultiTenant;
+    // indexed expressions that are not present in the row key of the data table; an expression may also simply refer to a regular column
+    private List<Expression> indexedExpressions;
+    // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key)
     private Set<ColumnReference> indexedColumns;
     private Set<ColumnReference> coveredColumns;
+    // columns required to create an index row, i.e. indexedColumns + coveredColumns (this does not include columns in the data row key)
     private Set<ColumnReference> allColumns;
+    // TODO remove this in the next major release
     private List<PDataType> indexedColumnTypes;
     private RowKeyMetaData rowKeyMetaData;
     private byte[] indexTableName;
@@ -258,6 +253,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     
     private List<ImmutableBytesPtr> indexQualifiers;
     private int estimatedIndexRowKeyBytes;
+    private int estimatedExpressionSize;
     private int[] dataPkPosition;
     private int maxTrailingNulls;
     private ColumnReference dataEmptyKeyValueRef;
@@ -267,32 +263,43 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         this.isDataTableSalted = isDataTableSalted;
     }
 
-    private IndexMaintainer(PTable dataTable, PTable index) {
+    private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection connection) {
         this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
         this.isMultiTenant = dataTable.isMultiTenant();
         this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
         this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
 
-        RowKeySchema dataRowKeySchema = dataTable.getRowKeySchema();
-        boolean isDataTableSalted = dataTable.getBucketNum() != null;
         byte[] indexTableName = index.getPhysicalName().getBytes();
         // Use this for the nDataSaltBuckets as we need this for local indexes
         // TODO: persist nDataSaltBuckets separately, but maintain b/w compat.
         Integer nIndexSaltBuckets = isLocalIndex ? dataTable.getBucketNum() : index.getBucketNum();
         boolean indexWALDisabled = index.isWALDisabled();
         int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + (this.isMultiTenant ? 1 : 0) + (this.viewIndexId == null ? 0 : 1);
+//        int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0;
         int nIndexColumns = index.getColumns().size() - indexPosOffset;
         int nIndexPKColumns = index.getPKColumns().size() - indexPosOffset;
-        int indexedColumnsCount = 0;
-        for (int i  = indexPosOffset; i<index.getPKColumns().size();i++) {
-            PColumn indexColumn = index.getPKColumns().get(i);
-            PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
-            boolean isPKColumn = SchemaUtil.isPKColumn(column);
-            if (!isPKColumn) {
-                indexedColumnsCount++;
-            } 
+        // number of indexed expressions that are not present in the row key of the data table
+        int indexedExpressionCount = 0;
+        for (int i = indexPosOffset; i<index.getPKColumns().size();i++) {
+        	PColumn indexColumn = index.getPKColumns().get(i);
+        	if (!IndexUtil.isIndexColumn(indexColumn)) {
+                continue;
+            }
+        	String indexColumnName = indexColumn.getName().getString();
+            String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColumnName);
+            String dataColumnName = IndexUtil.getDataColumnName(indexColumnName);
+            try {
+                PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumn(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getColumn(dataColumnName);
+                if (SchemaUtil.isPKColumn(dataColumn)) 
+                    continue;
+            } catch (ColumnNotFoundException e) {
+             // This column must be an expression
+            } catch (Exception e) {
+                throw new IllegalArgumentException(e);
+            }
+            indexedExpressionCount++;
         }
-        int indexPkColumnCount = this.dataRowKeySchema.getFieldCount() + indexedColumnsCount - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0);
+        int indexPkColumnCount = this.dataRowKeySchema.getFieldCount() + indexedExpressionCount  - (this.isDataTableSalted ? 1 : 0) - (this.isMultiTenant ? 1 : 0);
         this.rowKeyMetaData = newRowKeyMetaData(indexPkColumnCount);
         BitSet bitSet = this.rowKeyMetaData.getViewConstantColumnBitSet();
 
@@ -312,12 +319,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             }
         }
         this.indexTableName = indexTableName;
-        this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
+        this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns);
-        this.allColumns = Sets.newLinkedHashSetWithExpectedSize(nDataPKColumns + nIndexColumns);
-        this.allColumns.addAll(indexedColumns);
-        this.allColumns.addAll(coveredColumns);
         this.nIndexSaltBuckets  = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets;
         this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable);
         this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index);
@@ -326,6 +330,60 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         // TODO: check whether index is immutable or not. Currently it's always false so checking
         // data table is with immutable rows or not.
         this.immutableRows = dataTable.isImmutableRows();
+        int indexColByteSize = 0;
+        ColumnResolver resolver = null;
+        try {
+            resolver = FromCompiler.getResolver(new TableRef(dataTable));
+        } catch (SQLException e) {
+            throw new RuntimeException(e); // Impossible
+        }
+        StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver);
+        IndexExpressionCompiler expressionIndexCompiler = new IndexExpressionCompiler(context);
+        for (int i = indexPosOffset; i < index.getPKColumns().size(); i++) {
+            PColumn indexColumn = index.getPKColumns().get(i);
+            if (!IndexUtil.isIndexColumn(indexColumn)) {
+                continue;
+            }
+            int indexPos = i - indexPosOffset;
+            Expression expression = null;
+            try {
+                expressionIndexCompiler.reset();
+                ParseNode parseNode  = SQLParser.parseCondition(indexColumn.getExpressionStr());
+                expression = parseNode.accept(expressionIndexCompiler);
+            } catch (SQLException e) {
+                throw new RuntimeException(e); // Impossible
+            }
+            if ( expressionIndexCompiler.getColumnRef()!=null ) {
+            	// get the column of the data table that corresponds to this index column
+	            PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
+	            boolean isPKColumn = SchemaUtil.isPKColumn(column);
+	            if (isPKColumn) {
+	                int dataPkPos = dataTable.getPKColumns().indexOf(column) - (dataTable.getBucketNum() == null ? 0 : 1) - (this.isMultiTenant ? 1 : 0);
+	                this.rowKeyMetaData.setIndexPkPosition(dataPkPos, indexPos);
+	            } else {
+	                indexColByteSize += column.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(column) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
+	                this.indexedExpressions.add(expression);
+	            }
+            }
+            else {
+            	indexColByteSize += expression.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(expression) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
+                this.indexedExpressions.add(expression);
+            }
+            // set the sort order of the expression correctly
+            if (indexColumn.getSortOrder() == SortOrder.DESC) {
+                this.rowKeyMetaData.getDescIndexColumnBitSet().set(indexPos);
+            }
+        }
+        this.estimatedExpressionSize = expressionIndexCompiler.getTotalNodeCount() * ESTIMATED_EXPRESSION_SIZE;
+        for (int i = 0; i < index.getColumnFamilies().size(); i++) {
+            PColumnFamily family = index.getColumnFamilies().get(i);
+            for (PColumn indexColumn : family.getColumns()) {
+                PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
+                this.coveredColumns.add(new ColumnReference(column.getFamilyName().getBytes(), column.getName().getBytes()));
+            }
+        }
+        this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize);
+        initCachedState();
     }
 
     public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey)  {
@@ -388,30 +446,26 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 } 
             }
             BitSet descIndexColumnBitSet = rowKeyMetaData.getDescIndexColumnBitSet();
-            int j = 0;
-            Iterator<ColumnReference> iterator = indexedColumns.iterator();
+            Iterator<Expression> expressionIterator = indexedExpressions.iterator();
             for (int i = 0; i < nIndexedColumns; i++) {
                 PDataType dataColumnType;
-                boolean isNullable = true;
-                boolean isDataColumnInverted = false;
-                SortOrder dataSortOrder = SortOrder.getDefault();
-                if (dataPkPosition[i] == -1) {
-                    dataColumnType = indexedColumnTypes.get(j);
-                    ImmutableBytesPtr value = valueGetter.getLatestValue(iterator.next());
-                    if (value == null) {
-                        ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
-                    } else {
-                        ptr.set(value.copyBytesIfNecessary());
-                    }
-                    j++;
-               } else {
+                boolean isNullable;
+                SortOrder dataSortOrder;
+                if (dataPkPosition[i] == EXPRESSION_NOT_PRESENT) {
+                	Expression expression = expressionIterator.next();
+                	dataColumnType = expression.getDataType();
+                	dataSortOrder = expression.getSortOrder();
+                    isNullable = expression.isNullable();
+                	expression.evaluate(new ValueGetterTuple(valueGetter), ptr);
+                }
+                else {
                     Field field = dataRowKeySchema.getField(dataPkPosition[i]);
                     dataColumnType = field.getDataType();
                     ptr.set(rowKeyPtr.get(), dataRowKeyLocator[0][i], dataRowKeyLocator[1][i]);
                     dataSortOrder = field.getSortOrder();
-                    isDataColumnInverted = dataSortOrder != SortOrder.ASC;
                     isNullable = field.isNullable();
                 }
+                boolean isDataColumnInverted = dataSortOrder != SortOrder.ASC;
                 PDataType indexColumnType = IndexUtil.getIndexColumnDataType(isNullable, dataColumnType);
                 boolean isBytesComparable = dataColumnType.isBytesComparableWith(indexColumnType) ;
                 if (isBytesComparable && isDataColumnInverted == descIndexColumnBitSet.get(i)) {
@@ -643,10 +697,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 indexFields[pos] = dataRowKeySchema.getField(i);
             } 
         }
-        int indexedColumnTypesIndex = 0;
+        Iterator<Expression> expressionSetItr = indexedExpressions.iterator();
         for (Field indexField : indexFields) {
             if (indexField == null) { // Add field for kv column in index
-                final PDataType dataType = indexedColumnTypes.get(indexedColumnTypesIndex++);
+                final PDataType dataType = expressionSetItr.next().getDataType();
                 builder.addField(new PDatum() {
 
                     @Override
@@ -823,10 +877,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return coveredColumns;
     }
 
-    public Set<ColumnReference> getIndexedColumns() {
-        return indexedColumns;
-    }
-
     public Set<ColumnReference> getAllColumns() {
         return allColumns;
     }
@@ -838,14 +888,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return emptyKeyValueCFPtr;
     }
 
-    private RowKeyMetaData getRowKeyMetaData() {
-        return rowKeyMetaData;
-    }
-    
-    private List<PDataType> getIndexedColumnTypes() {
-        return indexedColumnTypes;
-    }
-
     @Override
     public void readFields(DataInput input) throws IOException {
         int encodedIndexSaltBucketsAndMultiTenant = WritableUtils.readVInt(input);
@@ -881,7 +923,62 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
         indexTableName = Bytes.readByteArray(input);
         dataEmptyKeyValueCF = Bytes.readByteArray(input);
-        emptyKeyValueCFPtr = new ImmutableBytesPtr(Bytes.readByteArray(input));
+        int len = WritableUtils.readVInt(input);
+        // TODO: remove this negative-length new-client check in the next major release
+        boolean isNewClient = false;
+        if (len < 0) {
+          isNewClient = true;
+          len=Math.abs(len);
+        }
+        byte [] emptyKeyValueCF = new byte[len];
+        input.readFully(emptyKeyValueCF, 0, len);
+        emptyKeyValueCFPtr = new ImmutableBytesPtr(emptyKeyValueCF);
+        
+        if (isNewClient) {
+            int numIndexedExpressions = WritableUtils.readVInt(input);
+            indexedExpressions = Lists.newArrayListWithExpectedSize(numIndexedExpressions);        
+            for (int i = 0; i < numIndexedExpressions; i++) {
+            	Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+            	expression.readFields(input);
+            	indexedExpressions.add(expression);
+            }
+        }
+        else {
+            indexedExpressions = Lists.newArrayListWithExpectedSize(indexedColumns.size());
+            Iterator<ColumnReference> colReferenceIter = indexedColumns.iterator();
+            Iterator<PDataType> dataTypeIter = indexedColumnTypes.iterator();
+            while (colReferenceIter.hasNext()) {
+                ColumnReference colRef = colReferenceIter.next();
+                final PDataType dataType = dataTypeIter.next();
+                indexedExpressions.add(new KeyValueColumnExpression(new PDatum() {
+                    
+                    @Override
+                    public boolean isNullable() {
+                        return true;
+                    }
+                    
+                    @Override
+                    public SortOrder getSortOrder() {
+                        return SortOrder.getDefault();
+                    }
+                    
+                    @Override
+                    public Integer getScale() {
+                        return null;
+                    }
+                    
+                    @Override
+                    public Integer getMaxLength() {
+                        return null;
+                    }
+                    
+                    @Override
+                    public PDataType getDataType() {
+                        return dataType;
+                    }
+                }, colRef.getFamily(), colRef.getQualifier()));
+            }
+        }
         
         rowKeyMetaData = newRowKeyMetaData();
         rowKeyMetaData.readFields(input);
@@ -908,6 +1005,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             Bytes.writeByteArray(output, ref.getFamily());
             Bytes.writeByteArray(output, ref.getQualifier());
         }
+        //TODO remove indexedColumnTypes in the next major release
         for (int i = 0; i < indexedColumnTypes.size(); i++) {
             PDataType type = indexedColumnTypes.get(i);
             WritableUtils.writeVInt(output, type.ordinal());
@@ -920,9 +1018,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
         Bytes.writeByteArray(output, indexTableName);
         Bytes.writeByteArray(output, dataEmptyKeyValueCF);
-        WritableUtils.writeVInt(output,emptyKeyValueCFPtr.getLength());
+        // TODO: to maintain backward compatibility, encode emptyKeyValueCFPtr.getLength() as a negative value (so we can distinguish between new and old clients).
+        // Remove this encoding when indexedColumnTypes is removed.
+        WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength());
         output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), emptyKeyValueCFPtr.getLength());
         
+        WritableUtils.writeVInt(output, indexedExpressions.size());
+        for (Expression expression : indexedExpressions) {
+        	WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+        	expression.write(output);
+        }
+        
         rowKeyMetaData.write(output);
         // Encode indexWALDisabled in nDataCFs
         WritableUtils.writeVInt(output, (nDataCFs + 1) * (indexWALDisabled ? -1 : 1));
@@ -941,7 +1047,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             size += WritableUtils.getVIntSize(ref.getQualifier().length);
             size += ref.getQualifier().length;
         }
-        size += indexedColumnTypes.size();
+        for (int i = 0; i < indexedColumnTypes.size(); i++) {
+            PDataType type = indexedColumnTypes.get(i);
+            size += WritableUtils.getVIntSize(type.ordinal());
+        }
         size += WritableUtils.getVIntSize(coveredColumns.size());
         for (ColumnReference ref : coveredColumns) {
             size += WritableUtils.getVIntSize(ref.getFamily().length);
@@ -954,13 +1063,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         size += dataEmptyKeyValueCF.length + WritableUtils.getVIntSize(dataEmptyKeyValueCF.length);
         size += emptyKeyValueCFPtr.getLength() + WritableUtils.getVIntSize(emptyKeyValueCFPtr.getLength());
         size += WritableUtils.getVIntSize(nDataCFs+1);
+        size += WritableUtils.getVIntSize(indexedExpressions.size());
+        for (Expression expression : indexedExpressions) {
+            size += WritableUtils.getVIntSize(ExpressionType.valueOf(expression).ordinal());
+        }
+        size += estimatedExpressionSize;
         return size;
     }
     
     private int estimateIndexRowKeyByteSize(int indexColByteSize) {
         int estimatedIndexRowKeyBytes = indexColByteSize + dataRowKeySchema.getEstimatedValueLength() + (nIndexSaltBuckets == 0 || isLocalIndex || this.isDataTableSalted ? 0 : SaltingUtil.NUM_SALTING_BYTES);
         return estimatedIndexRowKeyBytes;
-   }
+    }
     
     /**
      * Init calculated state reading/creating
@@ -976,20 +1090,33 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 ref.getFamily(), ref.getQualifier())));
         }
 
-        this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedColumns.size() + coveredColumns.size());
+        this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumns.size());
+        // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key)
+        this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size());
+        for (Expression expression : indexedExpressions) {
+        	KeyValueExpressionVisitor visitor = new KeyValueExpressionVisitor() {
+                @Override
+                public Void visit(KeyValueColumnExpression expression) {
+                	indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnName()));
+                    indexedColumnTypes.add(expression.getDataType());
+                    return null;
+                }
+            };
+            expression.accept(visitor);
+        }
         allColumns.addAll(indexedColumns);
         allColumns.addAll(coveredColumns);
         
         int dataPkOffset = (isDataTableSalted ? 1 : 0) + (isMultiTenant ? 1 : 0);
         int nIndexPkColumns = getIndexPkColumnCount();
         dataPkPosition = new int[nIndexPkColumns];
-        Arrays.fill(dataPkPosition, -1);
+        Arrays.fill(dataPkPosition, EXPRESSION_NOT_PRESENT);
         int numViewConstantColumns = 0;
         BitSet viewConstantColumnBitSet = rowKeyMetaData.getViewConstantColumnBitSet();
         for (int i = dataPkOffset; i < dataRowKeySchema.getFieldCount(); i++) {
             if (!viewConstantColumnBitSet.get(i)) {
-                int dataPkPosition = rowKeyMetaData.getIndexPkPosition(i-dataPkOffset);
-                this.dataPkPosition[dataPkPosition] = i;
+                int indexPkPosition = rowKeyMetaData.getIndexPkPosition(i-dataPkOffset);
+                this.dataPkPosition[indexPkPosition] = i;
             } else {
                 numViewConstantColumns++;
             }
@@ -998,15 +1125,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         // Calculate the max number of trailing nulls that we should get rid of after building the index row key.
         // We only get rid of nulls for variable length types, so we have to be careful to consider the type of the
         // index table, not the data type of the data table
-        int indexedColumnTypesPos = indexedColumnTypes.size()-1;
+        int expressionsPos = indexedExpressions.size();
         int indexPkPos = nIndexPkColumns - numViewConstantColumns - 1;
         while (indexPkPos >= 0) {
             int dataPkPos = dataPkPosition[indexPkPos];
             boolean isDataNullable;
             PDataType dataType;
-            if (dataPkPos == -1) {
+            if (dataPkPos == EXPRESSION_NOT_PRESENT) {
                 isDataNullable = true;
-                dataType = indexedColumnTypes.get(indexedColumnTypesPos--);
+                dataType = indexedExpressions.get(--expressionsPos).getDataType();
             } else {
                 Field dataField = dataRowKeySchema.getField(dataPkPos);
                 dataType = dataField.getDataType();
@@ -1022,7 +1149,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     }
 
     private int getIndexPkColumnCount() {
-        return dataRowKeySchema.getFieldCount() + indexedColumns.size() - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0);
+        return dataRowKeySchema.getFieldCount() + indexedExpressions.size() - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0);
     }
     
     private RowKeyMetaData newRowKeyMetaData() {
@@ -1178,7 +1305,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return allColumns.iterator();
     }
 
-    public ValueGetter createGetterFromKeyValues(Collection<? extends Cell> pendingUpdates) {
+    public ValueGetter createGetterFromKeyValues(final byte[] rowKey, Collection<? extends Cell> pendingUpdates) {
         final Map<ReferencingColumn, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates
                 .size());
         for (Cell kv : pendingUpdates) {
@@ -1190,10 +1317,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
         return new ValueGetter() {
             @Override
-            public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException {
+            public ImmutableBytesPtr getLatestValue(ColumnReference ref) {
                 if(ref.equals(dataEmptyKeyValueRef)) return null;
                 return valueMap.get(ReferencingColumn.wrap(ref));
             }
+            @Override
+            public byte[] getRowKey() {
+            	return rowKey;
+            }
         };
     }
 
@@ -1208,4 +1339,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     public boolean isImmutableRows() {
         return immutableRows;
     }
+    
+    public Set<ColumnReference> getIndexedColumns() {
+        return indexedColumns;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 48a7868..99e26d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -154,7 +154,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
                 } else {
                     indexUpdate.setTable(maintainer.getIndexTableName());
                 }
-                valueGetter = maintainer.createGetterFromKeyValues(state.getPendingUpdate());
+                valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate());
             } else {
                 // TODO: if more efficient, I could do this just once with all columns in all indexes
                 Pair<Scanner,IndexUpdate> statePair = state.getIndexedColumnsTableState(maintainer.getAllColumns());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 1c98c5c..2fd168a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -19,7 +19,13 @@ package org.apache.phoenix.index;
 
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,28 +43,27 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
-
-import com.google.common.collect.Multimap;
-
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
+import com.google.common.collect.Multimap;
+
 /**
  * 
  * Handler called in the event that index updates cannot be written to their
@@ -219,7 +224,7 @@ public class PhoenixIndexFailurePolicy extends  KillServerOnFailurePolicy {
                 return Collections.emptySet();
             }
 
-            IndexMaintainer indexMaintainer = localIndex.getIndexMaintainer(dataTable);
+            IndexMaintainer indexMaintainer = localIndex.getIndexMaintainer(dataTable, conn);
             HRegionInfo regionInfo = this.env.getRegion().getRegionInfo();
             int offset =
                     regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 76a1ad1..b26f408 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -166,6 +166,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     public static final String NULLABLE = "NULLABLE";
     public static final byte[] NULLABLE_BYTES = Bytes.toBytes(NULLABLE);
     public static final String COLUMN_DEF = "COLUMN_DEF";
+    public static final byte[] COLUMN_DEF_BYTES = Bytes.toBytes(COLUMN_DEF);
     public static final String SQL_DATA_TYPE = "SQL_DATA_TYPE";
     public static final String SQL_DATETIME_SUB = "SQL_DATETIME_SUB";
     public static final String CHAR_OCTET_LENGTH = "CHAR_OCTET_LENGTH";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c340f5a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 93212bc..4ca5bb5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -81,6 +81,7 @@ import org.apache.phoenix.parse.DropTableStatement;
 import org.apache.phoenix.parse.ExplainStatement;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.IndexKeyConstraint;
 import org.apache.phoenix.parse.LimitNode;
 import org.apache.phoenix.parse.NamedNode;
 import org.apache.phoenix.parse.NamedTableNode;
@@ -521,9 +522,9 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
 
     private static class ExecutableCreateIndexStatement extends CreateIndexStatement implements CompilableStatement {
 
-        public ExecutableCreateIndexStatement(NamedNode indexName, NamedTableNode dataTable, PrimaryKeyConstraint pkConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
+        public ExecutableCreateIndexStatement(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
                 ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
-            super(indexName, dataTable, pkConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount);
+            super(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount);
         }
 
         @SuppressWarnings("unchecked")
@@ -852,9 +853,9 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
         }
         
         @Override
-        public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, PrimaryKeyConstraint pkConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
+        public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
                 ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
-            return new ExecutableCreateIndexStatement(indexName, dataTable, pkConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount);
+            return new ExecutableCreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount);
         }
         
         @Override