You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/05/08 08:23:03 UTC

git commit: PHOENIX-937 Handle puts on local index table (Rajeshbabu)

Repository: incubator-phoenix
Updated Branches:
  refs/heads/local-index 360f5a1cd -> 276de4a19


PHOENIX-937 Handle puts on local index table (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/276de4a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/276de4a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/276de4a1

Branch: refs/heads/local-index
Commit: 276de4a195b3bdb851df9cf87d2626a1218c65d0
Parents: 360f5a1
Author: James Taylor <jt...@salesforce.com>
Authored: Wed May 7 23:23:37 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed May 7 23:23:37 2014 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/index/LocalIndexIT.java     | 80 ++++++++++++++++++--
 .../coprocessor/BaseScannerRegionObserver.java  |  1 +
 .../UngroupedAggregateRegionObserver.java       | 65 +++++++++++++++-
 .../hbase/index/util/IndexManagementUtil.java   | 12 +--
 .../apache/phoenix/index/IndexMaintainer.java   |  4 +-
 .../apache/phoenix/schema/MetaDataClient.java   | 54 ++++++++++++-
 6 files changed, 197 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/276de4a1/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 59bbc7f..6bc1b90 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -33,6 +33,10 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -67,8 +71,8 @@ public class LocalIndexIT extends BaseIndexIT {
                 "k2 INTEGER NOT NULL,\n" +
                 "v1 VARCHAR,\n" +
                 "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"
-                        + (saltBuckets == null || splits != null ? "" : (",salt_buckets=" + saltBuckets)
-                        + (saltBuckets != null || splits == null ? "" : ",splits=" + splits));
+                        + (saltBuckets != null && splits == null ? (",salt_buckets=" + saltBuckets) : "" 
+                        + (saltBuckets == null && splits != null ? (" split on " + splits) : ""));
         conn.createStatement().execute(ddl);
         conn.close();
     }
@@ -91,13 +95,13 @@ public class LocalIndexIT extends BaseIndexIT {
         Connection conn1 = DriverManager.getConnection(getUrl());
         Connection conn2 = DriverManager.getConnection(getUrl());
         try {
-            conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)"+" splits={1,2,3}");
+            conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)"+" split on (1,2,3)");
             fail("Local index cannot be pre-split");
         } catch (SQLException e) { }
         try {
             conn2.createStatement().executeQuery("SELECT * FROM " + DATA_TABLE_FULL_NAME).next();
             conn2.unwrap(PhoenixConnection.class).getMetaDataCache().getTable(new PTableKey(null,INDEX_TABLE_NAME));
-            fail("Local index should be created.");
+            fail("Local index should not be created.");
         } catch (TableNotFoundException e) { }
     }
 
@@ -119,7 +123,7 @@ public class LocalIndexIT extends BaseIndexIT {
 
     @Test
     public void testLocalIndexTableRegionSplitPolicyAndSplitKeys() throws Exception {
-        createBaseTable(DATA_TABLE_NAME, null,"{1,2,3}");
+        createBaseTable(DATA_TABLE_NAME, null,"('e','i','o')");
         Connection conn1 = DriverManager.getConnection(getUrl());
         Connection conn2 = DriverManager.getConnection(getUrl());
         conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)");
@@ -150,4 +154,70 @@ public class LocalIndexIT extends BaseIndexIT {
                 + " FROM " + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME);
         assertFalse("View index sequences should be deleted.", rs.next());
     }
