You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2020/11/18 19:37:25 UTC

[phoenix] branch 4.x updated: PHOENIX-6155 : Provide a coprocessor endpoint to avoid direct upserts into SYSTEM.TASK from the client

This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new ed7f1a6  PHOENIX-6155 : Provide a coprocessor endpoint to avoid direct upserts into SYSTEM.TASK from the client
ed7f1a6 is described below

commit ed7f1a6b69d20c87dfbbee97ec8612ccfb866d1b
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Wed Nov 11 17:05:04 2020 +0530

    PHOENIX-6155 : Provide a coprocessor endpoint to avoid direct upserts into SYSTEM.TASK from the client
    
    Signed-off-by: Chinmay Kulkarni <ch...@apache.org>
---
 .../phoenix/end2end/BackwardCompatibilityIT.java   |  70 +-
 .../end2end/BackwardCompatibilityTestUtil.java     |   3 +
 .../apache/phoenix/end2end/IndexRebuildTaskIT.java |  17 +-
 .../phoenix/end2end/SystemTablesUpgradeIT.java     |  10 +-
 .../phoenix/end2end/index/IndexMetadataIT.java     |  18 +-
 .../gold_files/gold_query_index_rebuild_async.txt  |  23 +
 .../it/resources/sql_files/index_rebuild_async.sql |  27 +
 .../sql_files/query_index_rebuild_async.sql        |  20 +
 .../coprocessor/BaseMetaDataEndpointObserver.java  |   6 +
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  22 +-
 .../coprocessor/MetaDataEndpointObserver.java      |   3 +
 .../phoenix/coprocessor/MetaDataProtocol.java      |   3 +-
 .../PhoenixMetaDataCoprocessorHost.java            |  14 +
 .../phoenix/coprocessor/TaskMetaDataEndpoint.java  | 127 ++++
 .../phoenix/coprocessor/TaskRegionObserver.java    |  34 +-
 .../coprocessor/generated/MetaDataProtos.java      |  62 +-
 .../coprocessor/generated/TaskMetaDataProtos.java  | 784 +++++++++++++++++++++
 .../coprocessor/tasks/IndexRebuildTask.java        |  18 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |   2 +
 .../org/apache/phoenix/protobuf/ProtobufUtil.java  |   7 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |  20 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  |  35 +-
 .../phoenix/schema/task/SystemTaskParams.java      | 188 +++++
 .../java/org/apache/phoenix/schema/task/Task.java  | 152 +++-
 .../phoenix/util/TaskMetaDataServiceCallBack.java  |  67 ++
 .../coprocessor/TaskMetaDataEndpointTest.java      | 186 +++++
 phoenix-protocol/src/main/MetaDataService.proto    |   1 +
 .../src/main/TaskMetaDataService.proto             |  34 +
 28 files changed, 1876 insertions(+), 77 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
index 0061b83..79f7302 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
@@ -21,10 +21,12 @@ import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_DATA;
 import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_DELETE;
 import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_ADD;
 import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_DIVERGED_VIEW;
+import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.INDEX_REBUILD_ASYNC;
 import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ADD_DATA;
 import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ADD_DELETE;
 import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_ADD;
 import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_DIVERGED_VIEW;
+import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_INDEX_REBUILD_ASYNC;
 import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.assertExpectedOutput;
 import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.checkForPreConditions;
 import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.computeClientVersions;
@@ -34,6 +36,9 @@ import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.UpgradePr
 import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.UpgradeProps.SET_MAX_LOOK_BACK_AGE;
 import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -44,6 +49,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.query.QueryServices;
@@ -305,19 +311,67 @@ public class BackwardCompatibilityIT {
     }
 
     @Test
-    public void testUpdatedSplitPolicyForSysTask() throws Exception {
-        executeQueryWithClientVersion(compatibleClientVersion, CREATE_DIVERGED_VIEW, zkQuorum);
-        executeQueriesWithCurrentVersion(QUERY_CREATE_DIVERGED_VIEW, url, NONE);
+    public void testSplitPolicyAndCoprocessorForSysTask() throws Exception {
+        executeQueryWithClientVersion(compatibleClientVersion,
+            CREATE_DIVERGED_VIEW, zkQuorum);
 
-        try (org.apache.hadoop.hbase.client.Connection conn =
-                hbaseTestUtil.getConnection(); Admin admin = conn.getAdmin()) {
+        String[] versionArr = compatibleClientVersion.split("\\.");
+        int majorVersion = Integer.parseInt(versionArr[0]);
+        int minorVersion = Integer.parseInt(versionArr[1]);
+        org.apache.hadoop.hbase.client.Connection conn = null;
+        Admin admin = null;
+        // If connected with a client older than 4.15, SYSTEM.TASK does not
+        // exist yet. If connected with a 4.15 client, SYSTEM.TASK exists but
+        // has no split policy, and the TaskMetaDataEndpoint coprocessor has
+        // not been added to it.
+        if (majorVersion == 4 && minorVersion == 15) {
+            conn = hbaseTestUtil.getConnection();
+            admin = conn.getAdmin();
             HTableDescriptor tableDescriptor = admin.getTableDescriptor(
                 TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME));
-            assertEquals("split policy not updated with compatible client version: "
+            assertNull("split policy should be null with compatible client version: "
+                + compatibleClientVersion, tableDescriptor.getRegionSplitPolicyClassName());
+            assertFalse("Coprocessor " + TaskMetaDataEndpoint.class.getName()
+                + " should not have been added with compatible client version: "
                 + compatibleClientVersion,
-                tableDescriptor.getRegionSplitPolicyClassName(),
-                SystemTaskSplitPolicy.class.getName());
+                tableDescriptor.hasCoprocessor(TaskMetaDataEndpoint.class.getName()));
+        }
+
+        executeQueriesWithCurrentVersion(QUERY_CREATE_DIVERGED_VIEW, url, NONE);
+
+        if (conn == null) {
+            conn = hbaseTestUtil.getConnection();
+            admin = conn.getAdmin();
         }
+        // connect with client > 4.15, and we have new split policy and new
+        // coprocessor loaded
+        HTableDescriptor tableDescriptor = admin.getTableDescriptor(
+            TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME));
+        assertEquals("split policy not updated with compatible client version: "
+            + compatibleClientVersion,
+            tableDescriptor.getRegionSplitPolicyClassName(),
+            SystemTaskSplitPolicy.class.getName());
+        assertTrue("Coprocessor " + TaskMetaDataEndpoint.class.getName()
+            + " has not been added with compatible client version: "
+            + compatibleClientVersion, tableDescriptor.hasCoprocessor(
+            TaskMetaDataEndpoint.class.getName()));
         assertExpectedOutput(QUERY_CREATE_DIVERGED_VIEW);
+        admin.close();
+        conn.close();
     }
