Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/02 05:00:00 UTC

[26/50] [abbrv] phoenix git commit: PHOENIX-2446 Immutable index - Index vs base table row count does not match when index is created during data load

PHOENIX-2446 Immutable index - Index vs base table row count does not match when index is created during data load


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a138cfe0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a138cfe0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a138cfe0

Branch: refs/heads/calcite
Commit: a138cfe0f3df47091a0d9fe0285a8e572d76b252
Parents: 8574d43
Author: Thomas D'Silva <td...@salesforce.com>
Authored: Fri Jan 8 15:37:31 2016 -0800
Committer: Thomas D'Silva <td...@salesforce.com>
Committed: Thu Jan 21 20:34:18 2016 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/index/ImmutableIndexIT.java |  81 ++++++++---
 .../apache/phoenix/execute/PartialCommitIT.java |   2 +-
 .../cache/aggcache/SpillableGroupByCache.java   |  17 +--
 .../apache/phoenix/compile/FromCompiler.java    |   8 +-
 .../phoenix/compile/ListJarsQueryPlan.java      |   2 +-
 .../compile/PostLocalIndexDDLCompiler.java      | 104 +++++++++++++
 .../apache/phoenix/compile/TraceQueryPlan.java  |   2 +-
 .../apache/phoenix/compile/UnionCompiler.java   |   2 +-
 .../phoenix/coprocessor/BaseRegionScanner.java  |  11 +-
 .../GroupedAggregateRegionObserver.java         |  38 +----
 .../coprocessor/MetaDataEndpointImpl.java       |   7 +-
 .../coprocessor/MetaDataRegionObserver.java     |   8 +-
 .../phoenix/coprocessor/ScanRegionObserver.java |  17 +--
 .../UngroupedAggregateRegionObserver.java       |  24 +--
 .../coprocessor/generated/PTableProtos.java     | 145 +++++++++++++++----
 .../apache/phoenix/execute/MutationState.java   |  57 ++++----
 .../org/apache/phoenix/query/QueryServices.java |   2 +
 .../phoenix/query/QueryServicesOptions.java     |   9 +-
 .../apache/phoenix/schema/DelegateColumn.java   |   5 +
 .../apache/phoenix/schema/MetaDataClient.java   | 133 +++++++----------
 .../java/org/apache/phoenix/schema/PColumn.java |   2 +
 .../org/apache/phoenix/schema/PColumnImpl.java  |  21 ++-
 .../apache/phoenix/schema/PMetaDataImpl.java    |   2 +-
 .../org/apache/phoenix/schema/SaltingUtil.java  |   2 +-
 .../phoenix/execute/CorrelatePlanTest.java      |   2 +-
 .../phoenix/execute/UnnestArrayPlanTest.java    |   4 +-
 .../expression/ColumnExpressionTest.java        |   8 +-
 .../iterate/AggregateResultScannerTest.java     |   4 +
 .../phoenix/query/QueryServicesTestImpl.java    |   4 +-
 phoenix-protocol/src/main/MetaDataService.proto |   2 +-
 phoenix-protocol/src/main/PTable.proto          |   1 +
 31 files changed, 449 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
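
Before the per-file hunks: a minimal sketch of the race this commit addresses, with hypothetical table and index names (the real ImmutableIndexIT below drives it with UpsertRunnable threads and a region observer that issues CREATE INDEX mid-load):

    // Sketch only; assumes a reachable Phoenix cluster and that table T was
    // created with IMMUTABLE_ROWS=true. Names T, I, url are placeholders.
    import java.sql.Connection;
    import java.sql.DriverManager;

    public class Phoenix2446Sketch {
        public static void main(String[] args) throws Exception {
            final String url = "jdbc:phoenix:localhost";
            // Thread 1: keep committing batches of rows to the immutable table.
            new Thread(() -> {
                try (Connection conn = DriverManager.getConnection(url)) {
                    for (int i = 0; i < 10000; i++) {
                        conn.createStatement().execute(
                            "UPSERT INTO T (PK, LONG_COL1) VALUES (" + i + "," + i + ")");
                        if (i % 1000 == 0) conn.commit();
                    }
                    conn.commit();
                } catch (Exception e) { throw new RuntimeException(e); }
            }).start();
            // Thread 2 (main): create the index while the load is in flight.
            try (Connection conn = DriverManager.getConnection(url)) {
                conn.createStatement().execute("CREATE INDEX I ON T (LONG_COL1)");
                // Before this fix, once the load finished these two could differ:
                //   SELECT /*+ NO_INDEX */ COUNT(*) FROM T
                //   SELECT COUNT(*) FROM I
            }
        }
    }
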


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 7171382..c18e4ab 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end.index;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -34,6 +35,7 @@ import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -55,6 +58,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -68,6 +72,7 @@ import com.google.common.collect.Maps;
 public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
 
     private final boolean localIndex;
+    private final boolean transactional;
     private final String tableDDLOptions;
     private final String tableName;
     private final String indexName;
@@ -80,6 +85,7 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
 
     public ImmutableIndexIT(boolean localIndex, boolean transactional) {
         this.localIndex = localIndex;
+        this.transactional = transactional;
         StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
         if (transactional) {
             optionBuilder.append(", TRANSACTIONAL=true");
@@ -98,16 +104,55 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
         serverProps.put("hbase.coprocessor.region.classes", CreateIndexRegionObserver.class.getName());
         Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
         clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
+        clientProps.put(QueryServices.INDEX_POPULATION_SLEEP_TIME, "15000");
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
 
     @Parameters(name="localIndex = {0} , transactional = {1}")
     public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] {     
-            { false, true }, { true, true }
-        });
+		return Arrays.asList(new Boolean[][] { 
+				{ false, false }, { false, true },
+				{ true, false }, { true, true } });
     }
 
+    @Test
+    @Ignore
+    public void testDropIfImmutableKeyValueColumn() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String ddl =
+                    "CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+            Statement stmt = conn.createStatement();
+            stmt.execute(ddl);
+            populateTestTable(fullTableName);
+            ddl =
+                    "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON "
+                            + fullTableName + " (long_col1)";
+            stmt.execute(ddl);
+
+            ResultSet rs;
+
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+
+            conn.setAutoCommit(true);
+            String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4";
+            try {
+                conn.createStatement().execute(dml);
+                fail();
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(),
+                    e.getErrorCode());
+            }
+
+            conn.createStatement().execute("DROP TABLE " + fullTableName);
+        }
+    }
 
     @Test
     public void testCreateIndexDuringUpsertSelect() throws Exception {
@@ -119,8 +164,7 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
                 + " (long_pk, varchar_pk)"
                 + " INCLUDE (long_col1, long_col2)";
 
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        try {
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(false);
             Statement stmt = conn.createStatement();
             stmt.execute(ddl);
@@ -133,7 +177,6 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
             String upsertSelect = "UPSERT INTO " + TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) " + 
                     "SELECT varchar_pk||'_upsert_select', char_pk, int_pk, long_pk, decimal_pk, date_pk FROM "+ TABLE_NAME;    
             conn.createStatement().execute(upsertSelect);
-
             ResultSet rs;
             rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME);
             assertTrue(rs.next());
@@ -142,9 +185,6 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
             assertTrue(rs.next());
             assertEquals(440,rs.getInt(1));
         }
-        finally {
-            conn.close();
-        }
     }
 
     // used to create an index while a batch of rows are being written
@@ -156,7 +196,7 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
             String tableName = c.getEnvironment().getRegion().getRegionInfo()
                     .getTable().getNameAsString();
             if (tableName.equalsIgnoreCase(TABLE_NAME)
-                    // create the index after the second batch of 1000 rows
+                    // create the index after the second batch  
                     && Bytes.startsWith(put.getRow(), Bytes.toBytes("varchar200_upsert_select"))) {
                 try {
                     Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -171,13 +211,14 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
     }
 
     private static class UpsertRunnable implements Runnable {
-        private static final int NUM_ROWS_IN_BATCH = 10000;
+        private static final int NUM_ROWS_IN_BATCH = 1000;
         private final String fullTableName;
 
         public UpsertRunnable(String fullTableName) {
             this.fullTableName = fullTableName;
         }
 
+        @Override
         public void run() {
             Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
             try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
@@ -190,12 +231,9 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
                         fistRowInBatch = false;
                     }
                     conn.commit();
-                    Thread.sleep(500);
                 }
             } catch (SQLException e) {
                 throw new RuntimeException(e);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
             }
         }
     }
@@ -213,10 +251,17 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
             Statement stmt = conn.createStatement();
             stmt.execute(ddl);
 
-            ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+            ExecutorService executorService = Executors.newFixedThreadPool(numThreads, new ThreadFactory() {
+                @Override
+                public Thread newThread(Runnable r) {
+                    Thread t = Executors.defaultThreadFactory().newThread(r);
+                    t.setDaemon(true);
+                    return t;
+                }
+            });
             List<Future<?>> futureList = Lists.newArrayListWithExpectedSize(numThreads);
             for (int i =0; i<numThreads; ++i) {
-                futureList.add(threadPool.submit(new UpsertRunnable(fullTableName)));
+                futureList.add(executorService.submit(new UpsertRunnable(fullTableName)));
             }
             // upsert some rows before creating the index 
             Thread.sleep(500);
@@ -235,8 +280,8 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
             for (Future<?> future : futureList) {
                 future.cancel(true);
             }
-            threadPool.shutdownNow();
-            threadPool.awaitTermination(30, TimeUnit.SECONDS);
+            executorService.shutdownNow();
+            executorService.awaitTermination(30, TimeUnit.SECONDS);
             Thread.sleep(100);
 
             ResultSet rs;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 8d7ebcb..0fb1869 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -265,7 +265,7 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         Connection con = driver.connect(url, new Properties());
         PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
         final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
-        // passing a null mutation staate forces the connection.newMutationState() to be used to create the MutationState
+        // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState
         return new PhoenixConnection(phxCon, null) {
             @Override
             protected MutationState newMutationState(int maxSize) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
index 69fc6f6..8edeb3a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
@@ -340,12 +340,7 @@ public class SpillableGroupByCache implements GroupByCache {
         final Iterator<Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter = new EntryIterator();
 
         // scanner using the spillable implementation
-        return new BaseRegionScanner() {
-            @Override
-            public HRegionInfo getRegionInfo() {
-                return s.getRegionInfo();
-            }
-
+        return new BaseRegionScanner(s) {
             @Override
             public void close() throws IOException {
                 try {
@@ -374,16 +369,6 @@ public class SpillableGroupByCache implements GroupByCache {
                         SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
                 return cacheIter.hasNext();
             }
-
-            @Override
-            public long getMaxResultSize() {
-              return s.getMaxResultSize();
-            }
-
-            @Override
-            public int getBatch() {
-                return s.getBatch();
-            }
         };
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 9b2c460..dd93c81 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -221,7 +221,7 @@ public class FromCompiler {
             Expression sourceExpression = projector.getColumnProjector(column.getPosition()).getExpression();
             PColumnImpl projectedColumn = new PColumnImpl(column.getName(), column.getFamilyName(),
                     sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(),
-                    column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp());
+                    column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic());
             projectedColumns.add(projectedColumn);
         }
         PTable t = PTableImpl.makePTable(table, projectedColumns);
@@ -406,7 +406,7 @@ public class FromCompiler {
             String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
             PName tenantId = connection.getTenantId();
             PTable theTable = null;
-            if (updateCacheImmediately || connection.getAutoCommit()) {
+            if (updateCacheImmediately) {
                 MetaDataMutationResult result = client.updateCache(schemaName, tableName);
                 timeStamp = TransactionUtil.getResolvedTimestamp(connection, result);
                 theTable = result.getTable();
@@ -547,7 +547,7 @@ public class FromCompiler {
                         familyName = PNameFactory.newName(family);
                     }
                     allcolumns.add(new PColumnImpl(name, familyName, dynColumn.getDataType(), dynColumn.getMaxLength(),
-                            dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false));
+                            dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false, true));
                     position++;
                 }
                 theTable = PTableImpl.makePTable(theTable, allcolumns);
@@ -645,7 +645,7 @@ public class FromCompiler {
                 }
                 PColumnImpl column = new PColumnImpl(PNameFactory.newName(alias),
                         PNameFactory.newName(QueryConstants.DEFAULT_COLUMN_FAMILY),
-                        null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false);
+                        null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false, false);
                 columns.add(column);
             }
             PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,
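
Each PColumnImpl construction in this file (and in ListJarsQueryPlan, TraceQueryPlan, UnionCompiler, and MetaDataEndpointImpl below) gains one trailing boolean. That implies the PColumnImpl constructor, changed in this commit but not shown in this excerpt, now ends with an isDynamic flag, presumably shaped like this (parameter names and types inferred from the call sites, not copied from the source):

    // Presumed constructor shape; only the trailing isDynamic flag is new.
    public PColumnImpl(PName name, PName familyName, PDataType dataType,
            Integer maxLength, Integer scale, boolean nullable, int position,
            SortOrder sortOrder, Integer arraySize, byte[] viewConstant,
            boolean isViewReferenced, String expressionStr,
            boolean isRowTimestamp, boolean isDynamic) {
        // field assignments elided in this sketch
    }

Note that FromCompiler passes true only for the dynamic columns it synthesizes from the query itself; every column read from the catalog passes false.
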

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index dac691f..f2b4856 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -80,7 +80,7 @@ public class ListJarsQueryPlan implements QueryPlan {
         PColumn column =
                 new PColumnImpl(PNameFactory.newName("jar_location"), null,
                         PVarchar.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null,
-                        false, null, false);
+                        false, null, false, false);
         List<PColumn> columns = new ArrayList<PColumn>();
         columns.add(column);
         Expression expression =

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
new file mode 100644
index 0000000..f92738c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.ScanUtil;
+
+import com.google.common.collect.Lists;
+
+/**
+ * For local indexes, we optimize the initial index population by *not* sending
+ * Puts over the wire for the index rows, as we don't need to do that. Instead,
+ * we tap into our region observer to generate the index rows based on the data
+ * rows as we scan
+ */
+public class PostLocalIndexDDLCompiler {
+	private final PhoenixConnection connection;
+    private final String tableName;
+    
+    public PostLocalIndexDDLCompiler(PhoenixConnection connection, String tableName) {
+        this.connection = connection;
+        this.tableName = tableName;
+    }
+
+	public MutationPlan compile(final PTable index) throws SQLException {
+		try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
+            String query = "SELECT count(*) FROM " + tableName;
+            final QueryPlan plan = statement.compileQuery(query);
+            TableRef tableRef = plan.getTableRef();
+            Scan scan = plan.getContext().getScan();
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            final PTable dataTable = tableRef.getTable();
+            List<PTable> indexes = Lists.newArrayListWithExpectedSize(1);
+            // Only build newly created index.
+            indexes.add(index);
+            IndexMaintainer.serialize(dataTable, ptr, indexes, plan.getContext().getConnection());
+            // Set attribute on scan that UngroupedAggregateRegionObserver will switch on.
+            // We'll detect that this attribute was set the server-side and write the index
+            // rows per region as a result. The value of the attribute will be our persisted
+            // index maintainers.
+            // Define the LOCAL_INDEX_BUILD as a new static in BaseScannerRegionObserver
+            scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+            // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+            // However, in this case, we need to project all of the data columns that contribute to the index.
+            IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
+            for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
+                scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
+            }
+
+            // Go through MutationPlan abstraction so that we can create local indexes
+            // with a connectionless connection (which makes testing easier).
+            return new BaseMutationPlan(plan.getContext(), Operation.UPSERT) {
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    connection.getMutationState().commitDDLFence(dataTable);
+                    Cell kv = plan.iterator().next().getValue(0);
+                    ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+                    // A single Cell will be returned with the count(*) - we decode that here
+                    long rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
+                    // The contract is to return a MutationState that contains the number of rows modified. In this
+                    // case, it's the number of rows in the data table which corresponds to the number of index
+                    // rows that were added.
+                    return new MutationState(0, connection, rowCount);
+                }
+
+            };
+        }
+	}
+	
+}
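
Going only by the constructor and compile() signatures above, a caller would drive the new compiler roughly like this (pconn, indexPTable, and the table name are placeholders; the actual call site is presumably in MetaDataClient):

    // Hypothetical call site after CREATE LOCAL INDEX has been recorded.
    PostLocalIndexDDLCompiler compiler =
            new PostLocalIndexDDLCompiler(pconn, "MY_SCHEMA.MY_TABLE");
    MutationPlan plan = compiler.compile(indexPTable);
    // execute() runs the count(*) scan with the LOCAL_INDEX_BUILD attribute
    // set, so the region observer writes the index rows server-side as it
    // scans; the returned MutationState carries the scanned row count, which
    // equals the number of index rows written.
    MutationState state = plan.execute();
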

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index a9754b3..1e8210a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -77,7 +77,7 @@ public class TraceQueryPlan implements QueryPlan {
         PColumn column =
                 new PColumnImpl(PNameFactory.newName(MetricInfo.TRACE.columnName), null,
                         PLong.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null,
-                        false, null, false);
+                        false, null, false, false);
         List<PColumn> columns = new ArrayList<PColumn>();
         columns.add(column);
         Expression expression =

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index 3bc1e37..f8b2778 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -75,7 +75,7 @@ public class UnionCompiler {
             String name = selectNodes == null ? colProj.getName() : selectNodes.get(i).getAlias();
             PColumnImpl projectedColumn = new PColumnImpl(PNameFactory.newName(name), UNION_FAMILY_NAME,
                     sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(),
-                    i, sourceExpression.getSortOrder(), 500, null, false, sourceExpression.toString(), false);
+                    i, sourceExpression.getSortOrder(), 500, null, false, sourceExpression.toString(), false, false);
             projectedColumns.add(projectedColumn);
         }
         Long scn = statement.getConnection().getSCN();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
index 3f73048..b5e9c9f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
@@ -25,8 +25,12 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 
-public abstract class BaseRegionScanner implements RegionScanner {
+public abstract class BaseRegionScanner extends DelegateRegionScanner {
 
+	public BaseRegionScanner(RegionScanner delegate) {
+		super(delegate);
+	}
+	
     @Override
     public boolean isFilterDone() {
         return false;
@@ -46,11 +50,6 @@ public abstract class BaseRegionScanner implements RegionScanner {
     }
 
     @Override
-    public long getMvccReadPoint() {
-        return Long.MAX_VALUE;
-    }
-
-    @Override
     public boolean nextRaw(List<Cell> result) throws IOException {
         return next(result);
     }
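
With BaseRegionScanner now extending DelegateRegionScanner, the anonymous scanners in the coprocessor hunks that follow shrink: they hand the wrapped scanner to the constructor and inherit getRegionInfo(), getMaxResultSize(), and getBatch() instead of re-implementing them (DelegateRegionScanner's delegation is assumed from its name; it is not shown in this diff). The pattern, as a sketch rather than any particular call site:

    // innerScanner is the RegionScanner being wrapped.
    RegionScanner scanner = new BaseRegionScanner(innerScanner) {
        @Override
        public boolean next(List<Cell> results) throws IOException {
            // per-scanner aggregation logic goes here; plain delegation
            // (getRegionInfo, getMaxResultSize, getBatch) is inherited
            return innerScanner.next(results);
        }
    };
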

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index d613688..3237882 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -315,15 +315,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                 aggResults.add(keyValue);
             }
             // scanner using the non spillable, memory-only implementation
-            return new BaseRegionScanner() {
+            return new BaseRegionScanner(s) {
                 private int index = 0;
 
                 @Override
-                public HRegionInfo getRegionInfo() {
-                    return s.getRegionInfo();
-                }
-
-                @Override
                 public void close() throws IOException {
                     try {
                         s.close();
@@ -341,16 +336,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                     index++;
                     return index < aggResults.size();
                 }
-
-                @Override
-                public long getMaxResultSize() {
-                	return s.getMaxResultSize();
-                }
-
-                @Override
-                public int getBatch() {
-                    return s.getBatch();
-                }
             };
         }
 
@@ -471,21 +456,11 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by "
                     + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan)));
         }
-        return new BaseRegionScanner() {
+        return new BaseRegionScanner(scanner) {
             private long rowCount = 0;
             private ImmutableBytesWritable currentKey = null;
 
             @Override
-            public HRegionInfo getRegionInfo() {
-                return scanner.getRegionInfo();
-            }
-
-            @Override
-            public void close() throws IOException {
-                scanner.close();
-            }
-
-            @Override
             public boolean next(List<Cell> results) throws IOException {
                 boolean hasMore;
                 boolean atLimit;
@@ -567,15 +542,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                 currentKey = null;
                 return false;
             }
-
-            @Override
-            public long getMaxResultSize() {
-                return scanner.getMaxResultSize();
-            }
-            @Override
-            public int getBatch() {
-                return scanner.getBatch();
-            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 9887e7b..78f9700 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -109,9 +110,12 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.GlobalCache;
@@ -188,6 +192,7 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.UpgradeUtil;
+import org.hamcrest.core.IsInstanceOf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -620,7 +625,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(),
                         isRowTimestampKV.getValueLength()));
 
-        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp);
+        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false);
         columns.add(column);
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 7950ac8..a2f7282 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -22,6 +22,7 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import java.util.TimerTask;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -252,7 +253,10 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     }
 
                     if (conn == null) {
-                        conn = DriverManager.getConnection(getJdbcUrl(env)).unwrap(PhoenixConnection.class);
+                    	final Properties props = new Properties();
+                    	// don't run a second index-population upsert select
+                        props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0"); 
+                        conn = DriverManager.getConnection(getJdbcUrl(env), props).unwrap(PhoenixConnection.class);
                     }
 
                     String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
@@ -270,7 +274,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                         QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
                         QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
                     long timeStamp = Math.max(0, disabledTimeStampVal - overlapTime);
-
+                    
                     LOG.info("Starting to build index=" + indexPTable.getName() + " from timestamp=" + timeStamp);
                     client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp));
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 65a43de..d2bd3b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -253,7 +253,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         } finally {
             region.closeRegionOperation();
         }
-        return new BaseRegionScanner() {
+        return new BaseRegionScanner(s) {
             private Tuple tuple = firstTuple;
 
             @Override
@@ -262,11 +262,6 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             }
 
             @Override
-            public HRegionInfo getRegionInfo() {
-                return s.getRegionInfo();
-            }
-
-            @Override
             public boolean next(List<Cell> results) throws IOException {
                 try {
                     if (isFilterDone()) {
@@ -301,16 +296,6 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
                     }
                 }
             }
-
-            @Override
-            public long getMaxResultSize() {
-                return s.getMaxResultSize();
-            }
-
-            @Override
-            public int getBatch() {
-              return s.getBatch();
-            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index f332e60..05cf08e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -554,25 +554,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
         final KeyValue aggKeyValue = keyValue;
 
-        RegionScanner scanner = new BaseRegionScanner() {
+        RegionScanner scanner = new BaseRegionScanner(innerScanner) {
             private boolean done = !hadAny;
 
             @Override
-            public HRegionInfo getRegionInfo() {
-                return innerScanner.getRegionInfo();
-            }
-
-            @Override
             public boolean isFilterDone() {
                 return done;
             }
 
             @Override
-            public void close() throws IOException {
-                innerScanner.close();
-            }
-
-            @Override
             public boolean next(List<Cell> results) throws IOException {
                 if (done) return false;
                 done = true;
@@ -584,11 +574,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             public long getMaxResultSize() {
                 return scan.getMaxResultSize();
             }
-
-            @Override
-            public int getBatch() {
-                return innerScanner.getBatch();
-            }
         };
         return scanner;
     }
@@ -690,7 +675,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         final KeyValue aggKeyValue =
                 KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
                     SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
-        RegionScanner scanner = new BaseRegionScanner() {
+        RegionScanner scanner = new BaseRegionScanner(innerScanner) {
             @Override
             public HRegionInfo getRegionInfo() {
                 return region.getRegionInfo();
@@ -716,11 +701,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             public long getMaxResultSize() {
                 return scan.getMaxResultSize();
             }
-
-            @Override
-            public int getBatch() {
-                return innerScanner.getBatch();
-            }
         };
         return scanner;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index be8d7e2..f74ed0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -259,6 +259,16 @@ public final class PTableProtos {
      * <code>optional bool isRowTimestamp = 13;</code>
      */
     boolean getIsRowTimestamp();
+
+    // optional bool isDynamic = 14;
+    /**
+     * <code>optional bool isDynamic = 14;</code>
+     */
+    boolean hasIsDynamic();
+    /**
+     * <code>optional bool isDynamic = 14;</code>
+     */
+    boolean getIsDynamic();
   }
   /**
    * Protobuf type {@code PColumn}
@@ -376,6 +386,11 @@ public final class PTableProtos {
               isRowTimestamp_ = input.readBool();
               break;
             }
+            case 112: {
+              bitField0_ |= 0x00002000;
+              isDynamic_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -678,6 +693,22 @@ public final class PTableProtos {
       return isRowTimestamp_;
     }
 
+    // optional bool isDynamic = 14;
+    public static final int ISDYNAMIC_FIELD_NUMBER = 14;
+    private boolean isDynamic_;
+    /**
+     * <code>optional bool isDynamic = 14;</code>
+     */
+    public boolean hasIsDynamic() {
+      return ((bitField0_ & 0x00002000) == 0x00002000);
+    }
+    /**
+     * <code>optional bool isDynamic = 14;</code>
+     */
+    public boolean getIsDynamic() {
+      return isDynamic_;
+    }
+
     private void initFields() {
       columnNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       familyNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -692,6 +723,7 @@ public final class PTableProtos {
       viewReferenced_ = false;
       expression_ = "";
       isRowTimestamp_ = false;
+      isDynamic_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -764,6 +796,9 @@ public final class PTableProtos {
       if (((bitField0_ & 0x00001000) == 0x00001000)) {
         output.writeBool(13, isRowTimestamp_);
       }
+      if (((bitField0_ & 0x00002000) == 0x00002000)) {
+        output.writeBool(14, isDynamic_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -825,6 +860,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(13, isRowTimestamp_);
       }
+      if (((bitField0_ & 0x00002000) == 0x00002000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(14, isDynamic_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -913,6 +952,11 @@ public final class PTableProtos {
         result = result && (getIsRowTimestamp()
             == other.getIsRowTimestamp());
       }
+      result = result && (hasIsDynamic() == other.hasIsDynamic());
+      if (hasIsDynamic()) {
+        result = result && (getIsDynamic()
+            == other.getIsDynamic());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -978,6 +1022,10 @@ public final class PTableProtos {
         hash = (37 * hash) + ISROWTIMESTAMP_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getIsRowTimestamp());
       }
+      if (hasIsDynamic()) {
+        hash = (37 * hash) + ISDYNAMIC_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getIsDynamic());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1113,6 +1161,8 @@ public final class PTableProtos {
         bitField0_ = (bitField0_ & ~0x00000800);
         isRowTimestamp_ = false;
         bitField0_ = (bitField0_ & ~0x00001000);
+        isDynamic_ = false;
+        bitField0_ = (bitField0_ & ~0x00002000);
         return this;
       }
 
@@ -1193,6 +1243,10 @@ public final class PTableProtos {
           to_bitField0_ |= 0x00001000;
         }
         result.isRowTimestamp_ = isRowTimestamp_;
+        if (((from_bitField0_ & 0x00002000) == 0x00002000)) {
+          to_bitField0_ |= 0x00002000;
+        }
+        result.isDynamic_ = isDynamic_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1252,6 +1306,9 @@ public final class PTableProtos {
         if (other.hasIsRowTimestamp()) {
           setIsRowTimestamp(other.getIsRowTimestamp());
         }
+        if (other.hasIsDynamic()) {
+          setIsDynamic(other.getIsDynamic());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1819,6 +1876,39 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional bool isDynamic = 14;
+      private boolean isDynamic_ ;
+      /**
+       * <code>optional bool isDynamic = 14;</code>
+       */
+      public boolean hasIsDynamic() {
+        return ((bitField0_ & 0x00002000) == 0x00002000);
+      }
+      /**
+       * <code>optional bool isDynamic = 14;</code>
+       */
+      public boolean getIsDynamic() {
+        return isDynamic_;
+      }
+      /**
+       * <code>optional bool isDynamic = 14;</code>
+       */
+      public Builder setIsDynamic(boolean value) {
+        bitField0_ |= 0x00002000;
+        isDynamic_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool isDynamic = 14;</code>
+       */
+      public Builder clearIsDynamic() {
+        bitField0_ = (bitField0_ & ~0x00002000);
+        isDynamic_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PColumn)
     }
 
@@ -6909,38 +6999,39 @@ public final class PTableProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\014PTable.proto\032\021PGuidePosts.proto\"\223\002\n\007PC" +
+      "\n\014PTable.proto\032\021PGuidePosts.proto\"\246\002\n\007PC" +
       "olumn\022\027\n\017columnNameBytes\030\001 \002(\014\022\027\n\017family" +
       "NameBytes\030\002 \001(\014\022\020\n\010dataType\030\003 \002(\t\022\021\n\tmax" +
       "Length\030\004 \001(\005\022\r\n\005scale\030\005 \001(\005\022\020\n\010nullable\030" +
       "\006 \002(\010\022\020\n\010position\030\007 \002(\005\022\021\n\tsortOrder\030\010 \002" +
       "(\005\022\021\n\tarraySize\030\t \001(\005\022\024\n\014viewConstant\030\n " +
       "\001(\014\022\026\n\016viewReferenced\030\013 \001(\010\022\022\n\nexpressio" +
-      "n\030\014 \001(\t\022\026\n\016isRowTimestamp\030\r \001(\010\"\232\001\n\013PTab" +
-      "leStats\022\013\n\003key\030\001 \002(\014\022\016\n\006values\030\002 \003(\014\022\033\n\023" +
-      "guidePostsByteCount\030\003 \001(\003\022\025\n\rkeyBytesCou",
-      "nt\030\004 \001(\003\022\027\n\017guidePostsCount\030\005 \001(\005\022!\n\013pGu" +
-      "idePosts\030\006 \001(\0132\014.PGuidePosts\"\244\005\n\006PTable\022" +
-      "\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016tableNameByt" +
-      "es\030\002 \002(\014\022\036\n\ttableType\030\003 \002(\0162\013.PTableType" +
-      "\022\022\n\nindexState\030\004 \001(\t\022\026\n\016sequenceNumber\030\005" +
-      " \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022\023\n\013pkNameBytes\030\007" +
-      " \001(\014\022\021\n\tbucketNum\030\010 \002(\005\022\031\n\007columns\030\t \003(\013" +
-      "2\010.PColumn\022\030\n\007indexes\030\n \003(\0132\007.PTable\022\027\n\017" +
-      "isImmutableRows\030\013 \002(\010\022 \n\nguidePosts\030\014 \003(" +
-      "\0132\014.PTableStats\022\032\n\022dataTableNameBytes\030\r ",
-      "\001(\014\022\031\n\021defaultFamilyName\030\016 \001(\014\022\022\n\ndisabl" +
-      "eWAL\030\017 \002(\010\022\023\n\013multiTenant\030\020 \002(\010\022\020\n\010viewT" +
-      "ype\030\021 \001(\014\022\025\n\rviewStatement\030\022 \001(\014\022\025\n\rphys" +
-      "icalNames\030\023 \003(\014\022\020\n\010tenantId\030\024 \001(\014\022\023\n\013vie" +
-      "wIndexId\030\025 \001(\005\022\021\n\tindexType\030\026 \001(\014\022\026\n\016sta" +
-      "tsTimeStamp\030\027 \001(\003\022\022\n\nstoreNulls\030\030 \001(\010\022\027\n" +
-      "\017baseColumnCount\030\031 \001(\005\022\036\n\026rowKeyOrderOpt" +
-      "imizable\030\032 \001(\010\022\025\n\rtransactional\030\033 \001(\010\022\034\n" +
-      "\024updateCacheFrequency\030\034 \001(\003*A\n\nPTableTyp" +
-      "e\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005IND",
-      "EX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.phoenix.cop" +
-      "rocessor.generatedB\014PTableProtosH\001\210\001\001\240\001\001"
+      "n\030\014 \001(\t\022\026\n\016isRowTimestamp\030\r \001(\010\022\021\n\tisDyn" +
+      "amic\030\016 \001(\010\"\232\001\n\013PTableStats\022\013\n\003key\030\001 \002(\014\022" +
+      "\016\n\006values\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003",
+      " \001(\003\022\025\n\rkeyBytesCount\030\004 \001(\003\022\027\n\017guidePost" +
+      "sCount\030\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGui" +
+      "dePosts\"\244\005\n\006PTable\022\027\n\017schemaNameBytes\030\001 " +
+      "\002(\014\022\026\n\016tableNameBytes\030\002 \002(\014\022\036\n\ttableType" +
+      "\030\003 \002(\0162\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022" +
+      "\026\n\016sequenceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002" +
+      "(\003\022\023\n\013pkNameBytes\030\007 \001(\014\022\021\n\tbucketNum\030\010 \002" +
+      "(\005\022\031\n\007columns\030\t \003(\0132\010.PColumn\022\030\n\007indexes" +
+      "\030\n \003(\0132\007.PTable\022\027\n\017isImmutableRows\030\013 \002(\010" +
+      "\022 \n\nguidePosts\030\014 \003(\0132\014.PTableStats\022\032\n\022da",
+      "taTableNameBytes\030\r \001(\014\022\031\n\021defaultFamilyN" +
+      "ame\030\016 \001(\014\022\022\n\ndisableWAL\030\017 \002(\010\022\023\n\013multiTe" +
+      "nant\030\020 \002(\010\022\020\n\010viewType\030\021 \001(\014\022\025\n\rviewStat" +
+      "ement\030\022 \001(\014\022\025\n\rphysicalNames\030\023 \003(\014\022\020\n\010te" +
+      "nantId\030\024 \001(\014\022\023\n\013viewIndexId\030\025 \001(\005\022\021\n\tind" +
+      "exType\030\026 \001(\014\022\026\n\016statsTimeStamp\030\027 \001(\003\022\022\n\n" +
+      "storeNulls\030\030 \001(\010\022\027\n\017baseColumnCount\030\031 \001(" +
+      "\005\022\036\n\026rowKeyOrderOptimizable\030\032 \001(\010\022\025\n\rtra" +
+      "nsactional\030\033 \001(\010\022\034\n\024updateCacheFrequency" +
+      "\030\034 \001(\003*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER",
+      "\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org" +
+      ".apache.phoenix.coprocessor.generatedB\014P" +
+      "TableProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6952,7 +7043,7 @@ public final class PTableProtos {
           internal_static_PColumn_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PColumn_descriptor,
-              new java.lang.String[] { "ColumnNameBytes", "FamilyNameBytes", "DataType", "MaxLength", "Scale", "Nullable", "Position", "SortOrder", "ArraySize", "ViewConstant", "ViewReferenced", "Expression", "IsRowTimestamp", });
+              new java.lang.String[] { "ColumnNameBytes", "FamilyNameBytes", "DataType", "MaxLength", "Scale", "Nullable", "Position", "SortOrder", "ArraySize", "ViewConstant", "ViewReferenced", "Expression", "IsRowTimestamp", "IsDynamic", });
           internal_static_PTableStats_descriptor =
             getDescriptor().getMessageTypes().get(1);
           internal_static_PTableStats_fieldAccessorTable = new
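
The regenerated descriptor above corresponds to the one-line addition to the PColumn message in phoenix-protocol/src/main/PTable.proto recorded in the diffstat: per the generated field comments, "optional bool isDynamic = 14;". A sketch of round-tripping the new field through the generated builder (field values are placeholders; the required fields are those marked \002 in the descriptor):

    // Assumes com.google.protobuf.ByteString and the generated PTableProtos.
    PTableProtos.PColumn col = PTableProtos.PColumn.newBuilder()
        .setColumnNameBytes(ByteString.copyFromUtf8("C1"))
        .setDataType("VARCHAR")
        .setNullable(true)
        .setPosition(0)
        .setSortOrder(0)        // placeholder system value
        .setIsDynamic(true)     // new optional field 14
        .build();
    boolean dyn = col.hasIsDynamic() && col.getIsDynamic();

Because the field is optional, old clients that never set it simply see hasIsDynamic() == false, keeping the wire format backward compatible.
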

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 0a5b053..46aa819 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
@@ -569,7 +570,7 @@ public class MutationState implements SQLCloseable {
                 List<Mutation> indexMutations;
                 try {
                     indexMutations =
-                            IndexUtil.generateIndexData(table, index, mutationsPertainingToIndex,
+                    		IndexUtil.generateIndexData(table, index, mutationsPertainingToIndex,
                                 connection.getKeyValueBuilder(), connection);
                     // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
                     if (!sendAll) {
@@ -719,37 +720,35 @@ public class MutationState implements SQLCloseable {
         long serverTimeStamp = tableRef.getTimeStamp();
         // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
         // so no need to do it again here.
-        if (!connection.getAutoCommit()) {
-            PTable table = tableRef.getTable();
-            MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
-            PTable resolvedTable = result.getTable();
-            if (resolvedTable == null) {
-                throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
-            }
-            // Always update tableRef table as the one we've cached may be out of date since when we executed
-            // the UPSERT VALUES call and updated in the cache before this.
-            tableRef.setTable(resolvedTable);
-            long timestamp = result.getMutationTime();
-            if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
-                serverTimeStamp = timestamp;
-                if (result.wasUpdated()) {
-                    // TODO: use bitset?
-                    PColumn[] columns = new PColumn[resolvedTable.getColumns().size()];
-                    for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
-                        RowMutationState valueEntry = rowEntry.getValue();
-                        if (valueEntry != null) {
-                            Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
-                            if (colValues != PRow.DELETE_MARKER) {
-                                for (PColumn column : colValues.keySet()) {
-                                    columns[column.getPosition()] = column;
-                                }
+        PTable table = tableRef.getTable();
+        MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
+        PTable resolvedTable = result.getTable();
+        if (resolvedTable == null) {
+            throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
+        }
+        // Always update tableRef table as the one we've cached may be out of date since when we executed
+        // the UPSERT VALUES call and updated in the cache before this.
+        tableRef.setTable(resolvedTable);
+        long timestamp = result.getMutationTime();
+        if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
+            serverTimeStamp = timestamp;
+            if (result.wasUpdated()) {
+                List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size());
+                for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
+                    RowMutationState valueEntry = rowEntry.getValue();
+                    if (valueEntry != null) {
+                        Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
+                        if (colValues != PRow.DELETE_MARKER) {
+                            for (PColumn column : colValues.keySet()) {
+                                if (!column.isDynamic())
+                                    columns.add(column);
                             }
                         }
                     }
-                    for (PColumn column : columns) {
-                        if (column != null) {
-                            resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
-                        }
+                }
+                for (PColumn column : columns) {
+                    if (column != null) {
+                        resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
                     }
                 }
             }
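
With the auto-commit guard removed, this validation now runs on every commit
path (the surviving comment about auto-committing above is stale as a result),
and dynamic columns are skipped because they exist only in the statement, not
in the resolved table metadata. A condensed sketch of the resulting rule; the
helper name checkColumnsExist is hypothetical:

    // Validate mutated columns against the freshly resolved table, skipping
    // dynamic columns, which are declared per-statement and would otherwise
    // fail the metadata lookup.
    private static void checkColumnsExist(PTable resolvedTable, Iterable<PColumn> mutated)
            throws SQLException {
        for (PColumn column : mutated) {
            if (column == null || column.isDynamic()) {
                continue;
            }
            // Throws if the column was dropped between the UPSERT and the commit.
            resolvedTable.getColumnFamily(column.getFamilyName().getString())
                    .getColumn(column.getName().getString());
        }
    }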

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index d40a15b..b0e7b6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -205,6 +205,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String HCONNECTION_POOL_MAX_SIZE = "hbase.hconnection.threads.max";
     public static final String HTABLE_MAX_THREADS = "hbase.htable.threads.max";
 
+    // Time to wait before running the second index-population upsert select, so that any pending batches of rows on the region server are also written to the index.
+    public static final String INDEX_POPULATION_SLEEP_TIME = "phoenix.index.population.wait.time";
     /**
      * Get executor service used for parallel scans
      */
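
A hedged usage sketch for the new property: like other QueryServices settings
it should be settable through the JDBC connection Properties, and a value <= 0
skips the second population pass entirely (per the sleepTime > 0 check in the
MetaDataClient change below). The URL and value here are illustrative only:

    // Assumes java.util.Properties and java.sql.DriverManager; wait 10s
    // instead of the 5s default before the second index-population pass.
    Properties props = new Properties();
    props.setProperty("phoenix.index.population.wait.time", "10000");
    Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181", props);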

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 9257413..c9bc19b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -37,6 +37,7 @@ import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.INDEX_POPULATION_SLEEP_TIME;
 import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB;
@@ -223,6 +224,8 @@ public class QueryServicesOptions {
     
     public static final boolean DEFAULT_RETURN_SEQUENCE_VALUES = false;
     public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = "";
+    
+    public static final long DEFAULT_INDEX_POPULATION_SLEEP_TIME = 5000;
 
     // QueryServer defaults -- ensure ThinClientUtil is also updated since phoenix-server-client
     // doesn't depend on phoenix-core.
@@ -430,7 +433,6 @@ public class QueryServicesOptions {
         return set(GROUPBY_SPILL_FILES_ATTRIB, num);
     }
 
-
     private QueryServicesOptions set(String name, boolean value) {
         config.set(name, Boolean.toString(value));
         return this;
@@ -641,4 +643,9 @@ public class QueryServicesOptions {
         return this;
     }
     
+    public QueryServicesOptions setDefaultIndexPopulationWaitTime(long waitTime) {
+        config.setLong(INDEX_POPULATION_SLEEP_TIME, waitTime);
+        return this;
+    }
+    
 }
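
The new setter follows the existing fluent style of this class. A sketch of
likely test wiring, assuming the usual QueryServicesOptions.withDefaults()
entry point; the zero value (which disables the second pass) is illustrative:

    QueryServicesOptions options = QueryServicesOptions.withDefaults()
            .setDefaultIndexPopulationWaitTime(0); // no second population pass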

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index ddb0a1a..798706e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -84,4 +84,9 @@ public class DelegateColumn extends DelegateDatum implements PColumn {
     public String toString() {
         return getDelegate().toString();
     }
+
+    @Override
+    public boolean isDynamic() {
+        return getDelegate().isDynamic();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index d559842..e8d995c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -114,9 +114,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
-import co.cask.tephra.TxConstants;
-
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -125,17 +122,15 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.compile.BaseMutationPlan;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.IndexExpressionCompiler;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.compile.PostDDLCompiler;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
-import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.PostLocalIndexDDLCompiler;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.StatementNormalizer;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -153,7 +148,6 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.parse.AddColumnStatement;
 import org.apache.phoenix.parse.AlterIndexStatement;
 import org.apache.phoenix.parse.ColumnDef;
@@ -210,6 +204,8 @@ import org.apache.phoenix.util.UpgradeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import co.cask.tephra.TxConstants;
+
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
@@ -821,7 +817,7 @@ public class MetaDataClient {
             }
 
             PColumn column = new PColumnImpl(PNameFactory.newName(columnName), familyName, def.getDataType(),
-                    def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp);
+                    def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp, false);
             return column;
         } catch (IllegalArgumentException e) { // Based on precondition check in constructor
             throw new SQLException(e);
@@ -1021,89 +1017,68 @@ public class MetaDataClient {
         try {
             connection.setAutoCommit(true);
             MutationPlan mutationPlan;
-
-            // For local indexes, we optimize the initial index population by *not* sending Puts over
-            // the wire for the index rows, as we don't need to do that. Instead, we tap into our
-            // region observer to generate the index rows based on the data rows as we scan
             if (index.getIndexType() == IndexType.LOCAL) {
-                try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
-                    String tableName = getFullTableName(dataTableRef);
-                    String query = "SELECT count(*) FROM " + tableName;
-                    final QueryPlan plan = statement.compileQuery(query);
-                    TableRef tableRef = plan.getTableRef();
-                    // Set attribute on scan that UngroupedAggregateRegionObserver will switch on.
-                    // We'll detect that this attribute was set the server-side and write the index
-                    // rows per region as a result. The value of the attribute will be our persisted
-                    // index maintainers.
-                    // Define the LOCAL_INDEX_BUILD as a new static in BaseScannerRegionObserver
-                    Scan scan = plan.getContext().getScan();
-                    try {
-                        if(ScanUtil.isDefaultTimeRange(scan.getTimeRange())) {
-                            Long scn = connection.getSCN();
-                            if (scn == null) {
-                                scn = plan.getContext().getCurrentTime();
-                            }
-                            scan.setTimeRange(dataTableRef.getLowerBoundTimeStamp(),scn);
-                        }
-                    } catch (IOException e) {
-                        throw new SQLException(e);
-                    }
-                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                    final PTable dataTable = tableRef.getTable();
-                    for(PTable idx: dataTable.getIndexes()) {
-                        if(idx.getName().equals(index.getName())) {
-                            index = idx;
-                            break;
-                        }
-                    }
-                    List<PTable> indexes = Lists.newArrayListWithExpectedSize(1);
-                    // Only build newly created index.
-                    indexes.add(index);
-                    IndexMaintainer.serialize(dataTable, ptr, indexes, plan.getContext().getConnection());
-                    scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr));
-                    // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
-                    // However, in this case, we need to project all of the data columns that contribute to the index.
-                    IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
-                    for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
-                        scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
-                    }
-
-                    // Go through MutationPlan abstraction so that we can create local indexes
-                    // with a connectionless connection (which makes testing easier).
-                    mutationPlan = new BaseMutationPlan(plan.getContext(), Operation.UPSERT) {
-
-                        @Override
-                        public MutationState execute() throws SQLException {
-                            connection.getMutationState().commitDDLFence(dataTable);
-                            Cell kv = plan.iterator().next().getValue(0);
-                            ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
-                            // A single Cell will be returned with the count(*) - we decode that here
-                            long rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
-                            // The contract is to return a MutationState that contains the number of rows modified. In this
-                            // case, it's the number of rows in the data table which corresponds to the number of index
-                            // rows that were added.
-                            return new MutationState(0, connection, rowCount);
-                        }
-
-                    };
-                }
+                PostLocalIndexDDLCompiler compiler =
+                        new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
+                mutationPlan = compiler.compile(index);
             } else {
                 PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
                 mutationPlan = compiler.compile(index);
-                try {
-                    Long scn = connection.getSCN();
+            }
+            Scan scan = mutationPlan.getContext().getScan();
+            Long scn = connection.getSCN();
+            try {
+                if (ScanUtil.isDefaultTimeRange(scan.getTimeRange())) {
                     if (scn == null) {
                         scn = mutationPlan.getContext().getCurrentTime();
                     }
-                    mutationPlan.getContext().getScan().setTimeRange(dataTableRef.getLowerBoundTimeStamp(), scn);
+                    scan.setTimeRange(dataTableRef.getLowerBoundTimeStamp(), scn);
+                }
+            } catch (IOException e) {
+                throw new SQLException(e);
+            }
+            
+            // Execute the initial index-population upsert select.
+            long startTime = System.currentTimeMillis();
+            MutationState state = connection.getQueryServices().updateData(mutationPlan);
+            long firstUpsertSelectTime = System.currentTimeMillis() - startTime;
+
+            // For global indexes on non-transactional tables we may have to
+            // run a second index-population upsert select to handle data rows
+            // that were being written on the server while the index was created.
+            long sleepTime =
+                    connection
+                            .getQueryServices()
+                            .getProps()
+                            .getLong(QueryServices.INDEX_POPULATION_SLEEP_TIME,
+                                QueryServicesOptions.DEFAULT_INDEX_POPULATION_SLEEP_TIME);
+            if (!dataTableRef.getTable().isTransactional() && sleepTime > 0) {
+                long delta = sleepTime - firstUpsertSelectTime;
+                if (delta > 0) {
+                    try {
+                        Thread.sleep(delta);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
+                                .setRootCause(e).build().buildException();
+                    }
+                }
+                // Set the min timestamp of the second index upsert select to some time
+                // before the index was created.
+                long minTimestamp = index.getTimeStamp() - firstUpsertSelectTime;
+                try {
+                    mutationPlan.getContext().getScan().setTimeRange(minTimestamp, scn);
                 } catch (IOException e) {
                     throw new SQLException(e);
                 }
+                MutationState newMutationState =
+                        connection.getQueryServices().updateData(mutationPlan);
+                state.join(newMutationState);
             }
-            MutationState state = connection.getQueryServices().updateData(mutationPlan);
+            
             indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
-                TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
-                dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE);
+            		TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
+            		dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE);
             alterIndex(indexStatement);
 
             return state;
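
The second pass exists because, on non-transactional tables, rows batched on
the region server while the index was being built can carry timestamps the
first upsert select never sees. A condensed sketch of the timing logic above;
runUpsertSelect and indexCreateTs are hypothetical stand-ins for
connection.getQueryServices().updateData(plan) and index.getTimeStamp():

    static void populate(MutationPlan plan, Scan scan, long indexCreateTs, long scn,
            long sleepTime) throws Exception {
        long start = System.currentTimeMillis();
        runUpsertSelect(plan);                  // pass 1: bulk of the rows
        long firstPass = System.currentTimeMillis() - start;
        long remaining = sleepTime - firstPass; // time already spent counts toward the wait
        if (remaining > 0) {
            Thread.sleep(remaining);            // let pending server-side batches flush
        }
        // Pass 2 rescans only the window pass 1 could have missed: rows stamped
        // shortly before the index itself was created.
        scan.setTimeRange(indexCreateTs - firstPass, scn);
        runUpsertSelect(plan);
    }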

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
index 357ce6f..0f5fa44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
@@ -58,4 +58,6 @@ public interface PColumn extends PDatum {
      * @return whether this column represents/stores the hbase cell timestamp.
      */
     boolean isRowTimestamp();
+    
+    boolean isDynamic();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
index cff276b..a556f76 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
@@ -39,6 +39,7 @@ public class PColumnImpl implements PColumn {
     private boolean isViewReferenced;
     private String expressionStr;
     private boolean isRowTimestamp;
+    private boolean isDynamic;
     
     public PColumnImpl() {
     }
@@ -50,13 +51,13 @@ public class PColumnImpl implements PColumn {
                        Integer scale,
                        boolean nullable,
                        int position,
-                       SortOrder sortOrder, Integer arrSize, byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp) {
-        init(name, familyName, dataType, maxLength, scale, nullable, position, sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp);
+                       SortOrder sortOrder, Integer arrSize, byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic) {
+        init(name, familyName, dataType, maxLength, scale, nullable, position, sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic);
     }
 
     public PColumnImpl(PColumn column, int position) {
         this(column.getName(), column.getFamilyName(), column.getDataType(), column.getMaxLength(),
-                column.getScale(), column.isNullable(), position, column.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp());
+                column.getScale(), column.isNullable(), position, column.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic());
     }
 
     private void init(PName name,
@@ -68,7 +69,7 @@ public class PColumnImpl implements PColumn {
             int position,
             SortOrder sortOrder,
             Integer arrSize,
-            byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp) {
+            byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic) {
     	Preconditions.checkNotNull(sortOrder);
         this.dataType = dataType;
         if (familyName == null) {
@@ -92,6 +93,7 @@ public class PColumnImpl implements PColumn {
         this.isViewReferenced = isViewReferenced;
         this.expressionStr = expressionStr;
         this.isRowTimestamp = isRowTimestamp;
+        this.isDynamic = isDynamic;
     }
 
     @Override
@@ -198,6 +200,11 @@ public class PColumnImpl implements PColumn {
     public boolean isRowTimestamp() {
         return isRowTimestamp;
     }
+    
+    @Override
+    public boolean isDynamic() {
+        return isDynamic;
+    }
 
     /**
      * Create a PColumn instance from PBed PColumn instance
@@ -240,8 +247,12 @@ public class PColumnImpl implements PColumn {
 	        expressionStr = column.getExpression();
         }
         boolean isRowTimestamp = column.getIsRowTimestamp();
+        boolean isDynamic = false;
+        if (column.hasIsDynamic()) {
+            isDynamic = column.getIsDynamic();
+        }
         return new PColumnImpl(columnName, familyName, dataType, maxLength, scale, nullable, position, sortOrder,
-                arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp);
+                arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic);
     }
 
     public static PTableProtos.PColumn toProto(PColumn column) {
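
A hedged round-trip sketch for the new flag, using the constructor shape from
this commit and the toProto/createFromProto pair in this class (the column
values are arbitrary):

    PColumn dynamic = new PColumnImpl(PNameFactory.newName("D1"), null,
            PVarchar.INSTANCE, null, null, true, 0, SortOrder.getDefault(),
            null, null, false, null, false, true /* isDynamic */);
    PTableProtos.PColumn proto = PColumnImpl.toProto(dynamic);
    // Older serialized columns lack the field, so the flag defaults to false.
    assert PColumnImpl.createFromProto(proto).isDynamic();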

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index 66b4af3..413d116 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -414,7 +414,7 @@ public class PMetaDataImpl implements PMetaData {
             // Update position of columns that follow removed column
             for (int i = position+1; i < oldColumns.size(); i++) {
                 PColumn oldColumn = oldColumns.get(i);
-                PColumn newColumn = new PColumnImpl(oldColumn.getName(), oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, oldColumn.getSortOrder(), oldColumn.getArraySize(), oldColumn.getViewConstant(), oldColumn.isViewReferenced(), null, oldColumn.isRowTimestamp());
+                PColumn newColumn = new PColumnImpl(oldColumn.getName(), oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, oldColumn.getSortOrder(), oldColumn.getArraySize(), oldColumn.getViewConstant(), oldColumn.isViewReferenced(), null, oldColumn.isRowTimestamp(), oldColumn.isDynamic());
                 columns.add(newColumn);
             }
             

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
index 4ac54cb..734a9ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
@@ -38,7 +38,7 @@ public class SaltingUtil {
     public static final String SALTING_COLUMN_NAME = "_SALT";
     public static final String SALTED_ROW_KEY_NAME = "_SALTED_KEY";
     public static final PColumnImpl SALTING_COLUMN = new PColumnImpl(
-            PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false);
+            PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false);
     public static final RowKeySchema VAR_BINARY_SALTED_SCHEMA = new RowKeySchemaBuilder(2)
         .addField(SALTING_COLUMN, false, SortOrder.getDefault())
         .addField(SchemaUtil.VAR_BINARY_DATUM, false, SortOrder.getDefault()).build();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
index 72f3e01..6b89187 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
@@ -226,7 +226,7 @@ public class CorrelatePlanTest {
             Expression expr = LiteralExpression.newConstant(row[i]);
             columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY),
                     expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(),
-                    i, expr.getSortOrder(), null, null, false, name, false));
+                    i, expr.getSortOrder(), null, null, false, name, false, false));
         }
         try {
             PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a138cfe0/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
index d508707..8b2b096 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
@@ -116,8 +116,8 @@ public class UnnestArrayPlanTest {
         LiteralExpression dummy = LiteralExpression.newConstant(null, arrayType);
         RowKeyValueAccessor accessor = new RowKeyValueAccessor(Arrays.asList(dummy), 0);
         UnnestArrayPlan plan = new UnnestArrayPlan(subPlan, new RowKeyColumnExpression(dummy, accessor), withOrdinality);
-        PColumn elemColumn = new PColumnImpl(PNameFactory.newName("ELEM"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), baseType, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false);
-        PColumn indexColumn = withOrdinality ? new PColumnImpl(PNameFactory.newName("IDX"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), PInteger.INSTANCE, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false) : null;
+        PColumn elemColumn = new PColumnImpl(PNameFactory.newName("ELEM"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), baseType, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false, false);
+        PColumn indexColumn = withOrdinality ? new PColumnImpl(PNameFactory.newName("IDX"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), PInteger.INSTANCE, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false, false) : null;
         List<PColumn> columns = withOrdinality ? Arrays.asList(elemColumn, indexColumn) : Arrays.asList(elemColumn);
         ProjectedColumnExpression elemExpr = new ProjectedColumnExpression(elemColumn, columns, 0, elemColumn.getName().getString());
         ProjectedColumnExpression indexExpr = withOrdinality ? new ProjectedColumnExpression(indexColumn, columns, 1, indexColumn.getName().getString()) : null;