+    
+    @Test
+    public void testPutsToLocalIndexTable() throws Exception {
+        createBaseTable(DATA_TABLE_NAME, null, "('e','i','o')");
+        Connection conn1 = DriverManager.getConnection(getUrl());
+        conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)");
+        conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('b',1,2,'z')");
+        conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('f',1,2,'z')");
+        conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('j',2,4,'a')");
+        conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('q',3,1,'c')");
+        conn1.commit();
+        ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + INDEX_TABLE_NAME);
+        assertTrue(rs.next());
+        assertEquals(4, rs.getInt(1));
+        HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        HTable indexTable = new HTable(admin.getConfiguration() ,TableName.valueOf(MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME)));
+        Pair<byte[][], byte[][]> startEndKeys = indexTable.getStartEndKeys();
+        byte[][] startKeys = startEndKeys.getFirst();
+        byte[][] endKeys = startEndKeys.getSecond();
+        for (int i = 0; i < startKeys.length; i++) {
+            Scan s = new Scan();
+            s.setStartRow(startKeys[i]);
+            s.setStopRow(endKeys[i]);
+            ResultScanner scanner = indexTable.getScanner(s);
+            int count = 0;
+            for(Result r:scanner){
+                count++;
+            }
+            scanner.close();
+            assertEquals(1, count);
+        }
+        indexTable.close();
+    }
+    
+    @Test
+    public void testBuildIndexWhenUserTableAlreadyHasData() throws Exception {
+        createBaseTable(DATA_TABLE_NAME, null, "('e','i','o')");
+        Connection conn1 = DriverManager.getConnection(getUrl());
+        conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('b',1,2,'z')");
+        conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('f',1,2,'z')");
+        conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('j',2,4,'a')");
+        conn1.createStatement().execute("UPSERT INTO "+DATA_TABLE_NAME+" values('q',3,1,'c')");
+        conn1.commit();
+        conn1.createStatement().execute("CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_NAME + "(v1)");
+        ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + INDEX_TABLE_NAME);
+        assertTrue(rs.next());
+        assertEquals(4, rs.getInt(1));
+        HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        HTable indexTable = new HTable(admin.getConfiguration() ,TableName.valueOf(MetaDataUtil.getLocalIndexTableName(DATA_TABLE_NAME)));
+        Pair<byte[][], byte[][]> startEndKeys = indexTable.getStartEndKeys();
+        byte[][] startKeys = startEndKeys.getFirst();
+        byte[][] endKeys = startEndKeys.getSecond();
+        for (int i = 0; i < startKeys.length; i++) {
+            Scan s = new Scan();
+            s.setStartRow(startKeys[i]);
+            s.setStopRow(endKeys[i]);
+            ResultScanner scanner = indexTable.getScanner(s);
+            int count = 0;
+            for(Result r:scanner){
+                count++;
+            }
+            scanner.close();
+            assertEquals(1, count);
+        }
+        indexTable.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/276de4a1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 6f0124a..9e42f64 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -45,6 +45,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex";
     public static final String GROUP_BY_LIMIT = "_GroupByLimit";
     public static final String LOCAL_INDEX = "_LocalIndex";
+    public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
 
     /**
      * Used by logger to identify coprocessor

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/276de4a1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 0139400..799bc96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -21,6 +21,7 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_BYTES_PTR;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 
 import java.io.ByteArrayInputStream;
@@ -31,6 +32,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -56,8 +59,12 @@ import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.Aggregators;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.join.ScanProjector;
@@ -73,6 +80,7 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
@@ -130,6 +138,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         if (isUngroupedAgg == null) {
             return s;
         }
+
+        byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+        List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
+        List<Mutation> indexMutations = localIndexBytes == null ? Collections.<Mutation>emptyList() : Lists.<Mutation>newArrayListWithExpectedSize(1024);
         
         final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
@@ -165,6 +177,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             }
             emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
         }
+        if(localIndexBytes != null) {
+            ptr = new ImmutableBytesWritable();
+        }
         
         int batchSize = 0;
         long ts = scan.getTimeRange().getMax();
@@ -197,7 +212,26 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     rowCount++;
                     result.setKeyValues(results);
                     try {
-                        if (isDelete) {
+                        if (indexMaintainers != null) {
+                            for (IndexMaintainer maintainer : indexMaintainers) {
+                                ImmutableBytesPtr emptyKeyValueFamily = maintainer.getEmptyKeyValueFamily();
+                                Iterator<Cell> iterator = results.iterator();
+                                while (iterator.hasNext()) {
+                                    Cell cell = iterator.next();
+                                    if (Bytes.compareTo(cell.getRowArray(), cell.getFamilyOffset(), cell.getFamilyLength(), emptyKeyValueFamily.get(), emptyKeyValueFamily.getOffset(), emptyKeyValueFamily.getLength()) == 0
+                                            && Bytes.compareTo(cell.getRowArray(), cell.getQualifierOffset(), cell.getQualifierLength(), EMPTY_COLUMN_BYTES_PTR.get(), EMPTY_COLUMN_BYTES_PTR.getOffset(), EMPTY_COLUMN_BYTES_PTR.getLength()) == 0) {
+                                        iterator.remove();
+                                    }
+                                }
+                                if (!results.isEmpty()) {
+                                    result.getKey(ptr);
+                                    ValueGetter valueGetter = IndexManagementUtil.createGetterFromKeyValues(results);
+                                    Put put = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, c.getEnvironment().getRegion().getStartKey());
+                                    indexMutations.add(put);
+                                }
+                            }
+                            result.setKeyValues(results);
+                        } else if (isDelete) {
                             // FIXME: the version of the Delete constructor without the lock args was introduced
                             // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
                             // of the client.
@@ -278,6 +312,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             commitBatch(region, mutations, indexUUID);
                             mutations.clear();
                         }
+                        // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
+                        if (!indexMutations.isEmpty() && batchSize > 0 && indexMutations.size() % batchSize == 0) {
+                            HRegion indexRegion = getIndexRegion(c.getEnvironment());
+                            // Get indexRegion corresponding to data region
+                            commitBatch(indexRegion, indexMutations, null);
+                            indexMutations.clear();
+                        }
+
                     } catch (ConstraintViolationException e) {
                         // Log and ignore in count
                         logger.error("Failed to create row in " + region.getRegionNameAsString() + " with values " + SchemaUtil.toString(values), e);
@@ -300,6 +342,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             commitBatch(region,mutations, indexUUID);
         }
 
+        if (!indexMutations.isEmpty()) {
+            HRegion indexRegion = getIndexRegion(c.getEnvironment());
+            // Get indexRegion corresponding to data region
+            commitBatch(indexRegion, indexMutations, null);
+            indexMutations.clear();
+        }
+
         final boolean hadAny = hasAny;
         KeyValue keyValue = null;
         if (hadAny) {
@@ -341,7 +390,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         };
         return scanner;
     }
-    
+
+    private HRegion getIndexRegion(RegionCoprocessorEnvironment environment) throws IOException {
+        HRegion userRegion = environment.getRegion();
+        TableName indexTableName = TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(userRegion.getTableDesc().getName()));
+        List<HRegion> onlineRegions = environment.getRegionServerServices().getOnlineRegions(indexTableName);
+        for(HRegion indexRegion : onlineRegions) {
+            if (Bytes.compareTo(userRegion.getStartKey(), indexRegion.getStartKey()) == 0) {
+                return indexRegion;
+            }
+        }
+        return null;
+    }
+
     private static PTable deserializeTable(byte[] b) {
         try {
             PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/276de4a1/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index 2199b4f..917b02c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
@@ -100,15 +101,14 @@ public class IndexManagementUtil {
 
     }
 
-    public static ValueGetter createGetterFromKeyValues(Collection<KeyValue> pendingUpdates) {
+    public static ValueGetter createGetterFromKeyValues(Collection<Cell> pendingUpdates) {
         final Map<ReferencingColumn, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates
                 .size());
-        for (KeyValue kv : pendingUpdates) {
+        for (Cell kv : pendingUpdates) {
             // create new pointers to each part of the kv
-            ImmutableBytesPtr family = new ImmutableBytesPtr(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength());
-            ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getBuffer(), kv.getQualifierOffset(),
-                    kv.getQualifierLength());
-            ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+            ImmutableBytesPtr family = new ImmutableBytesPtr(kv.getRowArray(),kv.getFamilyOffset(),kv.getFamilyLength());
+            ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getRowArray(), kv.getQualifierOffset(), kv.getQualifierLength());
+            ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
             valueMap.put(new ReferencingColumn(family, qual), value);
         }
         return new ValueGetter() {

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/276de4a1/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 1ca330f..a3a8791 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -511,7 +511,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return allColumns;
     }
     
-    private ImmutableBytesPtr getEmptyKeyValueFamily() {
+    public ImmutableBytesPtr getEmptyKeyValueFamily() {
         // Since the metadata of an index table will never change,
         // we can infer this based on the family of the first covered column
         // If if there are no covered columns, we know it's our default name
@@ -697,7 +697,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     }
 
     private int getIndexPkColumnCount() {
-        return dataRowKeySchema.getFieldCount() + indexedColumns.size() - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0) - (viewIndexId == null ? 0 : 1);
+        return dataRowKeySchema.getFieldCount() + indexedColumns.size() - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0) /*+ (viewIndexId == null ? 0 : 1)*/;
     }
     
     private RowKeyMetaData newRowKeyMetaData() {

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/276de4a1/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 034f212..b420c15 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -75,6 +75,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -82,6 +83,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ColumnResolver;
@@ -89,14 +92,20 @@ import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.compile.PostDDLCompiler;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.AddColumnStatement;
 import org.apache.phoenix.parse.AlterIndexStatement;
 import org.apache.phoenix.parse.ColumnDef;
@@ -489,9 +498,43 @@ public class MetaDataClient {
         connection.rollback();
         try {
             connection.setAutoCommit(true);
-            PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
-            MutationPlan plan = compiler.compile(index);
-            MutationState state = connection.getQueryServices().updateData(plan);
+            MutationState state;
+            // For local indexes, we optimize the initial index population by *not* sending Puts over
+            // the wire for the index rows, as we don't need to do that. Instead, we tap into our
+            // region observer to generate the index rows based on the data rows as we scan
+            if (index.getIndexType() == IndexType.LOCAL) {
+                final PhoenixStatement statement = new PhoenixStatement(connection);
+                String query = "SELECT count(*) FROM \"" + dataTableRef.getTable().getName().getString() + "\"";
+                QueryPlan plan = statement.compileQuery(query);
+                // Set attribute on scan that UngroupedAggregateRegionObserver will switch on.
+                // We'll detect that this attribute was set the server-side and write the index
+                // rows per region as a result. The value of the attribute will be our persisted
+                // index maintainers.
+                // Define the LOCAL_INDEX_BUILD as a new static in BaseScannerRegionObserver
+                Scan scan = plan.getContext().getScan();
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                PTable dataTable = dataTableRef.getTable();
+                dataTable.getIndexMaintainers(ptr);
+                scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+                // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+                // However, in this case, we need to project all of the data columns that contribute to the index.
+                IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable);
+                for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
+                    scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
+                }
+                Cell kv = plan.iterator().next().getValue(0);
+                ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+                // A single Cell will be returned with the count(*) - we decode that here
+                long rowCount = PDataType.LONG.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
+                // The contract is to return a MutationState that contains the number of rows modified. In this
+                // case, it's the number of rows in the data table which corresponds to the number of index
+                // rows that were added.
+                state = new MutationState(0, connection, rowCount);
+            } else {
+                PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
+                MutationPlan plan = compiler.compile(index);
+                state = connection.getQueryServices().updateData(plan);
+            }
             AlterIndexStatement indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null, 
                     TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
                     dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE);
@@ -685,7 +728,10 @@ public class MetaDataClient {
         if (connection.getSCN() != null) {
             return buildIndexAtTimeStamp(table, statement.getTable());
         }
-        
+        if (statement.getIndexType() == IndexType.LOCAL) {
+            ColumnResolver resolver = FromCompiler.getResolverForMutation(statement, connection);
+            tableRef = resolver.getTables().get(0);
+        }
         return buildIndex(table, tableRef);
     }