+
+    @Test
+    public void testSystemTaskCreationWithIndexAsyncRebuild() throws Exception {
+        String[] versionArr = compatibleClientVersion.split("\\.");
+        int majorVersion = Integer.parseInt(versionArr[0]);
+        int minorVersion = Integer.parseInt(versionArr[1]);
+        // async index rebuild is only tested with clients >= 4.15
+        if (majorVersion > 4 || (majorVersion == 4 && minorVersion >= 15)) {
+            executeQueryWithClientVersion(compatibleClientVersion,
+                INDEX_REBUILD_ASYNC, zkQuorum);
+            executeQueriesWithCurrentVersion(QUERY_INDEX_REBUILD_ASYNC, url, NONE);
+            assertExpectedOutput(QUERY_INDEX_REBUILD_ASYNC);
+        }
+    }
+
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
index c79e19b..e5a7d3d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
@@ -84,6 +84,9 @@ public final class BackwardCompatibilityTestUtil {
     public static final String QUERY_DELETE = QUERY_PREFIX + DELETE;
     public static final String QUERY_SELECT_AND_DROP_TABLE = QUERY_PREFIX + SELECT_AND_DROP_TABLE;
     public static final String QUERY_CREATE_DIVERGED_VIEW = QUERY_PREFIX + CREATE_DIVERGED_VIEW;
+    public static final String INDEX_REBUILD_ASYNC = "index_rebuild_async";
+    public static final String QUERY_INDEX_REBUILD_ASYNC = QUERY_PREFIX
+        + INDEX_REBUILD_ASYNC;
     public static final String MVN_HOME = "maven.home";
     public static final String JAVA_TMP_DIR = "java.io.tmpdir";
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
index 9d97cc5..71f5ae4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
 import org.apache.phoenix.schema.task.Task;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -147,9 +148,19 @@ public class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
                             TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
 
             Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
-            Task.addTask(conn.unwrap(PhoenixConnection.class), PTable.TaskType.INDEX_REBUILD,
-                    TENANT1, null, viewName,
-                    PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true);
+            Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                .setConn(conn.unwrap(PhoenixConnection.class))
+                .setTaskType(PTable.TaskType.INDEX_REBUILD)
+                .setTenantId(TENANT1)
+                .setSchemaName(null)
+                .setTableName(viewName)
+                .setTaskStatus(PTable.TaskStatus.CREATED.toString())
+                .setData(data)
+                .setPriority(null)
+                .setStartTs(startTs)
+                .setEndTs(null)
+                .setAccessCheckEnabled(true)
+                .build());
             task.run();
 
             // Check task status and other column values.
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java
index e38c5e6..503ede4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java
@@ -25,9 +25,11 @@ import java.sql.SQLException;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -127,12 +129,12 @@ public class SystemTablesUpgradeIT extends BaseTest {
         // SystemTaskSplitPolicy (which is extending DisabledRegionSplitPolicy
         // as of this writing)
         try (Admin admin = services.getAdmin()) {
-            String taskSplitPolicy = admin
-                .getTableDescriptor(TableName.valueOf(
-                    PhoenixDatabaseMetaData.SYSTEM_TASK_NAME))
-                .getRegionSplitPolicyClassName();
+            HTableDescriptor td = admin.getTableDescriptor(TableName.valueOf(
+                PhoenixDatabaseMetaData.SYSTEM_TASK_NAME));
+            String taskSplitPolicy = td.getRegionSplitPolicyClassName();
             assertEquals(SystemTaskSplitPolicy.class.getName(),
                 taskSplitPolicy);
+            assertTrue(td.hasCoprocessor(TaskMetaDataEndpoint.class.getName()));
         }
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
index 6352d35..00d9c8c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_SET_OR_ALTER_
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_UPDATE_CACHE_FREQUENCY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -665,9 +666,20 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
             conn.createStatement().execute(
                     "ALTER INDEX " + indexName + " ON " + testTable + " REBUILD ALL ASYNC");
 
-            String
-                    queryTaskTable =
-                    "SELECT * FROM " +  PhoenixDatabaseMetaData.SYSTEM_TASK_NAME;
+            ResultSet resultSet = conn.createStatement().executeQuery(
+                "SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+            assertTrue(resultSet.next());
+            assertEquals("2", resultSet.getString(1));
+            assertNull(resultSet.getString(3));
+            assertNull(resultSet.getString(4));
+            assertEquals(testTable, resultSet.getString(5));
+            assertEquals("CREATED", resultSet.getString(6));
+            assertEquals("4", resultSet.getString(8));
+            assertEquals(
+                "{\"IndexName\":\"" + indexName + "\",\"RebuildAll\":true}",
+                resultSet.getString(9));
+            String queryTaskTable =
+                "SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME;
             ResultSet rs = conn.createStatement().executeQuery(queryTaskTable);
             assertTrue(rs.next());
             assertEquals(testTable, rs.getString(TABLE_NAME));
diff --git a/phoenix-core/src/it/resources/gold_files/gold_query_index_rebuild_async.txt b/phoenix-core/src/it/resources/gold_files/gold_query_index_rebuild_async.txt
new file mode 100644
index 0000000..ff020c5
--- /dev/null
+++ b/phoenix-core/src/it/resources/gold_files/gold_query_index_rebuild_async.txt
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'TASK_TYPE','TABLE_NAME','TASK_STATUS','TASK_PRIORITY'
+'2','TI','COMPLETED','4'
+'K','V'
+'key1','val2'
+'key3','val3'
diff --git a/phoenix-core/src/it/resources/sql_files/index_rebuild_async.sql b/phoenix-core/src/it/resources/sql_files/index_rebuild_async.sql
new file mode 100644
index 0000000..17133c0
--- /dev/null
+++ b/phoenix-core/src/it/resources/sql_files/index_rebuild_async.sql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS S.TI (K VARCHAR PRIMARY KEY, V VARCHAR);
+UPSERT INTO S.TI VALUES ('key1', 'val1');
+CREATE INDEX R_ASYNCIND_TI ON S.TI (K, V);
+ALTER INDEX R_ASYNCIND_TI ON S.TI DISABLE;
+UPSERT INTO S.TI VALUES ('key1', 'val2');
+ALTER INDEX R_ASYNCIND_TI ON S.TI REBUILD ALL ASYNC;
+UPSERT INTO S.TI VALUES ('key3', 'val3');
+UPSERT INTO S.TI VALUES ('key4', 'val4');
+DELETE FROM S.TI WHERE K = 'key4';
diff --git a/phoenix-core/src/it/resources/sql_files/query_index_rebuild_async.sql b/phoenix-core/src/it/resources/sql_files/query_index_rebuild_async.sql
new file mode 100644
index 0000000..46d073e
--- /dev/null
+++ b/phoenix-core/src/it/resources/sql_files/query_index_rebuild_async.sql
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+SELECT TASK_TYPE, TABLE_NAME, TASK_STATUS, TASK_PRIORITY FROM SYSTEM.TASK;
+SELECT * FROM S.TI;
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
index 1f9e29b..b2ddf3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
@@ -113,4 +113,10 @@ public class BaseMetaDataEndpointObserver implements MetaDataEndpointObserver{
     public void preCreateViewAddChildLink(
             final ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
             final String tableName) throws IOException {}
+
+    @Override
+    public void preUpsertTaskDetails(
+            final ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
+            final String tableName) throws IOException {
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index fde1aea..fc3ddb2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -210,6 +210,7 @@ import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.SequenceNotFoundException;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.task.SystemTaskParams;
 import org.apache.phoenix.schema.task.Task;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PBoolean;
@@ -2274,13 +2275,20 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         try (PhoenixConnection conn =
                                 QueryUtil.getConnectionOnServer(env.getConfiguration())
                                     .unwrap(PhoenixConnection.class)) {
-                            Task.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS,
-                                Bytes.toString(tenantIdBytes),
-                                Bytes.toString(schemaName),
-                                Bytes.toString(tableOrViewName),
-                                PTable.TaskStatus.CREATED.toString(),
-                                null, null, null, null,
-                                this.accessCheckEnabled);
+                            Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                                .setConn(conn)
+                                .setTaskType(PTable.TaskType.DROP_CHILD_VIEWS)
+                                .setTenantId(Bytes.toString(tenantIdBytes))
+                                .setSchemaName(Bytes.toString(schemaName))
+                                .setTableName(Bytes.toString(tableOrViewName))
+                                .setTaskStatus(
+                                    PTable.TaskStatus.CREATED.toString())
+                                .setData(null)
+                                .setPriority(null)
+                                .setStartTs(null)
+                                .setEndTs(null)
+                                .setAccessCheckEnabled(this.accessCheckEnabled)
+                                .build());
                         } catch (Throwable t) {
                             LOGGER.error("Adding a task to drop child views failed!", t);
                         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java
index 629f00b..d569c29 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java
@@ -68,4 +68,7 @@ public interface MetaDataEndpointObserver extends Coprocessor {
     void preCreateViewAddChildLink(final ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
             final String tableName) throws IOException;
 
+    void preUpsertTaskDetails(
+          ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
+          String tableName) throws IOException;
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 5d0fc85..e6f0bb5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -178,8 +178,9 @@ public abstract class MetaDataProtocol extends MetaDataService {
         UNABLE_TO_CREATE_CHILD_LINK,
         UNABLE_TO_UPDATE_PARENT_TABLE,
         UNABLE_TO_DELETE_CHILD_LINK,
+        UNABLE_TO_UPSERT_TASK,
         NO_OP
-    };
+    }
 
   public static class SharedTableState {
         private PName tenantId;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java
index f7ff05c..ed2c6c3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java
@@ -264,4 +264,18 @@ public class PhoenixMetaDataCoprocessorHost
       }
       return user;
   }
+
+    void preUpsertTaskDetails(final String tableName) throws IOException {
+        execOperation(
+                new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>(
+                    getActiveUser()) {
+                    @Override
+                    public void call(MetaDataEndpointObserver observer,
+                            ObserverContext<PhoenixMetaDataControllerEnvironment> ctx)
+                            throws IOException {
+                        observer.preUpsertTaskDetails(ctx, tableName);
+                    }
+        });
+    }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpoint.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpoint.java
new file mode 100644
index 0000000..60b8f6b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpoint.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.coprocessor;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos
+  .TaskMetaDataService;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos
+  .TaskMutateRequest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl
+  .mutateRowsWithLocks;
+
+/**
+ * Phoenix metadata mutations for SYSTEM.TASK flow through this coprocessor
+ * endpoint.
+ */
+public class TaskMetaDataEndpoint extends TaskMetaDataService
+        implements CoprocessorService, Coprocessor {
+
+    private static final Logger LOGGER =
+        LoggerFactory.getLogger(TaskMetaDataEndpoint.class);
+
+    private RegionCoprocessorEnvironment env;
+    private PhoenixMetaDataCoprocessorHost phoenixAccessCoprocessorHost;
+    private boolean accessCheckEnabled;
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+        if (env instanceof RegionCoprocessorEnvironment) {
+            this.env = (RegionCoprocessorEnvironment) env;
+        } else {
+            throw new CoprocessorException("Must be loaded on a table region!");
+        }
+        this.phoenixAccessCoprocessorHost =
+            new PhoenixMetaDataCoprocessorHost(this.env);
+        this.accessCheckEnabled = env.getConfiguration().getBoolean(
+            QueryServices.PHOENIX_ACLS_ENABLED,
+            QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment env) throws IOException {
+        // no-op
+    }
+
+    @Override
+    public Service getService() {
+        return this;
+    }
+
+    @Override
+    public void upsertTaskDetails(RpcController controller,
+        TaskMutateRequest request, RpcCallback<MetaDataResponse> done) {
+        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
+        try {
+            List<Mutation> taskMutations = ProtobufUtil.getMutations(request);
+            if (taskMutations.isEmpty()) {
+                done.run(builder.build());
+                return;
+            }
+            byte[][] rowKeyMetaData = new byte[3][];
+            MetaDataUtil.getTenantIdAndSchemaAndTableName(taskMutations,
+                rowKeyMetaData);
+            byte[] schemaName =
+                rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+            byte[] tableName =
+                rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+            String fullTableName = SchemaUtil.getTableName(schemaName,
+                tableName);
+
+            phoenixAccessCoprocessorHost.preUpsertTaskDetails(fullTableName);
+
+            mutateRowsWithLocks(this.accessCheckEnabled, this.env.getRegion(),
+                taskMutations, Collections.<byte[]>emptySet(),
+                HConstants.NO_NONCE, HConstants.NO_NONCE);
+        } catch (Throwable t) {
+            LOGGER.error("Unable to write mutations to {}",
+                PhoenixDatabaseMetaData.SYSTEM_TASK_NAME, t);
+            builder.setReturnCode(
+                MetaDataProtos.MutationCode.UNABLE_TO_UPSERT_TASK);
+            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+            done.run(builder.build());
+        }
+    }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index e2cc782..3a99d6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -32,6 +32,7 @@ import javax.annotation.concurrent.GuardedBy;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.phoenix.schema.task.SystemTaskParams;
 import org.apache.phoenix.util.JacksonUtil;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
@@ -195,10 +196,19 @@ public class TaskRegionObserver extends BaseRegionObserver {
                             }
 
                             // Change task status to STARTED
-                            Task.addTask(connForTask, taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
-                                    taskRecord.getTableName(), PTable.TaskStatus.STARTED.toString(),
-                                    taskRecord.getData(), taskRecord.getPriority(), taskRecord.getTimeStamp(), null,
-                                    true);
+                            Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                                .setConn(connForTask)
+                                .setTaskType(taskRecord.getTaskType())
+                                .setTenantId(taskRecord.getTenantId())
+                                .setSchemaName(taskRecord.getSchemaName())
+                                .setTableName(taskRecord.getTableName())
+                                .setTaskStatus(PTable.TaskStatus.STARTED.toString())
+                                .setData(taskRecord.getData())
+                                .setPriority(taskRecord.getPriority())
+                                .setStartTs(taskRecord.getTimeStamp())
+                                .setEndTs(null)
+                                .setAccessCheckEnabled(true)
+                                .build());
 
                             // invokes the method at runtime
                             result = (TaskResult) runMethod.invoke(obj, taskRecord);
@@ -251,9 +261,19 @@ public class TaskRegionObserver extends BaseRegionObserver {
             data = jsonNode.toString();
 
             Timestamp endTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
-            Task.addTask(connForTask, taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
-                    taskRecord.getTableName(), taskStatus, data, taskRecord.getPriority(),
-                    taskRecord.getTimeStamp(), endTs, true);
+            Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                .setConn(connForTask)
+                .setTaskType(taskRecord.getTaskType())
+                .setTenantId(taskRecord.getTenantId())
+                .setSchemaName(taskRecord.getSchemaName())
+                .setTableName(taskRecord.getTableName())
+                .setTaskStatus(taskStatus)
+                .setData(data)
+                .setPriority(taskRecord.getPriority())
+                .setStartTs(taskRecord.getTimeStamp())
+                .setEndTs(endTs)
+                .setAccessCheckEnabled(true)
+                .build());
         }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
index 28b58c6..38283a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
@@ -117,6 +117,10 @@ public final class MetaDataProtos {
      * <code>UNABLE_TO_DELETE_CHILD_LINK = 25;</code>
      */
     UNABLE_TO_DELETE_CHILD_LINK(25, 25),
+    /**
+     * <code>UNABLE_TO_UPSERT_TASK = 26;</code>
+     */
+    UNABLE_TO_UPSERT_TASK(26, 26),
     ;
 
     /**
@@ -223,6 +227,10 @@ public final class MetaDataProtos {
      * <code>UNABLE_TO_DELETE_CHILD_LINK = 25;</code>
      */
     public static final int UNABLE_TO_DELETE_CHILD_LINK_VALUE = 25;
+    /**
+     * <code>UNABLE_TO_UPSERT_TASK = 26;</code>
+     */
+    public static final int UNABLE_TO_UPSERT_TASK_VALUE = 26;
 
 
     public final int getNumber() { return value; }
@@ -255,6 +263,7 @@ public final class MetaDataProtos {
         case 23: return UNABLE_TO_CREATE_CHILD_LINK;
         case 24: return UNABLE_TO_UPDATE_PARENT_TABLE;
         case 25: return UNABLE_TO_DELETE_CHILD_LINK;
+        case 26: return UNABLE_TO_UPSERT_TASK;
         default: return null;
       }
     }
@@ -18048,7 +18057,7 @@ public final class MetaDataProtos {
       "cheRequest\022\020\n\010tenantId\030\001 \002(\014\022\022\n\nschemaNa" +
       "me\030\002 \002(\014\022\021\n\ttableName\030\003 \002(\014\022\027\n\017clientTim" +
       "estamp\030\004 \002(\003\022\025\n\rclientVersion\030\005 \001(\005\"\035\n\033C" +
-      "learTableFromCacheResponse*\332\005\n\014MutationC" +
+      "learTableFromCacheResponse*\365\005\n\014MutationC" +
       "ode\022\030\n\024TABLE_ALREADY_EXISTS\020\000\022\023\n\017TABLE_N" +
       "OT_FOUND\020\001\022\024\n\020COLUMN_NOT_FOUND\020\002\022\031\n\025COLU",
       "MN_ALREADY_EXISTS\020\003\022\035\n\031CONCURRENT_TABLE_" +
@@ -18066,31 +18075,32 @@ public final class MetaDataProtos {
       "#\n\037CANNOT_COERCE_AUTO_PARTITION_ID\020\025\022\024\n\020" +
       "TOO_MANY_INDEXES\020\026\022\037\n\033UNABLE_TO_CREATE_C" +
       "HILD_LINK\020\027\022!\n\035UNABLE_TO_UPDATE_PARENT_T" +
-      "ABLE\020\030\022\037\n\033UNABLE_TO_DELETE_CHILD_LINK\020\0312" +
-      "\345\006\n\017MetaDataService\022/\n\010getTable\022\020.GetTab" +
-      "leRequest\032\021.MetaDataResponse\0227\n\014getFunct" +
-      "ions\022\024.GetFunctionsRequest\032\021.MetaDataRes" +
-      "ponse\0221\n\tgetSchema\022\021.GetSchemaRequest\032\021.",
-      "MetaDataResponse\0225\n\013createTable\022\023.Create" +
-      "TableRequest\032\021.MetaDataResponse\022;\n\016creat" +
-      "eFunction\022\026.CreateFunctionRequest\032\021.Meta" +
-      "DataResponse\0227\n\014createSchema\022\024.CreateSch" +
-      "emaRequest\032\021.MetaDataResponse\0221\n\tdropTab" +
-      "le\022\021.DropTableRequest\032\021.MetaDataResponse" +
-      "\0223\n\ndropSchema\022\022.DropSchemaRequest\032\021.Met" +
-      "aDataResponse\0227\n\014dropFunction\022\024.DropFunc" +
-      "tionRequest\032\021.MetaDataResponse\0221\n\taddCol" +
-      "umn\022\021.AddColumnRequest\032\021.MetaDataRespons",
-      "e\0223\n\ndropColumn\022\022.DropColumnRequest\032\021.Me" +
-      "taDataResponse\022?\n\020updateIndexState\022\030.Upd" +
-      "ateIndexStateRequest\032\021.MetaDataResponse\022" +
-      "5\n\nclearCache\022\022.ClearCacheRequest\032\023.Clea" +
-      "rCacheResponse\0225\n\ngetVersion\022\022.GetVersio" +
-      "nRequest\032\023.GetVersionResponse\022P\n\023clearTa" +
-      "bleFromCache\022\033.ClearTableFromCacheReques" +
-      "t\032\034.ClearTableFromCacheResponseBB\n(org.a" +
-      "pache.phoenix.coprocessor.generatedB\016Met" +
-      "aDataProtosH\001\210\001\001\240\001\001"
+      "ABLE\020\030\022\037\n\033UNABLE_TO_DELETE_CHILD_LINK\020\031\022" +
+      "\031\n\025UNABLE_TO_UPSERT_TASK\020\0322\345\006\n\017MetaDataS" +
+      "ervice\022/\n\010getTable\022\020.GetTableRequest\032\021.M" +
+      "etaDataResponse\0227\n\014getFunctions\022\024.GetFun" +
+      "ctionsRequest\032\021.MetaDataResponse\0221\n\tgetS",
+      "chema\022\021.GetSchemaRequest\032\021.MetaDataRespo" +
+      "nse\0225\n\013createTable\022\023.CreateTableRequest\032" +
+      "\021.MetaDataResponse\022;\n\016createFunction\022\026.C" +
+      "reateFunctionRequest\032\021.MetaDataResponse\022" +
+      "7\n\014createSchema\022\024.CreateSchemaRequest\032\021." +
+      "MetaDataResponse\0221\n\tdropTable\022\021.DropTabl" +
+      "eRequest\032\021.MetaDataResponse\0223\n\ndropSchem" +
+      "a\022\022.DropSchemaRequest\032\021.MetaDataResponse" +
+      "\0227\n\014dropFunction\022\024.DropFunctionRequest\032\021" +
+      ".MetaDataResponse\0221\n\taddColumn\022\021.AddColu",
+      "mnRequest\032\021.MetaDataResponse\0223\n\ndropColu" +
+      "mn\022\022.DropColumnRequest\032\021.MetaDataRespons" +
+      "e\022?\n\020updateIndexState\022\030.UpdateIndexState" +
+      "Request\032\021.MetaDataResponse\0225\n\nclearCache" +
+      "\022\022.ClearCacheRequest\032\023.ClearCacheRespons" +
+      "e\0225\n\ngetVersion\022\022.GetVersionRequest\032\023.Ge" +
+      "tVersionResponse\022P\n\023clearTableFromCache\022" +
+      "\033.ClearTableFromCacheRequest\032\034.ClearTabl" +
+      "eFromCacheResponseBB\n(org.apache.phoenix" +
+      ".coprocessor.generatedB\016MetaDataProtosH\001",
+      "\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/TaskMetaDataProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/TaskMetaDataProtos.java
new file mode 100644
index 0000000..78c1f70
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/TaskMetaDataProtos.java
@@ -0,0 +1,784 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: TaskMetaDataService.proto
+
+package org.apache.phoenix.coprocessor.generated;
+
+public final class TaskMetaDataProtos {
+  private TaskMetaDataProtos() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistry registry) {
+  }
+  public interface TaskMutateRequestOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // repeated bytes tableMetadataMutations = 1;
+    /**
+     * <code>repeated bytes tableMetadataMutations = 1;</code>
+     */
+    java.util.List<com.google.protobuf.ByteString> getTableMetadataMutationsList();
+    /**
+     * <code>repeated bytes tableMetadataMutations = 1;</code>
+     */
+    int getTableMetadataMutationsCount();
+    /**
+     * <code>repeated bytes tableMetadataMutations = 1;</code>
+     */
+    com.google.protobuf.ByteString getTableMetadataMutations(int index);
+  }
+  /**
+   * Protobuf type {@code TaskMutateRequest}
+   */
+  public static final class TaskMutateRequest extends
+      com.google.protobuf.GeneratedMessage
+      implements TaskMutateRequestOrBuilder {
+    // Use TaskMutateRequest.newBuilder() to construct.
+    private TaskMutateRequest(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private TaskMutateRequest(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final TaskMutateRequest defaultInstance;
+    public static TaskMutateRequest getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public TaskMutateRequest getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private TaskMutateRequest(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 10: {
+              if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+                tableMetadataMutations_ = new java.util.ArrayList<com.google.protobuf.ByteString>();
+                mutable_bitField0_ |= 0x00000001;
+              }
+              tableMetadataMutations_.add(input.readBytes());
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+          tableMetadataMutations_ = java.util.Collections.unmodifiableList(tableMetadataMutations_);
+        }
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.class, org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<TaskMutateRequest> PARSER =
+        new com.google.protobuf.AbstractParser<TaskMutateRequest>() {
+      public TaskMutateRequest parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new TaskMutateRequest(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<TaskMutateRequest> getParserForType() {
+      return PARSER;
+    }
+
+    // repeated bytes tableMetadataMutations = 1;
+    public static final int TABLEMETADATAMUTATIONS_FIELD_NUMBER = 1;
+    private java.util.List<com.google.protobuf.ByteString> tableMetadataMutations_;
+    /**
+     * <code>repeated bytes tableMetadataMutations = 1;</code>
+     */
+    public java.util.List<com.google.protobuf.ByteString>
+        getTableMetadataMutationsList() {
+      return tableMetadataMutations_;
+    }
+    /**
+     * <code>repeated bytes tableMetadataMutations = 1;</code>
+     */
+    public int getTableMetadataMutationsCount() {
+      return tableMetadataMutations_.size();
+    }
+    /**
+     * <code>repeated bytes tableMetadataMutations = 1;</code>
+     */
+    public com.google.protobuf.ByteString getTableMetadataMutations(int index) {
+      return tableMetadataMutations_.get(index);
+    }
+
+    private void initFields() {
+      tableMetadataMutations_ = java.util.Collections.emptyList();
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      for (int i = 0; i < tableMetadataMutations_.size(); i++) {
+        output.writeBytes(1, tableMetadataMutations_.get(i));
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      {
+        int dataSize = 0;
+        for (int i = 0; i < tableMetadataMutations_.size(); i++) {
+          dataSize += com.google.protobuf.CodedOutputStream
+            .computeBytesSizeNoTag(tableMetadataMutations_.get(i));
+        }
+        size += dataSize;
+        size += 1 * getTableMetadataMutationsList().size();
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest)) {
+        return super.equals(obj);
+      }
+      org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest other = (org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest) obj;
+
+      boolean result = true;
+      result = result && getTableMetadataMutationsList()
+          .equals(other.getTableMetadataMutationsList());
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+
+    private int memoizedHashCode = 0;
+    @java.lang.Override
+    public int hashCode() {
+      if (memoizedHashCode != 0) {
+        return memoizedHashCode;
+      }
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (getTableMetadataMutationsCount() > 0) {
+        hash = (37 * hash) + TABLEMETADATAMUTATIONS_FIELD_NUMBER;
+        hash = (53 * hash) + getTableMetadataMutationsList().hashCode();
+      }
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      memoizedHashCode = hash;
+      return hash;
+    }
+
+    public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code TaskMutateRequest}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequestOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.class, org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.Builder.class);
+      }
+
+      // Construct using org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        tableMetadataMutations_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_descriptor;
+      }
+
+      public org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest getDefaultInstanceForType() {
+        return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.getDefaultInstance();
+      }
+
+      public org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest build() {
+        org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest buildPartial() {
+        org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest result = new org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest(this);
+        int from_bitField0_ = bitField0_;
+        if (((bitField0_ & 0x00000001) == 0x00000001)) {
+          tableMetadataMutations_ = java.util.Collections.unmodifiableList(tableMetadataMutations_);
+          bitField0_ = (bitField0_ & ~0x00000001);
+        }
+        result.tableMetadataMutations_ = tableMetadataMutations_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest) {
+          return mergeFrom((org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest other) {
+        if (other == org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.getDefaultInstance()) return this;
+        if (!other.tableMetadataMutations_.isEmpty()) {
+          if (tableMetadataMutations_.isEmpty()) {
+            tableMetadataMutations_ = other.tableMetadataMutations_;
+            bitField0_ = (bitField0_ & ~0x00000001);
+          } else {
+            ensureTableMetadataMutationsIsMutable();
+            tableMetadataMutations_.addAll(other.tableMetadataMutations_);
+          }
+          onChanged();
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // repeated bytes tableMetadataMutations = 1;
+      private java.util.List<com.google.protobuf.ByteString> tableMetadataMutations_ = java.util.Collections.emptyList();
+      private void ensureTableMetadataMutationsIsMutable() {
+        if (!((bitField0_ & 0x00000001) == 0x00000001)) {
+          tableMetadataMutations_ = new java.util.ArrayList<com.google.protobuf.ByteString>(tableMetadataMutations_);
+          bitField0_ |= 0x00000001;
+         }
+      }
+      /**
+       * <code>repeated bytes tableMetadataMutations = 1;</code>
+       */
+      public java.util.List<com.google.protobuf.ByteString>
+          getTableMetadataMutationsList() {
+        return java.util.Collections.unmodifiableList(tableMetadataMutations_);
+      }
+      /**
+       * <code>repeated bytes tableMetadataMutations = 1;</code>
+       */
+      public int getTableMetadataMutationsCount() {
+        return tableMetadataMutations_.size();
+      }
+      /**
+       * <code>repeated bytes tableMetadataMutations = 1;</code>
+       */
+      public com.google.protobuf.ByteString getTableMetadataMutations(int index) {
+        return tableMetadataMutations_.get(index);
+      }
+      /**
+       * <code>repeated bytes tableMetadataMutations = 1;</code>
+       */
+      public Builder setTableMetadataMutations(
+          int index, com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureTableMetadataMutationsIsMutable();
+        tableMetadataMutations_.set(index, value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes tableMetadataMutations = 1;</code>
+       */
+      public Builder addTableMetadataMutations(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureTableMetadataMutationsIsMutable();
+        tableMetadataMutations_.add(value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes tableMetadataMutations = 1;</code>
+       */
+      public Builder addAllTableMetadataMutations(
+          java.lang.Iterable<? extends com.google.protobuf.ByteString> values) {
+        ensureTableMetadataMutationsIsMutable();
+        super.addAll(values, tableMetadataMutations_);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes tableMetadataMutations = 1;</code>
+       */
+      public Builder clearTableMetadataMutations() {
+        tableMetadataMutations_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000001);
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:TaskMutateRequest)
+    }
+
+    static {
+      defaultInstance = new TaskMutateRequest(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:TaskMutateRequest)
+  }
+
+  /**
+   * Protobuf service {@code TaskMetaDataService}; its single rpc is {@code upsertTaskDetails}.
+   */
+  public static abstract class TaskMetaDataService
+      implements com.google.protobuf.Service {
+    protected TaskMetaDataService() {}
+
+    public interface Interface { // callback-based (RpcCallback) form of the service
+      /**
+       * <code>rpc upsertTaskDetails(.TaskMutateRequest) returns (.MetaDataResponse);</code>
+       */
+      public abstract void upsertTaskDetails(
+          com.google.protobuf.RpcController controller,
+          org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request,
+          com.google.protobuf.RpcCallback<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse> done);
+
+    }
+
+    public static com.google.protobuf.Service newReflectiveService(
+        final Interface impl) {
+      return new TaskMetaDataService() { // adapter: the rpc is forwarded to impl unchanged
+        @java.lang.Override
+        public  void upsertTaskDetails(
+            com.google.protobuf.RpcController controller,
+            org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request,
+            com.google.protobuf.RpcCallback<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse> done) {
+          impl.upsertTaskDetails(controller, request, done);
+        }
+
+      };
+    }
+
+    public static com.google.protobuf.BlockingService
+        newReflectiveBlockingService(final BlockingInterface impl) {
+      return new com.google.protobuf.BlockingService() {
+        public final com.google.protobuf.Descriptors.ServiceDescriptor
+            getDescriptorForType() {
+          return getDescriptor();
+        }
+
+        public final com.google.protobuf.Message callBlockingMethod(
+            com.google.protobuf.Descriptors.MethodDescriptor method,
+            com.google.protobuf.RpcController controller,
+            com.google.protobuf.Message request)
+            throws com.google.protobuf.ServiceException {
+          if (method.getService() != getDescriptor()) { // identity check: descriptor must be this service's own
+            throw new java.lang.IllegalArgumentException(
+              "Service.callBlockingMethod() given method descriptor for " +
+              "wrong service type.");
+          }
+          switch(method.getIndex()) {
+            case 0: // method index 0: upsertTaskDetails
+              return impl.upsertTaskDetails(controller, (org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest)request);
+            default:
+              throw new java.lang.AssertionError("Can't get here.");
+          }
+        }
+
+        public final com.google.protobuf.Message
+            getRequestPrototype(
+            com.google.protobuf.Descriptors.MethodDescriptor method) {
+          if (method.getService() != getDescriptor()) {
+            throw new java.lang.IllegalArgumentException(
+              "Service.getRequestPrototype() given method " +
+              "descriptor for wrong service type.");
+          }
+          switch(method.getIndex()) {
+            case 0:
+              return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.getDefaultInstance();
+            default:
+              throw new java.lang.AssertionError("Can't get here.");
+          }
+        }
+
+        public final com.google.protobuf.Message
+            getResponsePrototype(
+            com.google.protobuf.Descriptors.MethodDescriptor method) {
+          if (method.getService() != getDescriptor()) {
+            throw new java.lang.IllegalArgumentException(
+              "Service.getResponsePrototype() given method " +
+              "descriptor for wrong service type.");
+          }
+          switch(method.getIndex()) {
+            case 0:
+              return org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance();
+            default:
+              throw new java.lang.AssertionError("Can't get here.");
+          }
+        }
+
+      };
+    }
+
+    /**
+     * <code>rpc upsertTaskDetails(.TaskMutateRequest) returns (.MetaDataResponse);</code>
+     */
+    public abstract void upsertTaskDetails(
+        com.google.protobuf.RpcController controller,
+        org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request,
+        com.google.protobuf.RpcCallback<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse> done);
+
+    public static final
+        com.google.protobuf.Descriptors.ServiceDescriptor
+        getDescriptor() { // service entry 0 of the enclosing file descriptor
+      return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.getDescriptor().getServices().get(0);
+    }
+    public final com.google.protobuf.Descriptors.ServiceDescriptor
+        getDescriptorForType() {
+      return getDescriptor();
+    }
+
+    public final void callMethod(
+        com.google.protobuf.Descriptors.MethodDescriptor method,
+        com.google.protobuf.RpcController controller,
+        com.google.protobuf.Message request,
+        com.google.protobuf.RpcCallback<
+          com.google.protobuf.Message> done) {
+      if (method.getService() != getDescriptor()) { // reject descriptors that belong to another service
+        throw new java.lang.IllegalArgumentException(
+          "Service.callMethod() given method descriptor for wrong " +
+          "service type.");
+      }
+      switch(method.getIndex()) {
+        case 0: // method index 0: upsertTaskDetails
+          this.upsertTaskDetails(controller, (org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest)request,
+            com.google.protobuf.RpcUtil.<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse>specializeCallback(
+              done)); // generic Message callback narrowed to MetaDataResponse
+          return;
+        default:
+          throw new java.lang.AssertionError("Can't get here.");
+      }
+    }
+
+    public final com.google.protobuf.Message
+        getRequestPrototype(
+        com.google.protobuf.Descriptors.MethodDescriptor method) {
+      if (method.getService() != getDescriptor()) {
+        throw new java.lang.IllegalArgumentException(
+          "Service.getRequestPrototype() given method " +
+          "descriptor for wrong service type.");
+      }
+      switch(method.getIndex()) {
+        case 0:
+          return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.getDefaultInstance();
+        default:
+          throw new java.lang.AssertionError("Can't get here.");
+      }
+    }
+
+    public final com.google.protobuf.Message
+        getResponsePrototype(
+        com.google.protobuf.Descriptors.MethodDescriptor method) {
+      if (method.getService() != getDescriptor()) {
+        throw new java.lang.IllegalArgumentException(
+          "Service.getResponsePrototype() given method " +
+          "descriptor for wrong service type.");
+      }
+      switch(method.getIndex()) {
+        case 0:
+          return org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance();
+        default:
+          throw new java.lang.AssertionError("Can't get here.");
+      }
+    }
+
+    public static Stub newStub(
+        com.google.protobuf.RpcChannel channel) {
+      return new Stub(channel);
+    }
+
+    public static final class Stub extends org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMetaDataService implements Interface {
+      private Stub(com.google.protobuf.RpcChannel channel) { // private: instances come from newStub(channel)
+        this.channel = channel;
+      }
+
+      private final com.google.protobuf.RpcChannel channel;
+
+      public com.google.protobuf.RpcChannel getChannel() {
+        return channel;
+      }
+
+      public  void upsertTaskDetails(
+          com.google.protobuf.RpcController controller,
+          org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request,
+          com.google.protobuf.RpcCallback<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse> done) {
+        channel.callMethod( // the call is handed to the RpcChannel given at construction
+          getDescriptor().getMethods().get(0),
+          controller,
+          request,
+          org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance(),
+          com.google.protobuf.RpcUtil.generalizeCallback(
+            done,
+            org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.class,
+            org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance()));
+      }
+    }
+
+    public static BlockingInterface newBlockingStub(
+        com.google.protobuf.BlockingRpcChannel channel) {
+      return new BlockingStub(channel);
+    }
+
+    public interface BlockingInterface { // synchronous form: the response is the return value
+      public org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse upsertTaskDetails(
+          com.google.protobuf.RpcController controller,
+          org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request)
+          throws com.google.protobuf.ServiceException;
+    }
+
+    private static final class BlockingStub implements BlockingInterface {
+      private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { // created by newBlockingStub(channel)
+        this.channel = channel;
+      }
+
+      private final com.google.protobuf.BlockingRpcChannel channel;
+
+      public org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse upsertTaskDetails(
+          com.google.protobuf.RpcController controller,
+          org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request)
+          throws com.google.protobuf.ServiceException {
+        return (org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse) channel.callBlockingMethod(
+          getDescriptor().getMethods().get(0),
+          controller,
+          request,
+          org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance());
+      }
+
+    }
+
+    // @@protoc_insertion_point(class_scope:TaskMetaDataService)
+  }
+
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_TaskMutateRequest_descriptor; // assigned in the static block's assignDescriptors callback
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_TaskMutateRequest_fieldAccessorTable; // assigned in the same callback
+
+  public static com.google.protobuf.Descriptors.FileDescriptor
+      getDescriptor() { // returns the file descriptor stored by the static block
+    return descriptor;
+  }
+  private static com.google.protobuf.Descriptors.FileDescriptor
+      descriptor; // set to the root FileDescriptor handed to assignDescriptors
+  static { // runs once at class initialization and populates the descriptor fields of this class
+    java.lang.String[] descriptorData = { // escaped descriptor data handed to internalBuildGeneratedFileFrom below
+      "\n\031TaskMetaDataService.proto\032\025MetaDataSer" +
+      "vice.proto\"3\n\021TaskMutateRequest\022\036\n\026table" +
+      "MetadataMutations\030\001 \003(\0142Q\n\023TaskMetaDataS" +
+      "ervice\022:\n\021upsertTaskDetails\022\022.TaskMutate" +
+      "Request\032\021.MetaDataResponseBF\n(org.apache" +
+      ".phoenix.coprocessor.generatedB\022TaskMeta" +
+      "DataProtosH\001\210\001\001\240\001\001"
+    };
+    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+        public com.google.protobuf.ExtensionRegistry assignDescriptors(
+            com.google.protobuf.Descriptors.FileDescriptor root) {
+          descriptor = root; // must be stored first: getDescriptor() on the next lines reads this field
+          internal_static_TaskMutateRequest_descriptor =
+            getDescriptor().getMessageTypes().get(0); // message type 0 is stored as the TaskMutateRequest descriptor
+          internal_static_TaskMutateRequest_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_TaskMutateRequest_descriptor,
+              new java.lang.String[] { "TableMetadataMutations", });
+          return null; // no ExtensionRegistry is returned
+        }
+      };
+    com.google.protobuf.Descriptors.FileDescriptor
+      .internalBuildGeneratedFileFrom(descriptorData,
+        new com.google.protobuf.Descriptors.FileDescriptor[] {
+          org.apache.phoenix.coprocessor.generated.MetaDataProtos.getDescriptor(), // the one dependency passed in
+        }, assigner);
+  }
+
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
index 4e3b639..b4c99ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
@@ -21,6 +21,7 @@ import com.google.common.base.Strings;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.phoenix.schema.task.SystemTaskParams;
 import org.apache.phoenix.util.JacksonUtil;
 
 import org.apache.hadoop.conf.Configuration;
@@ -106,10 +107,19 @@ public class IndexRebuildTask extends BaseTask  {
             Job job = indexToolRes.getValue();
 
             ((ObjectNode) jsonNode).put(JOB_ID, job.getJobID().toString());
-            Task.addTask(conn.unwrap(PhoenixConnection.class ), taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
-                    taskRecord.getTableName(), PTable.TaskStatus.STARTED.toString(),
-                    jsonNode.toString(), taskRecord.getPriority(),
-                    taskRecord.getTimeStamp(), null, true);
+            Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                .setConn(conn.unwrap(PhoenixConnection.class))
+                .setTaskType(taskRecord.getTaskType())
+                .setTenantId(taskRecord.getTenantId())
+                .setSchemaName(taskRecord.getSchemaName())
+                .setTableName(taskRecord.getTableName())
+                .setTaskStatus(PTable.TaskStatus.STARTED.toString())
+                .setData(jsonNode.toString())
+                .setPriority(taskRecord.getPriority())
+                .setStartTs(taskRecord.getTimeStamp())
+                .setEndTs(null)
+                .setAccessCheckEnabled(true)
+                .build());
             // It will take some time to finish, so we will check the status in a separate task.
             return null;
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 9f4c139..788dea5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -433,6 +433,8 @@ public enum SQLExceptionCode {
             PTable.LinkType.CHILD_TABLE + ") for view"),
     TABLE_NOT_IN_REGION(1145, "XCL45", "No modifications allowed on this table. "
     + "Table not in this region."),
+    UNABLE_TO_UPSERT_TASK(1146, "XCL46",
+        "Error upserting records in SYSTEM.TASK table"),
     /**
      * Implementation defined class. Phoenix internal error. (errorcode 20, sqlstate INT).
      */
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
index 45f43e1..66528dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
@@ -34,6 +34,8 @@ import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.CreateVi
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos
+    .TaskMutateRequest;
 import org.apache.phoenix.schema.PTableType;
 
 import com.google.protobuf.ByteString;
@@ -108,6 +110,11 @@ public class ProtobufUtil {
         return getMutations(request.getTableMetadataMutationsList());
     }
 
+    public static List<Mutation> getMutations(TaskMutateRequest request) // overload for the SYSTEM.TASK request type
+            throws IOException {
+        return getMutations(request.getTableMetadataMutationsList()); // delegates to the list-based overload
+    }
+
 	public static List<Mutation> getMutations(CreateViewAddChildLinkRequest request)
 	throws IOException {
 		return getMutations(request.getTableMetadataMutationsList());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index d76c647..7d93abd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -168,10 +168,10 @@ import org.apache.phoenix.coprocessor.ScanRegionObserver;
 import org.apache.phoenix.coprocessor.SequenceRegionObserver;
 import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
 import org.apache.phoenix.coprocessor.PhoenixTTLRegionObserver;
+import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.ChildLinkMetaDataService;
-import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.CreateViewAddChildLinkRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
@@ -1095,6 +1095,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 if(!descriptor.hasCoprocessor(TaskRegionObserver.class.getName())) {
                     descriptor.addCoprocessor(TaskRegionObserver.class.getName(), null, priority, null);
                 }
+                if (!descriptor.hasCoprocessor(
+                        TaskMetaDataEndpoint.class.getName())) {
+                    descriptor.addCoprocessor(TaskMetaDataEndpoint.class.getName(),
+                        null, priority, null);
+                }
             } else if (SchemaUtil.isChildLinkTable(tableName)) {
                 if (!descriptor.hasCoprocessor(ChildLinkMetaDataEndpoint.class.getName())) {
                     descriptor.addCoprocessor(ChildLinkMetaDataEndpoint.class.getName(), null, priority, null);
@@ -4146,7 +4151,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 TableName tableName = SchemaUtil.getPhysicalTableName(
                     PhoenixDatabaseMetaData.SYSTEM_TASK_NAME, props);
                 td = admin.getTableDescriptor(tableName);
+                boolean isTableDescUpdated = false;
                 if (updateAndConfirmSplitPolicyForTask(td)) {
+                    isTableDescUpdated = true;
+                }
+                if (!td.hasCoprocessor(TaskMetaDataEndpoint.class.getName())) {
+                    int priority = props.getInt(
+                        QueryServices.COPROCESSOR_PRIORITY_ATTRIB,
+                        QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
+                    td.addCoprocessor(
+                        TaskMetaDataEndpoint.class.getName(), null, priority,
+                        null);
+                    isTableDescUpdated = true;
+                }
+                if (isTableDescUpdated) {
                     admin.modifyTable(tableName, td);
                     pollForUpdatedTableDescriptor(admin,
                         td, tableName.getName());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 4e20997..68b0891 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -79,6 +79,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYNC_INDEX_CREATED
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
@@ -100,6 +101,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_PHOENIX_TTL_HW
 import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
 import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
 import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_UPDATE_STATS_ASYNC;
@@ -170,6 +172,8 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.SharedTableState;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.util.TaskMetaDataServiceCallBack;
 import org.apache.phoenix.util.ViewUtil;
 import org.apache.phoenix.util.JacksonUtil;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -4690,11 +4694,32 @@ public class MetaDataClient {
                                 }};
                                 try {
                                     String json = JacksonUtil.getObjectWriter().writeValueAsString(props);
-                                    Task.addTask(connection, PTable.TaskType.INDEX_REBUILD,
-                                            tenantId, schemaName,
-                                            dataTableName, PTable.TaskStatus.CREATED.toString(),
-                                            json, null, ts, null, true);
-                                    connection.commit();
+                                    List<Mutation> sysTaskUpsertMutations = Task.getMutationsForAddTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                                        .setConn(connection)
+                                        .setTaskType(
+                                            PTable.TaskType.INDEX_REBUILD)
+                                        .setTenantId(tenantId)
+                                        .setSchemaName(schemaName)
+                                        .setTableName(dataTableName)
+                                        .setTaskStatus(
+                                            PTable.TaskStatus.CREATED.toString())
+                                        .setData(json)
+                                        .setPriority(null)
+                                        .setStartTs(ts)
+                                        .setEndTs(null)
+                                        .setAccessCheckEnabled(true)
+                                        .build());
+                                    byte[] rowKey = sysTaskUpsertMutations
+                                        .get(0).getRow();
+                                    MetaDataMutationResult metaDataMutationResult =
+                                        Task.taskMetaDataCoprocessorExec(connection, rowKey,
+                                            new TaskMetaDataServiceCallBack(sysTaskUpsertMutations));
+                                    if (MutationCode.UNABLE_TO_UPSERT_TASK.equals(
+                                            metaDataMutationResult.getMutationCode())) {
+                                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_UPSERT_TASK)
+                                          .setSchemaName(SYSTEM_SCHEMA_NAME)
+                                          .setTableName(SYSTEM_TASK_TABLE).build().buildException();
+                                    }
                                 } catch (IOException e) {
                                     throw new SQLException("Exception happened while adding a System.Task" + e.toString());
                                 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/SystemTaskParams.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/SystemTaskParams.java
new file mode 100644
index 0000000..1740256
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/SystemTaskParams.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.task;
+
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
+
+import java.sql.Timestamp;
+
+/**
+ * Task params used when upserting records into the SYSTEM.TASK table. All fields are
+ * final; instances are typically assembled with {@link SystemTaskParamsBuilder} and
+ * passed to {@link Task} to upsert (and commit) or to generate the upsert mutations.
+ */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(
+    value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"},
+    justification = "endTs and startTs are not used for mutation")
+public class SystemTaskParams {
+
+    private final PhoenixConnection conn;
+    private final PTable.TaskType taskType;
+    private final String tenantId;
+    private final String schemaName;
+    private final String tableName;
+    private final String taskStatus;
+    private final String data;
+    private final Integer priority; // boxed type, so null is representable
+    private final Timestamp startTs; // stored by reference, see the FindBugs suppression on the class
+    private final Timestamp endTs; // stored by reference; boxed, so null is representable
+    private final boolean accessCheckEnabled;
+
+    public SystemTaskParams(PhoenixConnection conn, PTable.TaskType taskType,
+            String tenantId, String schemaName, String tableName,
+            String taskStatus, String data, Integer priority, Timestamp startTs,
+            Timestamp endTs, boolean accessCheckEnabled) {
+        this.conn = conn; // every argument is stored as given: no validation, no copying
+        this.taskType = taskType;
+        this.tenantId = tenantId;
+        this.schemaName = schemaName;
+        this.tableName = tableName;
+        this.taskStatus = taskStatus;
+        this.data = data;
+        this.priority = priority;
+        this.startTs = startTs;
+        this.endTs = endTs;
+        this.accessCheckEnabled = accessCheckEnabled;
+    }
+
+    public PhoenixConnection getConn() {
+        return conn;
+    }
+
+    public PTable.TaskType getTaskType() {
+        return taskType;
+    }
+
+    public String getTenantId() {
+        return tenantId;
+    }
+
+    public String getSchemaName() {
+        return schemaName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getTaskStatus() {
+        return taskStatus;
+    }
+
+    public String getData() {
+        return data;
+    }
+
+    public Integer getPriority() {
+        return priority;
+    }
+
+    public Timestamp getStartTs() {
+        return startTs; // the stored Timestamp itself is returned, not a copy
+    }
+
+    public Timestamp getEndTs() {
+        return endTs; // the stored Timestamp itself is returned, not a copy
+    }
+
+    public boolean isAccessCheckEnabled() {
+        return accessCheckEnabled;
+    }
+
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+        value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"},
+        justification = "endTs and startTs are not used for mutation")
+    public static class SystemTaskParamsBuilder { // fluent builder: each setter returns this
+
+        private PhoenixConnection conn; // reference-typed fields stay null until their setter is called
+        private PTable.TaskType taskType;
+        private String tenantId;
+        private String schemaName;
+        private String tableName;
+        private String taskStatus;
+        private String data;
+        private Integer priority;
+        private Timestamp startTs;
+        private Timestamp endTs;
+        private boolean accessCheckEnabled; // stays false until setAccessCheckEnabled is called
+
+        public SystemTaskParamsBuilder setConn(PhoenixConnection conn) {
+            this.conn = conn;
+            return this;
+        }
+
+        public SystemTaskParamsBuilder setTaskType(PTable.TaskType taskType) {
+            this.taskType = taskType;
+            return this;
+        }
+
+        public SystemTaskParamsBuilder setTenantId(String tenantId) {
+            this.tenantId = tenantId;
+            return this;
+        }
+
+        public SystemTaskParamsBuilder setSchemaName(String schemaName) {
+            this.schemaName = schemaName;
+            return this;
+        }
+
+        public SystemTaskParamsBuilder setTableName(String tableName) {
+            this.tableName = tableName;
+            return this;
+        }
+
+        public SystemTaskParamsBuilder setTaskStatus(String taskStatus) {
+            this.taskStatus = taskStatus;
+            return this;
+        }
+
+        public SystemTaskParamsBuilder setData(String data) {
+            this.data = data;
+            return this;
+        }
+
+        public SystemTaskParamsBuilder setPriority(Integer priority) {
+            this.priority = priority;
+            return this;
+        }
+
+        public SystemTaskParamsBuilder setStartTs(Timestamp startTs) {
+            this.startTs = startTs;
+            return this;
+        }
+
+        public SystemTaskParamsBuilder setEndTs(Timestamp endTs) {
+            this.endTs = endTs;
+            return this;
+        }
+
+        public SystemTaskParamsBuilder setAccessCheckEnabled(
+                boolean accessCheckEnabled) {
+            this.accessCheckEnabled = accessCheckEnabled;
+            return this;
+        }
+
+        public SystemTaskParams build() { // no validation here: unset fields are passed on as null/false
+            return new SystemTaskParams(conn, taskType, tenantId, schemaName,
+                tableName, taskStatus, data, priority, startTs, endTs,
+                accessCheckEnabled);
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
index 0fe256b..e146ce0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
@@ -18,14 +18,27 @@
 package org.apache.phoenix.schema.task;
 
 import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.RpcUtil;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos
+    .TaskMetaDataService;
 import org.apache.phoenix.coprocessor.tasks.DropChildViewsTask;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +51,9 @@ import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 public class Task {
     public static final Logger LOGGER = LoggerFactory.getLogger(Task.class);
@@ -75,6 +90,46 @@ public class Task {
         }
     }
 
+    private static List<Mutation> getMutationsForSystemTaskTable(
+            final PhoenixConnection conn, final PreparedStatement stmt,
+            final boolean accessCheckEnabled) throws IOException {
+        // we need to prepare SYSTEM.TASK mutations as HBase/login user if access check is enabled.
+        if (accessCheckEnabled) {
+            return User.runAsLoginUser(new PrivilegedExceptionAction<List<Mutation>>() {
+                @Override
+                public List<Mutation> run() throws Exception {
+                    final RpcServer.Call rpcContext = RpcUtil.getRpcContext();
+                    // setting RPC context as null so that user can be reset
+                    try {
+                        RpcUtil.setRpcContext(null);
+                        return executeStatementAndGetTaskMutations(conn, stmt);
+                    } catch (SQLException e) {
+                        throw new IOException(e);
+                    } finally {
+                        // setting RPC context back to original context of the RPC
+                        RpcUtil.setRpcContext(rpcContext);
+                    }
+                }
+            });
+        } else {
+            try {
+                return executeStatementAndGetTaskMutations(conn, stmt);
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    private static List<Mutation> executeStatementAndGetTaskMutations(
+            PhoenixConnection conn, PreparedStatement stmt)
+            throws SQLException {
+        stmt.execute();
+        // retrieve mutations for SYSTEM.TASK upsert query
+        Iterator<Pair<byte[], List<Mutation>>> iterator =
+            conn.getMutationState().toMutations();
+        return iterator.next().getSecond();
+    }
+
     private static  PreparedStatement setValuesToAddTaskPS(PreparedStatement stmt, PTable.TaskType taskType,
             String tenantId, String schemaName, String tableName, String taskStatus, String data,
             Integer priority, Timestamp startTs, Timestamp endTs) throws SQLException {
@@ -123,13 +178,28 @@ public class Task {
         return stmt;
     }
 
-    public static void addTask(PhoenixConnection conn, PTable.TaskType taskType, String tenantId, String schemaName,
-            String tableName, String taskStatus, String data, Integer priority, Timestamp startTs, Timestamp endTs,
-            boolean accessCheckEnabled)
-            throws IOException {
+    /**
+     * Execute and commit upsert query on SYSTEM.TASK.
+     * This method should be used only from the server side. Clients should use
+     * {@link #getMutationsForAddTask(SystemTaskParams)} instead of a direct
+     * upsert commit.
+     *
+     * @param systemTaskParams Task params with various task related arguments
+     * @throws IOException If something goes wrong while preparing mutations
+     *     or committing transactions
+     */
+    public static void addTask(SystemTaskParams systemTaskParams)
+      throws IOException {
+        addTaskAndGetStatement(systemTaskParams, systemTaskParams.getConn(),
+            true);
+    }
+
+    private static PreparedStatement addTaskAndGetStatement(
+            SystemTaskParams systemTaskParams, PhoenixConnection connection,
+            boolean shouldCommit) throws IOException {
         PreparedStatement stmt;
         try {
-            stmt = conn.prepareStatement("UPSERT INTO " +
+            stmt = connection.prepareStatement("UPSERT INTO " +
                     PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " +
                     PhoenixDatabaseMetaData.TASK_TYPE + ", " +
                     PhoenixDatabaseMetaData.TENANT_ID + ", " +
@@ -141,12 +211,78 @@ public class Task {
                     PhoenixDatabaseMetaData.TASK_END_TS + ", " +
                     PhoenixDatabaseMetaData.TASK_DATA +
                     " ) VALUES(?,?,?,?,?,?,?,?,?)");
-            stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName, tableName, taskStatus, data, priority, startTs, endTs);
-            LOGGER.info("Adding task " + taskType + "," +tableName + "," + taskStatus + "," + startTs, ","+endTs);
+            stmt = setValuesToAddTaskPS(stmt, systemTaskParams.getTaskType(),
+                systemTaskParams.getTenantId(),
+                systemTaskParams.getSchemaName(),
+                systemTaskParams.getTableName(),
+                systemTaskParams.getTaskStatus(), systemTaskParams.getData(),
+                systemTaskParams.getPriority(), systemTaskParams.getStartTs(),
+                systemTaskParams.getEndTs());
+            LOGGER.info("Adding task type: "
+                + systemTaskParams.getTaskType() + " , tableName: "
+                + systemTaskParams.getTableName() + " , taskStatus: "
+                + systemTaskParams.getTaskStatus() + " , startTs: "
+                + systemTaskParams.getStartTs() + " , endTs: "
+                + systemTaskParams.getEndTs());
         } catch (SQLException e) {
             throw new IOException(e);
         }
-        mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
+        // when the query is prepared on behalf of a client (shouldCommit is
+        // false), do not execute and commit the mutations here
+        if (shouldCommit) {
+            mutateSystemTaskTable(connection, stmt,
+                systemTaskParams.isAccessCheckEnabled());
+        }
+        return stmt;
+    }
+
+    public static List<Mutation> getMutationsForAddTask(
+            SystemTaskParams systemTaskParams)
+            throws IOException, SQLException {
+        PhoenixConnection curConn = systemTaskParams.getConn();
+        Configuration conf = curConn.getQueryServices().getConfiguration();
+        // create new connection as we do not want to mix up mutationState
+        // with existing connection
+        try (PhoenixConnection newConnection =
+                QueryUtil.getConnectionOnServer(curConn.getClientInfo(), conf)
+                    .unwrap(PhoenixConnection.class)) {
+            PreparedStatement statement = addTaskAndGetStatement(
+                systemTaskParams, newConnection, false);
+            return getMutationsForSystemTaskTable(newConnection,
+                statement, systemTaskParams.isAccessCheckEnabled());
+        }
+    }
+
+    /**
+     * Invoke SYSTEM.TASK metadata coprocessor endpoint
+     *
+     * @param connection Phoenix Connection
+     * @param rowKey key corresponding to SYSTEM.TASK mutation
+     * @param callable used to invoke the coprocessor endpoint to upsert
+     *     records in SYSTEM.TASK
+     * @return result of invoking the coprocessor endpoint
+     * @throws SQLException If invoking the coprocessor endpoint fails
+     */
+    public static MetaDataMutationResult taskMetaDataCoprocessorExec(
+            final PhoenixConnection connection, final byte[] rowKey,
+            final Batch.Call<TaskMetaDataService, MetaDataResponse> callable)
+            throws SQLException {
+        TableName tableName = SchemaUtil.getPhysicalName(
+              PhoenixDatabaseMetaData.SYSTEM_TASK_NAME_BYTES,
+              connection.getQueryServices().getProps());
+        try (Table table =
+                connection.getQueryServices().getTable(tableName.getName())) {
+            final Map<byte[], MetaDataResponse> results =
+                table.coprocessorService(TaskMetaDataService.class, rowKey,
+                    rowKey, callable);
+            assert results.size() == 1;
+            MetaDataResponse result = results.values().iterator().next();
+            return MetaDataMutationResult.constructFromProto(result);
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
+        } catch (Throwable t) {
+            throw new SQLException(t);
+        }
     }
 
     public static void deleteTask(PhoenixConnection conn, PTable.TaskType taskType, Timestamp ts, String tenantId,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TaskMetaDataServiceCallBack.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TaskMetaDataServiceCallBack.java
new file mode 100644
index 0000000..b7630a8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TaskMetaDataServiceCallBack.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos
+    .TaskMetaDataService;
+import org.apache.phoenix.protobuf.ProtobufUtil;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Callable implementation for coprocessor endpoint associated with
+ * SYSTEM.TASK
+ */
+public class TaskMetaDataServiceCallBack
+    implements Batch.Call<TaskMetaDataService, MetaDataResponse> {
+
+    private final List<Mutation> taskMutations;
+
+    public TaskMetaDataServiceCallBack(List<Mutation> taskMutations) {
+        this.taskMutations = taskMutations;
+    }
+
+    @Override
+    public MetaDataResponse call(TaskMetaDataService instance)
+            throws IOException {
+        ServerRpcController controller = new ServerRpcController();
+        BlockingRpcCallback<MetaDataResponse> rpcCallback =
+            new BlockingRpcCallback<>();
+        TaskMetaDataProtos.TaskMutateRequest.Builder builder =
+            TaskMetaDataProtos.TaskMutateRequest.newBuilder();
+        for (Mutation mutation : taskMutations) {
+            ClientProtos.MutationProto mp = ProtobufUtil.toProto(mutation);
+            builder.addTableMetadataMutations(mp.toByteString());
+        }
+        TaskMetaDataProtos.TaskMutateRequest build = builder.build();
+        instance.upsertTaskDetails(controller, build, rpcCallback);
+        if (controller.getFailedOn() != null) {
+            throw controller.getFailedOn();
+        }
+        return rpcCallback.get();
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpointTest.java b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpointTest.java
new file mode 100644
index 0000000..ae19bc6
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpointTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.coprocessor;
+
+
+import com.google.protobuf.RpcController;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos;
+import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for TaskMetaDataEndpoint
+ */
+public class TaskMetaDataEndpointTest {
+
+    private TaskMetaDataEndpoint taskMetaDataEndpoint;
+    private Configuration configuration;
+
+    @Mock
+    private Region region;
+
+    @Mock
+    private HRegionInfo regionInfo;
+
+    @Mock
+    private RpcController controller;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        configuration = new Configuration();
+        RegionCoprocessorEnvironment environment =
+                new RegionCoprocessorEnvironment() {
+
+            @Override
+            public int getVersion() {
+                return 0;
+            }
+
+            @Override
+            public String getHBaseVersion() {
+                return null;
+            }
+
+            @Override
+            public Coprocessor getInstance() {
+                return null;
+            }
+
+            @Override
+            public int getPriority() {
+                return 0;
+            }
+
+            @Override
+            public int getLoadSequence() {
+                return 0;
+            }
+
+            @Override
+            public Configuration getConfiguration() {
+                return configuration;
+            }
+
+            @Override
+            public HTableInterface getTable(
+              TableName tableName) throws IOException {
+                return null;
+            }
+
+            @Override
+            public HTableInterface getTable(TableName tableName,
+              ExecutorService service) throws IOException {
+                return null;
+            }
+
+            @Override
+            public ClassLoader getClassLoader() {
+                return null;
+            }
+
+            @Override
+            public Region getRegion() {
+                return region;
+            }
+
+            @Override
+            public HRegionInfo getRegionInfo() {
+                return regionInfo;
+            }
+
+            @Override
+            public RegionServerServices getRegionServerServices() {
+                return null;
+            }
+
+            @Override
+            public ConcurrentMap<String, Object> getSharedData() {
+                return null;
+            }
+        };
+        taskMetaDataEndpoint = new TaskMetaDataEndpoint();
+        taskMetaDataEndpoint.start(environment);
+    }
+
+    @Test
+    public void testUpsertTaskDetails() throws Exception {
+        Mutation mutation = new Put(Bytes.toBytes("row1"));
+        TaskMetaDataProtos.TaskMutateRequest.Builder builder =
+            TaskMetaDataProtos.TaskMutateRequest.newBuilder();
+        ClientProtos.MutationProto mp = ProtobufUtil.toProto(mutation);
+        builder.addTableMetadataMutations(mp.toByteString());
+        TaskMetaDataProtos.TaskMutateRequest request = builder.build();
+        BlockingRpcCallback<MetaDataProtos.MetaDataResponse> rpcCallback =
+            new BlockingRpcCallback<>();
+        Mockito.doNothing().when(region).mutateRowsWithLocks(
+            Mockito.anyCollectionOf(Mutation.class), Mockito.<Collection<byte[]>>anyObject(),
+            Mockito.anyLong(), Mockito.anyLong());
+        taskMetaDataEndpoint.upsertTaskDetails(controller, request, rpcCallback);
+        Mockito.verify(region, Mockito.times(1)).mutateRowsWithLocks(
+            Mockito.anyCollectionOf(Mutation.class), Mockito.<Collection<byte[]>>anyObject(),
+            Mockito.anyLong(), Mockito.anyLong());
+    }
+
+    @Test
+    public void testUpsertTaskDetailsFailure() throws Exception {
+        Mutation mutation = new Put(Bytes.toBytes("row2"));
+        TaskMetaDataProtos.TaskMutateRequest.Builder builder =
+          TaskMetaDataProtos.TaskMutateRequest.newBuilder();
+        ClientProtos.MutationProto mp = ProtobufUtil.toProto(mutation);
+        builder.addTableMetadataMutations(mp.toByteString());
+        TaskMetaDataProtos.TaskMutateRequest request = builder.build();
+        BlockingRpcCallback<MetaDataProtos.MetaDataResponse> rpcCallback =
+            new BlockingRpcCallback<>();
+        Mockito.doThrow(new IOException()).when(region).mutateRowsWithLocks(
+            Mockito.anyCollectionOf(Mutation.class), Mockito.<Collection<byte[]>>anyObject(),
+            Mockito.anyLong(), Mockito.anyLong());
+        taskMetaDataEndpoint.upsertTaskDetails(controller, request, rpcCallback);
+        Mockito.verify(region, Mockito.times(1)).mutateRowsWithLocks(
+            Mockito.anyCollectionOf(Mutation.class), Mockito.<Collection<byte[]>>anyObject(),
+            Mockito.anyLong(), Mockito.anyLong());
+        assertEquals(MetaDataProtos.MutationCode.UNABLE_TO_UPSERT_TASK,
+            rpcCallback.get().getReturnCode());
+    }
+
+}
\ No newline at end of file
diff --git a/phoenix-protocol/src/main/MetaDataService.proto b/phoenix-protocol/src/main/MetaDataService.proto
index a30fad7..cbc53d6 100644
--- a/phoenix-protocol/src/main/MetaDataService.proto
+++ b/phoenix-protocol/src/main/MetaDataService.proto
@@ -53,6 +53,7 @@ enum MutationCode {
   UNABLE_TO_CREATE_CHILD_LINK = 23;
   UNABLE_TO_UPDATE_PARENT_TABLE = 24;
   UNABLE_TO_DELETE_CHILD_LINK = 25;
+  UNABLE_TO_UPSERT_TASK = 26;
 };
 
 message SharedTableState {
diff --git a/phoenix-protocol/src/main/TaskMetaDataService.proto b/phoenix-protocol/src/main/TaskMetaDataService.proto
new file mode 100644
index 0000000..d6868bf
--- /dev/null
+++ b/phoenix-protocol/src/main/TaskMetaDataService.proto
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.phoenix.coprocessor.generated";
+option java_outer_classname = "TaskMetaDataProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "MetaDataService.proto";
+
+message TaskMutateRequest {
+  repeated bytes tableMetadataMutations = 1;
+}
+
+service TaskMetaDataService {
+  rpc upsertTaskDetails(TaskMutateRequest)
+      returns (MetaDataResponse);
+}