Posted to commits@phoenix.apache.org by gj...@apache.org on 2019/02/19 23:36:58 UTC

[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5018 Index mutations created by UPSERT SELECT will have wrong timestamps

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
     new 190828b  PHOENIX-5018 Index mutations created by UPSERT SELECT will have wrong timestamps
190828b is described below

commit 190828bf66679d2768ffd1a5713023e8b6a5ec12
Author: Kadir <ko...@salesforce.com>
AuthorDate: Tue Jan 29 17:14:02 2019 -0800

    PHOENIX-5018 Index mutations created by UPSERT SELECT will have wrong timestamps
    
    Signed-off-by: Geoffrey Jacoby <gj...@apache.org>
---
 .../phoenix/end2end/IndexBuildTimestampIT.java     | 248 +++++++++++++++++++++
 .../org/apache/phoenix/end2end/IndexToolIT.java    |  36 +++
 .../phoenix/end2end/TableDDLPermissionsIT.java     |   8 -
 .../org/apache/phoenix/rpc/PhoenixServerRpcIT.java |   6 -
 .../phoenix/compile/ServerBuildIndexCompiler.java  | 138 ++++++++++++
 .../org/apache/phoenix/index/IndexMaintainer.java  |   3 +-
 .../phoenix/mapreduce/PhoenixInputFormat.java      |   4 +-
 .../phoenix/mapreduce/PhoenixRecordReader.java     |   4 +-
 .../PhoenixServerBuildIndexInputFormat.java        | 111 +++++++++
 .../apache/phoenix/mapreduce/index/IndexTool.java  | 243 ++++++++++++--------
 .../index/PhoenixServerBuildIndexMapper.java       |  75 +++++++
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  25 +++
 .../mapreduce/util/PhoenixMapReduceUtil.java       |  27 +++
 .../org/apache/phoenix/schema/MetaDataClient.java  |  16 +-
 14 files changed, 820 insertions(+), 124 deletions(-)
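
For orientation, the invariant this patch is about: an index mutation must carry
the timestamp of the data table cell it was derived from, not the wall-clock time
at which the UPSERT SELECT (or MR job) that built it happened to run. A minimal,
illustrative sketch of that invariant (not this patch's actual code path; the
helper below is hypothetical):

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class IndexTimestampSketch {
        // Hypothetical helper: derive an index Put from a scanned data cell,
        // propagating the data cell's timestamp to the index cell.
        static Put indexPutFor(byte[] indexRowKey, Cell dataCell, byte[] indexValue) {
            Put put = new Put(indexRowKey);
            put.addColumn(Bytes.toBytes("0"), Bytes.toBytes("_0"),
                    dataCell.getTimestamp(), // NOT the current wall-clock time
                    indexValue);
            return put;
        }
    }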

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
new file mode 100644
index 0000000..50be0b8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class IndexBuildTimestampIT extends BaseUniqueNamesOwnClusterIT {
+    private final boolean localIndex;
+    private final boolean async;
+    private final boolean view;
+    private final String tableDDLOptions;
+
+    public IndexBuildTimestampIT(boolean mutable, boolean localIndex,
+                            boolean async, boolean view) {
+        this.localIndex = localIndex;
+        this.async = async;
+        this.view = view;
+        StringBuilder optionBuilder = new StringBuilder();
+        if (!mutable) {
+            optionBuilder.append(" IMMUTABLE_ROWS=true ");
+        }
+        optionBuilder.append(" SPLIT ON(1,2)");
+        this.tableDDLOptions = optionBuilder.toString();
+    }
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        IndexToolIT.setup();
+    }
+
+    @Parameters(
+            name = "mutable={0},localIndex={1},async={2},view={3}")
+    public static Collection<Object[]> data() {
+        List<Object[]> list = Lists.newArrayListWithExpectedSize(16);
+        boolean[] booleans = new boolean[]{false, true};
+        for (boolean mutable : booleans) {
+            for (boolean localIndex : booleans) {
+                for (boolean async : booleans) {
+                    for (boolean view : booleans) {
+                        list.add(new Object[]{mutable, localIndex, async, view});
+                    }
+                }
+            }
+        }
+        return list;
+    }
+
+    public static void assertExplainPlan(Connection conn, boolean localIndex, String selectSql,
+                                         String dataTableFullName, String indexTableFullName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+        String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+
+        IndexToolIT.assertExplainPlan(localIndex, actualExplainPlan, dataTableFullName, indexTableFullName);
+    }
+
+    private class MyClock extends EnvironmentEdge {
+        long initialTime;
+        long delta;
+
+        public MyClock(long delta) {
+            initialTime = System.currentTimeMillis() + delta;
+            this.delta = delta;
+        }
+
+        @Override
+        public long currentTime() {
+            return System.currentTimeMillis() + delta;
+        }
+
+        public long initialTime() {
+            return initialTime;
+        }
+    }
+
+    private void populateTable(String tableName, MyClock clock1, MyClock clock2) throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("create table " + tableName +
+                " (id varchar(10) not null primary key, val varchar(10), ts timestamp)" + tableDDLOptions);
+
+        EnvironmentEdgeManager.injectEdge(clock1);
+        conn.createStatement().execute("upsert into " + tableName + " values ('aaa', 'abc', current_date())");
+        conn.commit();
+
+        EnvironmentEdgeManager.injectEdge(clock2);
+        conn.createStatement().execute("upsert into " + tableName + " values ('bbb', 'bcd', current_date())");
+        conn.commit();
+        conn.close();
+
+        Properties props = new Properties();
+        props.setProperty("CurrentSCN", Long.toString(clock1.initialTime()));
+        conn = DriverManager.getConnection(getUrl(), props);
+        ResultSet rs = conn.createStatement().executeQuery("select * from " + tableName);
+        assertFalse(rs.next());
+        conn.close();
+
+        props.setProperty("CurrentSCN", Long.toString(clock2.initialTime()));
+        conn = DriverManager.getConnection(getUrl(), props);
+        rs = conn.createStatement().executeQuery("select * from " + tableName);
+
+        assertTrue(rs.next());
+        assertEquals("aaa", rs.getString(1));
+        assertEquals("abc", rs.getString(2));
+        assertNotNull(rs.getDate(3));
+
+        assertFalse(rs.next());
+        conn.close();
+
+        props.setProperty("CurrentSCN", Long.toString(clock2.currentTime()));
+        conn = DriverManager.getConnection(getUrl(), props);
+        rs = conn.createStatement().executeQuery("select * from " + tableName);
+
+        assertTrue(rs.next());
+        assertEquals("aaa", rs.getString(1));
+        assertEquals("abc", rs.getString(2));
+        assertNotNull(rs.getDate(3));
+
+        assertTrue(rs.next());
+        assertEquals("bbb", rs.getString(1));
+        assertEquals("bcd", rs.getString(2));
+        assertNotNull(rs.getDate(3));
+        assertFalse(rs.next());
+        conn.close();
+    }
+
+    @Test
+    public void testCellTimestamp() throws Exception {
+        EnvironmentEdgeManager.reset();
+        MyClock clock1 = new MyClock(100000);
+        MyClock clock2 = new MyClock(200000);
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName, clock1, clock2);
+
+        MyClock clock3 = new MyClock(300000);
+        EnvironmentEdgeManager.injectEdge(clock3);
+
+        Properties props = new Properties();
+        props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, "true");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+
+        String viewName = null;
+        if (view) {
+            viewName = generateUniqueName();
+            conn.createStatement().execute("CREATE VIEW "+ viewName + " AS SELECT * FROM " +
+                    dataTableName);
+        }
+        String indexName = generateUniqueName();
+        conn.createStatement().execute("CREATE "+ (localIndex ? "LOCAL " : "") + " INDEX " + indexName + " on " +
+                (view ? viewName : dataTableName) + " (val) include (ts)" + (async ? " ASYNC" : ""));
+
+        conn.close();
+
+        if (async) {
+            // run the index MR job.
+            IndexToolIT.runIndexTool(true, false, null, (view ? viewName : dataTableName), indexName);
+        }
+
+        // Verify the index timestamps via Phoenix
+        String selectSql = String.format("SELECT * FROM %s WHERE val = 'abc'", (view ? viewName : dataTableName));
+        conn = DriverManager.getConnection(getUrl());
+        // assert we are pulling from index table
+        assertExplainPlan(conn, localIndex, selectSql, dataTableName, (view ? "_IDX_" + dataTableName : indexName));
+        ResultSet rs = conn.createStatement().executeQuery(selectSql);
+        assertTrue (rs.next());
+        assertTrue(rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() < clock2.initialTime() &&
+                rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() >= clock1.initialTime());
+
+        selectSql =
+                String.format("SELECT * FROM %s WHERE val = 'bcd'", (view ? viewName : dataTableName));
+        // assert we are pulling from index table
+        assertExplainPlan(conn, localIndex, selectSql, dataTableName, (view ? "_IDX_" + dataTableName : indexName));
+
+        rs = conn.createStatement().executeQuery(selectSql);
+        assertTrue (rs.next());
+        assertTrue(rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() < clock3.initialTime() &&
+                        rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() >= clock2.initialTime()
+                );
+        assertFalse (rs.next());
+
+        // Verify the index timestamps via HBase
+        PTable pIndexTable = PhoenixRuntime.getTable(conn, indexName);
+        Table table = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .getTable(pIndexTable.getPhysicalName().getBytes());
+
+        Scan scan = new Scan();
+        scan.setTimeRange(clock3.initialTime(), clock3.currentTime());
+        ResultScanner scanner = table.getScanner(scan);
+        assertTrue(scanner.next() == null);
+
+
+        scan = new Scan();
+        scan.setTimeRange(clock2.initialTime(), clock3.initialTime());
+        scanner = table.getScanner(scan);
+        assertTrue(scanner.next() != null);
+
+
+        scan = new Scan();
+        scan.setTimeRange(clock1.initialTime(), clock2.initialTime());
+        scanner = table.getScanner(scan);
+        assertTrue(scanner.next() != null);
+        conn.close();
+        EnvironmentEdgeManager.reset();
+    }
+}
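
The test above drives cell timestamps through Phoenix's injectable clock. A
compact, standalone sketch of that pattern (assuming phoenix-core on the
classpath):

    import org.apache.phoenix.util.EnvironmentEdge;
    import org.apache.phoenix.util.EnvironmentEdgeManager;

    public class ClockInjectionSketch {
        public static void main(String[] args) {
            EnvironmentEdge skewed = new EnvironmentEdge() {
                @Override
                public long currentTime() {
                    return System.currentTimeMillis() + 100_000L; // fixed skew
                }
            };
            EnvironmentEdgeManager.injectEdge(skewed);
            try {
                // Anything that reads time via EnvironmentEdgeManager now
                // sees the skewed clock, so cell timestamps are predictable.
                System.out.println(EnvironmentEdgeManager.currentTimeMillis());
            } finally {
                EnvironmentEdgeManager.reset(); // always restore the real clock
            }
        }
    }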
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index c185f39..d25cf6f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -47,11 +47,17 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
+import org.apache.phoenix.mapreduce.index.PhoenixIndexImportMapper;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
+
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -498,6 +504,32 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, new String[0]);
     }
 
+    private static void verifyMapper(Job job, boolean directApi, boolean useSnapshot, String schemaName,
+                                  String dataTableName, String indexTableName, String tenantId) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        if (tenantId != null) {
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        try (Connection conn =
+                     DriverManager.getConnection(getUrl(), props)) {
+            PTable dataTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, dataTableName));
+            PTable indexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indexTableName));
+            boolean transactional = dataTable.isTransactional();
+            boolean localIndex = PTable.IndexType.LOCAL.equals(indexTable.getIndexType());
+
+            if (directApi) {
+                if ((localIndex || !transactional) && !useSnapshot) {
+                    assertEquals(job.getMapperClass(), PhoenixServerBuildIndexMapper.class);
+                } else {
+                    assertEquals(job.getMapperClass(), PhoenixIndexImportDirectMapper.class);
+                }
+            }
+            else {
+                assertEquals(job.getMapperClass(), PhoenixIndexImportMapper.class);
+            }
+        }
+    }
+
     public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
             String dataTableName, String indexTableName, String... additionalArgs) throws Exception {
         runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, additionalArgs);
@@ -515,6 +547,10 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
         cmdArgList.addAll(Arrays.asList(additionalArgs));
         int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()]));
+
+        if (expectedStatus == 0) {
+            verifyMapper(indexingTool.getJob(), directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId);
+        }
         assertEquals(expectedStatus, status);
     }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
index 86a6b60..d29056d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
@@ -204,14 +204,7 @@ public class TableDDLPermissionsIT extends BasePermissionsIT {
 
             // we should be able to read the data from another index as well to which we have not given any access to
             // this user
-            verifyAllowed(createIndex(indexName2, phoenixTableName), unprivilegedUser);
             verifyAllowed(readTable(phoenixTableName, indexName1), unprivilegedUser);
-            verifyAllowed(readTable(phoenixTableName, indexName2), unprivilegedUser);
-            verifyAllowed(rebuildIndex(indexName2, phoenixTableName), unprivilegedUser);
-
-            // data table user should be able to read new index
-            verifyAllowed(rebuildIndex(indexName2, phoenixTableName), regularUser1);
-            verifyAllowed(readTable(phoenixTableName, indexName2), regularUser1);
 
             verifyAllowed(readTable(phoenixTableName), regularUser1);
             verifyAllowed(rebuildIndex(indexName1, phoenixTableName), regularUser1);
@@ -220,7 +213,6 @@ public class TableDDLPermissionsIT extends BasePermissionsIT {
             verifyAllowed(dropView(viewName1), regularUser1);
             verifyAllowed(dropView(viewName2), regularUser1);
             verifyAllowed(dropColumn(phoenixTableName, "val1"), regularUser1);
-            verifyAllowed(dropIndex(indexName2, phoenixTableName), regularUser1);
             verifyAllowed(dropIndex(indexName1, phoenixTableName), regularUser1);
             verifyAllowed(dropTable(phoenixTableName), regularUser1);
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
index 1c18667..ab05c16 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -140,12 +140,6 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
             assertEquals("k1", rs.getString(1));
             assertEquals("v2", rs.getString(2));
             assertFalse(rs.next());
-            
-            TestPhoenixIndexRpcSchedulerFactory.reset();
-            createIndex(conn, indexName + "_1");
-            // Verify that that index queue is not used since running upsert select on server side has been disabled
-            // See PHOENIX-4171
-            Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), Mockito.never()).dispatch(Mockito.any(CallRunner.class));
         }
         finally {
             conn.close();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
new file mode 100644
index 0000000..7d1c1b4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.schema.*;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.StringUtil;
+
+import com.google.common.collect.Lists;
+
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+
+/**
+ * Class that compiles the plan used to generate the initial data values for an
+ * index table after its DDL command.
+ */
+public class ServerBuildIndexCompiler {
+    private final PhoenixConnection connection;
+    private final String tableName;
+    private PTable dataTable;
+    private QueryPlan plan;
+
+    private class RowCountMutationPlan extends BaseMutationPlan {
+        private RowCountMutationPlan(StatementContext context, PhoenixStatement.Operation operation) {
+            super(context, operation);
+        }
+        @Override
+        public MutationState execute() throws SQLException {
+            connection.getMutationState().commitDDLFence(dataTable);
+            Tuple tuple = plan.iterator().next();
+            long rowCount = 0;
+            if (tuple != null) {
+                Cell kv = tuple.getValue(0);
+                ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+                // A single Cell will be returned with the count(*) - we decode that here
+                rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
+            }
+            // The contract is to return a MutationState that contains the number of rows modified. In this
+            // case, it's the number of rows in the data table which corresponds to the number of index
+            // rows that were added.
+            return new MutationState(0, 0, connection, rowCount);
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return plan;
+        }
+    };
+    
+    public ServerBuildIndexCompiler(PhoenixConnection connection, String tableName) {
+        this.connection = connection;
+        this.tableName = tableName;
+    }
+
+    public MutationPlan compile(PTable index) throws SQLException {
+        try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
+            String query = "SELECT count(*) FROM " + tableName;
+            this.plan = statement.compileQuery(query);
+            TableRef tableRef = plan.getTableRef();
+            Scan scan = plan.getContext().getScan();
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            dataTable = tableRef.getTable();
+            if (index.getIndexType() == PTable.IndexType.GLOBAL && dataTable.isTransactional()) {
+                throw new IllegalArgumentException(
+                        "ServerBuildIndexCompiler does not support global indexes on transactional tables");
+            }
+            IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(index), plan.getContext().getConnection());
+            // Set the scan attributes that UngroupedAggregateRegionObserver will switch on.
+            // For local indexes, the BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute is set to the
+            // serialized form of index metadata to build index rows from data table rows. For global indexes,
+            // the PhoenixIndexCodec.INDEX_PROTO_MD attribute serves the same purpose, and we also need to set
+            // (1) the BaseScannerRegionObserver.REBUILD_INDEXES attribute in order to signal
+            // UngroupedAggregateRegionObserver that this scan is for building global indexes, and (2) the
+            // MetaDataProtocol.PHOENIX_VERSION attribute that will be passed as a mutation attribute for the
+            // scanned mutations that will be applied on the index table, possibly remotely.
+            if (index.getIndexType() == PTable.IndexType.LOCAL) {
+                scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, ByteUtil.copyKeyBytesIfNecessary(ptr));
+            } else {
+                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+                scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+                ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+            }
+            // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+            // However, in this case, we need to project all of the data columns that contribute to the index.
+            IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
+            for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
+                if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                    scan.addFamily(columnRef.getFamily());
+                } else {
+                    scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
+                }
+            }
+
+            if (dataTable.isTransactional()) {
+                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction());
+            }
+
+            // Go through MutationPlan abstraction so that we can create local indexes
+            // with a connectionless connection (which makes testing easier).
+            return new RowCountMutationPlan(plan.getContext(), PhoenixStatement.Operation.UPSERT);
+        }
+    }
+}
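
RowCountMutationPlan.execute() above turns the single cell returned for
count(*) into a long via the PLong codec. A standalone sketch of just that
decode step (assuming phoenix-core on the classpath):

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.phoenix.schema.SortOrder;
    import org.apache.phoenix.schema.types.PLong;

    public class DecodeRowCountSketch {
        public static void main(String[] args) {
            byte[] encoded = PLong.INSTANCE.toBytes(42L); // what the server returns
            ImmutableBytesWritable ptr =
                    new ImmutableBytesWritable(encoded, 0, encoded.length);
            long rowCount = PLong.INSTANCE.getCodec()
                    .decodeLong(ptr, SortOrder.getDefault());
            System.out.println(rowCount); // 42
        }
    }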
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index d3d14d8..cb09dc4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -964,8 +964,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             // add the keyvalue for the empty row
             put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
                 this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts,
-                // set the value to the empty column name
-                dataEmptyKeyValueRef.getQualifierWritable()));
+                    QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR));
             put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
         }
         ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
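
The one-line IndexMaintainer change above restores Phoenix's empty-cell
convention: every row carries an "empty" KeyValue whose value is the constant
QueryConstants.EMPTY_COLUMN_VALUE_BYTES, not a copy of the qualifier. A hedged
sketch of that convention (the helper is hypothetical):

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.phoenix.query.QueryConstants;

    public class EmptyCellSketch {
        // Hypothetical helper: add the conventional empty cell to a row.
        static Put withEmptyCell(Put put, byte[] emptyColumnFamily, long ts) {
            put.addColumn(emptyColumnFamily, QueryConstants.EMPTY_COLUMN_BYTES, ts,
                    QueryConstants.EMPTY_COLUMN_VALUE_BYTES); // the value "x"
            return put;
        }
    }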
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index c815119..136548e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -74,7 +74,6 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
     @Override
     public RecordReader<NullWritable,T> createRecordReader(InputSplit split, TaskAttemptContext context)
             throws IOException, InterruptedException {
-        
         final Configuration configuration = context.getConfiguration();
         final QueryPlan queryPlan = getQueryPlan(context,configuration);
         @SuppressWarnings("unchecked")
@@ -164,7 +163,8 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
      * @throws IOException
      * @throws SQLException
      */
-    private QueryPlan getQueryPlan(final JobContext context, final Configuration configuration) {
+    protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration)
+            throws IOException {
         Preconditions.checkNotNull(context);
         try {
             final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index add8c31..3c4db8c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -61,8 +61,8 @@ import com.google.common.collect.Lists;
 public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<NullWritable,T> {
     
     private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
-    private final Configuration  configuration;
-    private final QueryPlan queryPlan;
+    protected final Configuration  configuration;
+    protected final QueryPlan queryPlan;
     private NullWritable key =  NullWritable.get();
     private T value = null;
     private Class<T> inputClass;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
new file mode 100644
index 0000000..f8ec393
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.compile.*;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.*;
+import org.apache.phoenix.util.*;
+
+import com.google.common.base.Preconditions;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
+
+/**
+ * {@link InputFormat} implementation from Phoenix for building index tables
+ * on the server side.
+ */
+public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends PhoenixInputFormat {
+    QueryPlan queryPlan = null;
+
+    private static final Log LOG = LogFactory.getLog(PhoenixServerBuildIndexInputFormat.class);
+
+    /**
+     * instantiated by framework
+     */
+    public PhoenixServerBuildIndexInputFormat() {
+    }
+
+    @Override
+    protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration)
+            throws IOException {
+        Preconditions.checkNotNull(context);
+        if (queryPlan != null) {
+            return queryPlan;
+        }
+        final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+        final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+        final String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        final Properties overridingProps = new Properties();
+        if (txnScnValue == null && currentScnValue != null) {
+            overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
+            overridingProps.put(PhoenixRuntime.BUILD_INDEX_AT_ATTRIB, currentScnValue);
+        }
+        if (tenantId != null && configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null){
+            overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        String dataTableFullName = getIndexToolDataTableName(configuration);
+        String indexTableFullName = getIndexToolIndexTableName(configuration);
+
+        try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) {
+            PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
+            Long scn = (currentScnValue != null) ? Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis();
+            PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
+            ServerBuildIndexCompiler compiler =
+                    new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
+            MutationPlan plan = compiler.compile(indexTable);
+            Scan scan = plan.getContext().getScan();
+
+            try {
+                scan.setTimeRange(0, scn);
+            } catch (IOException e) {
+                throw new SQLException(e);
+            }
+            queryPlan = plan.getQueryPlan();
+            // Since we can't set an SCN on transactional connections, set the TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver.
+            if (txnScnValue != null) {
+                scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
+            }
+
+            // Initialize the query plan so it sets up the parallel scans
+            queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
+            return queryPlan;
+        } catch (Exception exception) {
+            LOG.error(String.format("Failed to get the query plan with error [%s]",
+                    exception.getMessage()));
+            throw new RuntimeException(exception);
+        }
+    }
+}
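
This input format is wired up by IndexTool's new configureJobForServerBuildIndex()
(further down in this diff). A hedged sketch of that wiring with placeholder
table names; the empty select query is intentional, since the scan is built by
getQueryPlan() above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
    import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
    import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;

    public class ServerBuildJobSketch {
        static Job newServerBuildJob(Configuration conf) throws Exception {
            // Tell the input format which tables to build; placeholder names.
            PhoenixConfigurationUtil.setIndexToolDataTableName(conf, "MY_SCHEMA.DATA");
            PhoenixConfigurationUtil.setIndexToolIndexTableName(conf, "MY_SCHEMA.IDX");
            Job job = Job.getInstance(conf, "server-side build of MY_SCHEMA.IDX");
            PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class,
                    PhoenixServerBuildIndexInputFormat.class, "MY_SCHEMA.DATA", "");
            job.setMapperClass(PhoenixServerBuildIndexMapper.class);
            return job;
        }
    }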
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index dc361c9..d1d6ca2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -28,7 +28,10 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -74,6 +77,7 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
 import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames;
 import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
@@ -106,6 +110,22 @@ public class IndexTool extends Configured implements Tool {
 
     private static final Logger LOG = LoggerFactory.getLogger(IndexTool.class);
 
+    private String schemaName;
+    private String dataTable;
+    private String indexTable;
+    private boolean isPartialBuild;
+    private String qDataTable;
+    private String qIndexTable;
+    private boolean useDirectApi;
+    private boolean useSnapshot;
+    private boolean isLocalIndexBuild;
+    private PTable pIndexTable;
+    private PTable pDataTable;
+    private String tenantId;
+    private Job job;
+
+
+
     private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true,
             "Phoenix schema name (optional)");
     private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true,
@@ -247,18 +267,31 @@ public class IndexTool extends Configured implements Tool {
 
         }
 
-        public Job getJob(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean isPartialBuild,
-            boolean useSnapshot, String tenantId) throws Exception {
+        public Job getJob() throws Exception {
             if (isPartialBuild) {
-                return configureJobForPartialBuild(schemaName, dataTable, tenantId);
+                return configureJobForPartialBuild();
             } else {
-                return configureJobForAsyncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot, tenantId);
+                long maxTimeRange = pIndexTable.getTimeStamp() + 1;
+                // This is set to ensure the index table remains consistent post-population.
+
+                if (pDataTable.isTransactional()) {
+                    configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
+                            Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
+                    configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pDataTable.getTransactionProvider().name());
+                }
+                configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
+                        Long.toString(maxTimeRange));
+                if (useSnapshot || !useDirectApi || (!isLocalIndexBuild && pDataTable.isTransactional())) {
+                    return configureJobForAsyncIndex();
+                }
+                else {
+                    // Local and non-transactional global indexes are built on the server side
+                    return configureJobForServerBuildIndex();
+                }
             }
         }
-        
-        private Job configureJobForPartialBuild(String schemaName, String dataTable, String tenantId) throws Exception {
-            final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
-            final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
+
+        private Job configureJobForPartialBuild() throws Exception {
             connection = ConnectionUtil.getInputConnection(configuration);
             long minDisableTimestamp = HConstants.LATEST_TIMESTAMP;
             PTable indexWithMinDisableTimestamp = null;
@@ -266,7 +299,7 @@ public class IndexTool extends Configured implements Tool {
             //Get Indexes in building state, minDisabledTimestamp 
             List<String> disableIndexes = new ArrayList<String>();
             List<PTable> disabledPIndexes = new ArrayList<PTable>();
-            for (PTable index : pdataTable.getIndexes()) {
+            for (PTable index : pDataTable.getIndexes()) {
                 if (index.getIndexState().equals(PIndexState.BUILDING)) {
                     disableIndexes.add(index.getTableName().getString());
                     disabledPIndexes.add(index);
@@ -299,10 +332,10 @@ public class IndexTool extends Configured implements Tool {
            //serialize index maintainer in job conf with Base64 TODO: Need to find better way to serialize them in conf.
             List<IndexMaintainer> maintainers = Lists.newArrayListWithExpectedSize(disabledPIndexes.size());
             for (PTable index : disabledPIndexes) {
-                maintainers.add(index.getIndexMaintainer(pdataTable, connection.unwrap(PhoenixConnection.class)));
+                maintainers.add(index.getIndexMaintainer(pDataTable, connection.unwrap(PhoenixConnection.class)));
             }
             ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
-            IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class));
+            IndexMaintainer.serializeAdditional(pDataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class));
             PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr);
             if (tenantId != null) {
                 PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
@@ -313,15 +346,15 @@ public class IndexTool extends Configured implements Tool {
             scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp);
             scan.setRaw(true);
             scan.setCacheBlocks(false);
-            if (pdataTable.isTransactional()) {
-                long maxTimeRange = pdataTable.getTimeStamp() + 1;
+            if (pDataTable.isTransactional()) {
+                long maxTimeRange = pDataTable.getTimeStamp() + 1;
                 scan.setAttribute(BaseScannerRegionObserver.TX_SCN,
                         Bytes.toBytes(Long.valueOf(Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)))));
             }
             
           
-            String physicalTableName=pdataTable.getPhysicalName().getString();
-            final String jobName = String.format("Phoenix Indexes build for " + pdataTable.getName().toString());
+            String physicalTableName=pDataTable.getPhysicalName().getString();
+            final String jobName = String.format("Phoenix Indexes build for " + pDataTable.getName().toString());
             
             PhoenixConfigurationUtil.setInputTableName(configuration, qDataTable);
             PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalTableName);
@@ -338,7 +371,7 @@ public class IndexTool extends Configured implements Tool {
                     null, job);
             TableMapReduceUtil.initCredentials(job);
             TableInputFormat.configureSplitTable(job, TableName.valueOf(physicalTableName));
-            return configureSubmittableJobUsingDirectApi(job, true);
+            return configureSubmittableJobUsingDirectApi(job);
         }
         
         private long getMaxRebuildAsyncDate(String schemaName, List<String> disableIndexes) throws SQLException {
@@ -368,40 +401,15 @@ public class IndexTool extends Configured implements Tool {
             
         }
 
-        private Job configureJobForAsyncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot, String tenantId)
-                throws Exception {
-            final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
-            final String qIndexTable;
-            if (schemaName != null && !schemaName.isEmpty()) {
-                qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable);
-            } else {
-                qIndexTable = indexTable;
-            }
-            final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
-            
-            final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
-            
-            long maxTimeRange = pindexTable.getTimeStamp() + 1;
-            // this is set to ensure index tables remains consistent post population.
+        private Job configureJobForAsyncIndex()
 
-            if (pdataTable.isTransactional()) {
-                configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
-                    Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
-                configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pdataTable.getTransactionProvider().name());
-            }
-            configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
-                Long.toString(maxTimeRange));
-            
-            // check if the index type is LOCAL, if so, derive and set the physicalIndexName that is
-            // computed from the qDataTable name.
-            String physicalIndexTable = pindexTable.getPhysicalName().getString();
-            
+                throws Exception {
 
+            String physicalIndexTable = pIndexTable.getPhysicalName().getString();
             final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
             final PostIndexDDLCompiler ddlCompiler =
-                    new PostIndexDDLCompiler(pConnection, new TableRef(pdataTable));
-            ddlCompiler.compile(pindexTable);
-
+                    new PostIndexDDLCompiler(pConnection, new TableRef(pDataTable));
+            ddlCompiler.compile(pIndexTable);
             final List<String> indexColumns = ddlCompiler.getIndexColumnNames();
             final String selectQuery = ddlCompiler.getSelectQuery();
             final String upsertQuery =
@@ -410,6 +418,7 @@ public class IndexTool extends Configured implements Tool {
             configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery);
             PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable);
             PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable);
+
             PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
                 indexColumns.toArray(new String[indexColumns.size()]));
             if (tenantId != null) {
@@ -419,25 +428,22 @@ public class IndexTool extends Configured implements Tool {
                     PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns);
             ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
 
+            fs = outputPath.getFileSystem(configuration);
+            fs.delete(outputPath, true);
             final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
             final Job job = Job.getInstance(configuration, jobName);
             job.setJarByClass(IndexTool.class);
             job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-            if (outputPath != null) {
-                fs = outputPath.getFileSystem(configuration);
-                fs.delete(outputPath, true);
-                FileOutputFormat.setOutputPath(job, outputPath);
-            }
+            FileOutputFormat.setOutputPath(job, outputPath);
 
             if (!useSnapshot) {
-                PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable,
-                    selectQuery);
+                PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable, selectQuery);
             } else {
                 HBaseAdmin admin = null;
                 String snapshotName;
                 try {
                     admin = pConnection.getQueryServices().getAdmin();
-                    String pdataTableName = pdataTable.getName().getString();
+                    String pdataTableName = pDataTable.getName().getString();
                     snapshotName = new StringBuilder(pdataTableName).append("-Snapshot").toString();
                     admin.snapshot(snapshotName, TableName.valueOf(pdataTableName));
                 } finally {
@@ -452,17 +458,47 @@ public class IndexTool extends Configured implements Tool {
 
                 // set input for map reduce job using hbase snapshots
                 PhoenixMapReduceUtil
-                    .setInput(job, PhoenixIndexDBWritable.class, snapshotName, qDataTable, restoreDir, selectQuery);
+                            .setInput(job, PhoenixIndexDBWritable.class, snapshotName, qDataTable, restoreDir, selectQuery);
             }
             TableMapReduceUtil.initCredentials(job);
             
             if (useDirectApi) {
-                return configureSubmittableJobUsingDirectApi(job, false);
+                job.setMapperClass(PhoenixIndexImportDirectMapper.class);
+                return configureSubmittableJobUsingDirectApi(job);
             } else {
                 return configureRunnableJobUsingBulkLoad(job, outputPath);
-                
             }
-            
+        }
+
+        private Job configureJobForServerBuildIndex()
+                throws Exception {
+
+            PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
+            PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
+
+            String physicalIndexTable = pIndexTable.getPhysicalName().getString();
+
+            PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable);
+            PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable);
+            if (tenantId != null) {
+                PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+            }
+
+            fs = outputPath.getFileSystem(configuration);
+            fs.delete(outputPath, true);
+
+            final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
+            final Job job = Job.getInstance(configuration, jobName);
+            job.setJarByClass(IndexTool.class);
+            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+            FileOutputFormat.setOutputPath(job, outputPath);
+
+            PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class,
+                            qDataTable, "");
+
+            TableMapReduceUtil.initCredentials(job);
+            job.setMapperClass(PhoenixServerBuildIndexMapper.class);
+            return configureSubmittableJobUsingDirectApi(job);
         }
 
         /**
@@ -496,12 +532,9 @@ public class IndexTool extends Configured implements Tool {
          * @return
          * @throws Exception
          */
-        private Job configureSubmittableJobUsingDirectApi(Job job, boolean isPartialRebuild)
+        private Job configureSubmittableJobUsingDirectApi(Job job)
                 throws Exception {
-            if (!isPartialRebuild) {
-                //Don't configure mapper for partial build as it is configured already
-                job.setMapperClass(PhoenixIndexImportDirectMapper.class);
-            }
+
             job.setReducerClass(PhoenixIndexImportDirectReducer.class);
             Configuration conf = job.getConfiguration();
             HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
@@ -520,6 +553,10 @@ public class IndexTool extends Configured implements Tool {
         
     }
 
+    public Job getJob() {
+        return job;
+    }
+
     @Override
     public int run(String[] args) throws Exception {
         Connection connection = null;
@@ -532,64 +569,75 @@ public class IndexTool extends Configured implements Tool {
                 printHelpAndExit(e.getMessage(), getOptions());
             }
             final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
-            final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
-            final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
-            final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
-            final boolean isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
-            final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
-            boolean useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
-            String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
-            boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
-            boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
             boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
-            byte[][] splitKeysBeforeJob = null;
-            boolean isLocalIndexBuild = false;
-            PTable pindexTable = null;
-            String tenantId = null;
+            tenantId = null;
             if (useTenantId) {
                 tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
                 configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
             }
             connection = ConnectionUtil.getInputConnection(configuration);
+            schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+            dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+            indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+            isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
+            qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+            pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
+            useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
+            String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
+            boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+            useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
+
+            byte[][] splitKeysBeforeJob = null;
+            isLocalIndexBuild = false;
+            pIndexTable = null;
+
+            connection = ConnectionUtil.getInputConnection(configuration);
+
             if (indexTable != null) {
                 if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) {
                     throw new IllegalArgumentException(String.format(
                         " %s is not an index table for %s for this connection", indexTable, qDataTable));
                 }
-                pindexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty()
+                pIndexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty()
                         ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable);
+
+                if (schemaName != null && !schemaName.isEmpty()) {
+                    qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable);
+                } else {
+                    qIndexTable = indexTable;
+                }
                 htable = (HTable)connection.unwrap(PhoenixConnection.class).getQueryServices()
-                        .getTable(pindexTable.getPhysicalName().getBytes());
-                if (IndexType.LOCAL.equals(pindexTable.getIndexType())) {
+                        .getTable(pIndexTable.getPhysicalName().getBytes());
+
+                if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) {
                     isLocalIndexBuild = true;
                     splitKeysBeforeJob = htable.getRegionLocator().getStartKeys();
                 }
                 // presplit the index table
                 boolean autosplit = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt());
-                boolean isSalted = pindexTable.getBucketNum() != null; // no need to split salted tables
-                if (!isSalted && IndexType.GLOBAL.equals(pindexTable.getIndexType())
+                boolean isSalted = pIndexTable.getBucketNum() != null; // no need to split salted tables
+                if (!isSalted && IndexType.GLOBAL.equals(pIndexTable.getIndexType())
                         && (autosplit || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt()))) {
                     String nOpt = cmdLine.getOptionValue(AUTO_SPLIT_INDEX_OPTION.getOpt());
                     int autosplitNumRegions = nOpt == null ? DEFAULT_AUTOSPLIT_NUM_REGIONS : Integer.parseInt(nOpt);
                     String rateOpt = cmdLine.getOptionValue(SPLIT_INDEX_OPTION.getOpt());
                     double samplingRate = rateOpt == null ? DEFAULT_SPLIT_SAMPLING_RATE : Double.parseDouble(rateOpt);
                     LOG.info(String.format("Will split index %s , autosplit=%s , autoSplitNumRegions=%s , samplingRate=%s", indexTable, autosplit, autosplitNumRegions, samplingRate));
-                    splitIndexTable(connection.unwrap(PhoenixConnection.class), qDataTable, pindexTable, autosplit, autosplitNumRegions, samplingRate);
+
+                    splitIndexTable(connection.unwrap(PhoenixConnection.class), autosplit, autosplitNumRegions, samplingRate, configuration);
                 }
             }
-            
-            PTable pdataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
 			Path outputPath = null;
 			FileSystem fs = null;
 			if (basePath != null) {
-				outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath), pindexTable == null
-						? pdataTable.getPhysicalName().getString() : pindexTable.getPhysicalName().getString());
+				outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath), pIndexTable == null
+						? pDataTable.getPhysicalName().getString() : pIndexTable.getPhysicalName().getString());
 				fs = outputPath.getFileSystem(configuration);
 				fs.delete(outputPath, true);
 			}
-            
-            Job job = new JobFactory(connection, configuration, outputPath).getJob(schemaName, indexTable, dataTable,
-                    useDirectApi, isPartialBuild, useSnapshot, tenantId);
+
+			job = new JobFactory(connection, configuration, outputPath).getJob();
+
             if (!isForeground && useDirectApi) {
                 LOG.info("Running Index Build in Background - Submit async and exit");
                 job.submit();
@@ -635,32 +683,29 @@ public class IndexTool extends Configured implements Tool {
         }
     }
 
-    
 
-    private void splitIndexTable(PhoenixConnection pConnection, String qDataTable,
-            PTable pindexTable, boolean autosplit, int autosplitNumRegions, double samplingRate)
+    private void splitIndexTable(PhoenixConnection pConnection, boolean autosplit, int autosplitNumRegions, double samplingRate, Configuration configuration)
             throws SQLException, IOException, IllegalArgumentException, InterruptedException {
-        final PTable pdataTable = PhoenixRuntime.getTable(pConnection, qDataTable);
         int numRegions;
         try (HTable hDataTable =
                 (HTable) pConnection.getQueryServices()
-                        .getTable(pdataTable.getPhysicalName().getBytes())) {
+                        .getTable(pDataTable.getPhysicalName().getBytes())) {
             numRegions = hDataTable.getRegionLocator().getStartKeys().length;
             if (autosplit && !(numRegions > autosplitNumRegions)) {
                 LOG.info(String.format(
                     "Will not split index %s because the data table only has %s regions, autoSplitNumRegions=%s",
-                    pindexTable.getPhysicalName(), numRegions, autosplitNumRegions));
+                    pIndexTable.getPhysicalName(), numRegions, autosplitNumRegions));
                 return; // do nothing if # of regions is too low
             }
         }
         // build a tablesample query to fetch index column values from the data table
-        DataSourceColNames colNames = new DataSourceColNames(pdataTable, pindexTable);
+        DataSourceColNames colNames = new DataSourceColNames(pDataTable, pIndexTable);
         String qTableSample = String.format(qDataTable + " TABLESAMPLE(%.2f)", samplingRate);
         List<String> dataColNames = colNames.getDataColNames();
         final String dataSampleQuery =
                 QueryUtil.constructSelectStatement(qTableSample, dataColNames, null,
                     Hint.NO_INDEX, true);
-        IndexMaintainer maintainer = IndexMaintainer.create(pdataTable, pindexTable, pConnection);
+        IndexMaintainer maintainer = IndexMaintainer.create(pDataTable, pIndexTable, pConnection);
         ImmutableBytesWritable dataRowKeyPtr = new ImmutableBytesWritable();
         try (final PhoenixResultSet rs =
                 pConnection.createStatement().executeQuery(dataSampleQuery)
@@ -684,7 +729,7 @@ public class IndexTool extends Configured implements Tool {
                 splitPoints[splitIdx++] = b.getRightBoundExclusive();
             }
             // drop table and recreate with appropriate splits
-            TableName indexTN = TableName.valueOf(pindexTable.getPhysicalName().getBytes());
+            TableName indexTN = TableName.valueOf(pIndexTable.getPhysicalName().getBytes());
             HTableDescriptor descriptor = admin.getTableDescriptor(indexTN);
             admin.disableTable(indexTN);
             admin.deleteTable(indexTN);
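
As a hedged usage sketch (not part of this patch): the refactored tool still runs through
ToolRunner. The long option names below are assumed to match the option constants parsed
above (SCHEMA_NAME_OPTION, DATA_TABLE_OPTION, INDEX_TABLE_OPTION, DIRECT_API_OPTION), and
the schema/table names are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    public class RunIndexTool {
        public static void main(String[] args) throws Exception {
            // --direct selects the direct-API build path handled in run() above.
            int status = ToolRunner.run(new Configuration(), new IndexTool(), new String[] {
                    "--schema", "MY_SCHEMA",
                    "--data-table", "DATA_TABLE",
                    "--index-table", "IDX_TABLE",
                    "--direct" });
            System.exit(status);
        }
    }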
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java
new file mode 100644
index 0000000..34bcc9b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mapper that does not do much, as region servers actually build the index from the data table regions directly.
+ */
+public class PhoenixServerBuildIndexMapper extends
+        Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, IntWritable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixServerBuildIndexMapper.class);
+
+    @Override
+    protected void setup(final Context context) throws IOException, InterruptedException {
+        super.setup(context);
+    }
+
+    @Override
+    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
+            throws IOException, InterruptedException {
+        context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+        // Make sure progress is reported to Application Master.
+        context.progress();
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
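+        // map() emits nothing, so write one marker record per mapper here; otherwise any
+        // stage consuming the map output would receive no input at all.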
+        context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), new IntWritable(0));
+        super.cleanup(context);
+    }
+}
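
For orientation, a minimal sketch of how this mapper might be wired into a job via the
query-based setInput overload added below. PhoenixServerBuildIndexInputFormat and the empty
input query are assumptions about the rest of this commit, not a verbatim excerpt:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
    import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
    import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
    import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;

    public class ServerBuildJobSketch {
        // Sketch only: pair the pass-through mapper with the server-build input format.
        static Job configureJob(Configuration conf, String qDataTable) throws Exception {
            Job job = Job.getInstance(conf, "server-build-index");
            PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class,
                    PhoenixServerBuildIndexInputFormat.class, qDataTable, "");
            job.setMapperClass(PhoenixServerBuildIndexMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(IntWritable.class);
            return job;
        }
    }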
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 8fa21fe..b41611f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -122,6 +122,10 @@ public final class PhoenixConfigurationUtil {
 
     public static final String SCRUTINY_INDEX_TABLE_NAME = "phoenix.mr.scrutiny.index.table.name";
 
+    public static final String INDEX_TOOL_DATA_TABLE_NAME = "phoenix.mr.index_tool.data.table.name";
+
+    public static final String INDEX_TOOL_INDEX_TABLE_NAME = "phoenix.mr.index_tool.index.table.name";
+
     public static final String SCRUTINY_SOURCE_TABLE = "phoenix.mr.scrutiny.source.table";
 
     public static final String SCRUTINY_BATCH_SIZE = "phoenix.mr.scrutiny.batch.size";
@@ -569,6 +573,16 @@ public final class PhoenixConfigurationUtil {
         Preconditions.checkNotNull(configuration);
         return configuration.get(SCRUTINY_INDEX_TABLE_NAME);
     }
+    public static void setIndexToolDataTableName(Configuration configuration, String qDataTableName) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(qDataTableName);
+        configuration.set(INDEX_TOOL_DATA_TABLE_NAME, qDataTableName);
+    }
+
+    public static String getIndexToolDataTableName(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(INDEX_TOOL_DATA_TABLE_NAME);
+    }
 
     public static void setScrutinyIndexTable(Configuration configuration, String qIndexTableName) {
         Preconditions.checkNotNull(configuration);
@@ -581,6 +595,17 @@ public final class PhoenixConfigurationUtil {
         return SourceTable.valueOf(configuration.get(SCRUTINY_SOURCE_TABLE));
     }
 
+    public static void setIndexToolIndexTableName(Configuration configuration, String qIndexTableName) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(qIndexTableName);
+        configuration.set(INDEX_TOOL_INDEX_TABLE_NAME, qIndexTableName);
+    }
+
+    public static String getIndexToolIndexTableName(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(INDEX_TOOL_INDEX_TABLE_NAME);
+    }
+
     public static void setScrutinySourceTable(Configuration configuration,
             SourceTable sourceTable) {
         Preconditions.checkNotNull(configuration);
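
The new accessors follow the same pattern as the scrutiny ones around them: null-checked
writes, plain reads. A minimal round-trip sketch with placeholder table names:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

    public class IndexToolConfSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            PhoenixConfigurationUtil.setIndexToolDataTableName(conf, "MY_SCHEMA.DATA_TABLE");
            PhoenixConfigurationUtil.setIndexToolIndexTableName(conf, "MY_SCHEMA.IDX_TABLE");
            // Both getters simply read back the phoenix.mr.index_tool.* keys set above.
            System.out.println(PhoenixConfigurationUtil.getIndexToolDataTableName(conf));
            System.out.println(PhoenixConfigurationUtil.getIndexToolIndexTableName(conf));
        }
    }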
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index ecede80..f8625da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.mapreduce.util;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
@@ -70,6 +71,23 @@ public final class PhoenixMapReduceUtil {
     /**
      *
      * @param job
+     * @param inputClass        DBWritable class
+     * @param inputFormatClass  InputFormat class
+     * @param tableName         Input table name
+     * @param inputQuery        Select query
+     */
+
+    public static void setInput(final Job job, final Class<? extends DBWritable> inputClass,
+                                final Class<? extends InputFormat> inputFormatClass,
+                                final String tableName, final String inputQuery) {
+        final Configuration configuration = setInput(job, inputClass, inputFormatClass, tableName);
+        PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
+        PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+    }
+
+    /**
+     *
+     * @param job
      * @param inputClass DBWritable class
      * @param snapshotName The name of a snapshot (of a table) to read from
      * @param tableName Input table name
@@ -140,6 +158,15 @@ public final class PhoenixMapReduceUtil {
         return configuration;
     }
 
+    private static Configuration setInput(final Job job, final Class<? extends DBWritable> inputClass,
+                                          final Class<? extends InputFormat> inputFormatClass, final String tableName){
+        job.setInputFormatClass(inputFormatClass);
+        final Configuration configuration = job.getConfiguration();
+        PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+        PhoenixConfigurationUtil.setInputClass(configuration, inputClass);
+        return configuration;
+    }
+
     /**
      * A method to override which HBase cluster for {@link PhoenixInputFormat} to read from
      * @param job MapReduce Job
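
Callers can now pair a custom InputFormat with a free-form query; the new overload stamps
the configuration with SchemaType.QUERY so the query, rather than a column list, drives the
read. A usage sketch with placeholder names:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.phoenix.mapreduce.PhoenixInputFormat;
    import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
    import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;

    public class QueryInputSketch {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "phoenix-query-input");
            // Stores the query and input table name in the job configuration.
            PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class,
                    PhoenixInputFormat.class, "MY_TABLE", "SELECT * FROM MY_TABLE");
        }
    }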
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 7cf2e21..6c5d939 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -156,6 +156,7 @@ import org.apache.phoenix.compile.PostDDLCompiler;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
 import org.apache.phoenix.compile.PostLocalIndexDDLCompiler;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.ServerBuildIndexCompiler;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.StatementNormalizer;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -1390,16 +1391,17 @@ public class MetaDataClient {
     }
     
     private MutationPlan getMutationPlanForBuildingIndex(PTable index, TableRef dataTableRef) throws SQLException {
-        MutationPlan mutationPlan;
         if (index.getIndexType() == IndexType.LOCAL) {
             PostLocalIndexDDLCompiler compiler =
                     new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
-            mutationPlan = compiler.compile(index);
-        } else {
+            return compiler.compile(index);
+        } else if (dataTableRef.getTable().isTransactional()) {
             PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
-            mutationPlan = compiler.compile(index);
+            return compiler.compile(index);
+        } else {
+            ServerBuildIndexCompiler compiler = new ServerBuildIndexCompiler(connection, getFullTableName(dataTableRef));
+            return compiler.compile(index);
         }
-        return mutationPlan;
     }
 
     private MutationState buildIndex(PTable index, TableRef dataTableRef) throws SQLException {
@@ -1741,6 +1743,6 @@
         if (connection.getSCN() != null) {
             return buildIndexAtTimeStamp(table, statement.getTable());
         }
         return buildIndex(table, tableRef);
     }
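
Net effect of the MetaDataClient change: local indexes keep PostLocalIndexDDLCompiler,
transactional tables keep PostIndexDDLCompiler, and all remaining cases now compile through
the new ServerBuildIndexCompiler, so the region servers generate the index rows carrying the
data rows' own timestamps. An illustrative JDBC sketch (placeholder DDL and connection URL,
not from this commit):

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class CreateIndexSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.createStatement().execute(
                        "CREATE TABLE T (K VARCHAR PRIMARY KEY, V VARCHAR)"); // non-transactional
                conn.createStatement().execute("CREATE INDEX IDX ON T (V)");  // server-built path
            }
        }
    }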