You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2020/11/18 19:37:25 UTC
[phoenix] branch 4.x updated: PHOENIX-6155 : Provide a coprocessor
endpoint to avoid direct upserts into SYSTEM.TASK from the client
This is an automated email from the ASF dual-hosted git repository.
chinmayskulkarni pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new ed7f1a6 PHOENIX-6155 : Provide a coprocessor endpoint to avoid direct upserts into SYSTEM.TASK from the client
ed7f1a6 is described below
commit ed7f1a6b69d20c87dfbbee97ec8612ccfb866d1b
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Wed Nov 11 17:05:04 2020 +0530
PHOENIX-6155 : Provide a coprocessor endpoint to avoid direct upserts into SYSTEM.TASK from the client
Signed-off-by: Chinmay Kulkarni <ch...@apache.org>
---
.../phoenix/end2end/BackwardCompatibilityIT.java | 70 +-
.../end2end/BackwardCompatibilityTestUtil.java | 3 +
.../apache/phoenix/end2end/IndexRebuildTaskIT.java | 17 +-
.../phoenix/end2end/SystemTablesUpgradeIT.java | 10 +-
.../phoenix/end2end/index/IndexMetadataIT.java | 18 +-
.../gold_files/gold_query_index_rebuild_async.txt | 23 +
.../it/resources/sql_files/index_rebuild_async.sql | 27 +
.../sql_files/query_index_rebuild_async.sql | 20 +
.../coprocessor/BaseMetaDataEndpointObserver.java | 6 +
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 22 +-
.../coprocessor/MetaDataEndpointObserver.java | 3 +
.../phoenix/coprocessor/MetaDataProtocol.java | 3 +-
.../PhoenixMetaDataCoprocessorHost.java | 14 +
.../phoenix/coprocessor/TaskMetaDataEndpoint.java | 127 ++++
.../phoenix/coprocessor/TaskRegionObserver.java | 34 +-
.../coprocessor/generated/MetaDataProtos.java | 62 +-
.../coprocessor/generated/TaskMetaDataProtos.java | 784 +++++++++++++++++++++
.../coprocessor/tasks/IndexRebuildTask.java | 18 +-
.../apache/phoenix/exception/SQLExceptionCode.java | 2 +
.../org/apache/phoenix/protobuf/ProtobufUtil.java | 7 +
.../phoenix/query/ConnectionQueryServicesImpl.java | 20 +-
.../org/apache/phoenix/schema/MetaDataClient.java | 35 +-
.../phoenix/schema/task/SystemTaskParams.java | 188 +++++
.../java/org/apache/phoenix/schema/task/Task.java | 152 +++-
.../phoenix/util/TaskMetaDataServiceCallBack.java | 67 ++
.../coprocessor/TaskMetaDataEndpointTest.java | 186 +++++
phoenix-protocol/src/main/MetaDataService.proto | 1 +
.../src/main/TaskMetaDataService.proto | 34 +
28 files changed, 1876 insertions(+), 77 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
index 0061b83..79f7302 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
@@ -21,10 +21,12 @@ import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_DATA;
import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_DELETE;
import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_ADD;
import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_DIVERGED_VIEW;
+import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.INDEX_REBUILD_ASYNC;
import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ADD_DATA;
import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ADD_DELETE;
import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_ADD;
import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_DIVERGED_VIEW;
+import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_INDEX_REBUILD_ASYNC;
import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.assertExpectedOutput;
import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.checkForPreConditions;
import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.computeClientVersions;
@@ -34,6 +36,9 @@ import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.UpgradePr
import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.UpgradeProps.SET_MAX_LOOK_BACK_AGE;
import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -44,6 +49,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.query.QueryServices;
@@ -305,19 +311,67 @@ public class BackwardCompatibilityIT {
}
@Test
- public void testUpdatedSplitPolicyForSysTask() throws Exception {
- executeQueryWithClientVersion(compatibleClientVersion, CREATE_DIVERGED_VIEW, zkQuorum);
- executeQueriesWithCurrentVersion(QUERY_CREATE_DIVERGED_VIEW, url, NONE);
+ public void testSplitPolicyAndCoprocessorForSysTask() throws Exception {
+ executeQueryWithClientVersion(compatibleClientVersion,
+ CREATE_DIVERGED_VIEW, zkQuorum);
- try (org.apache.hadoop.hbase.client.Connection conn =
- hbaseTestUtil.getConnection(); Admin admin = conn.getAdmin()) {
+ String[] versionArr = compatibleClientVersion.split("\\.");
+ int majorVersion = Integer.parseInt(versionArr[0]);
+ int minorVersion = Integer.parseInt(versionArr[1]);
+ org.apache.hadoop.hbase.client.Connection conn = null;
+ Admin admin = null;
+ // if connected with client < 4.15, SYSTEM.TASK does not exist
+ // if connected with client 4.15, SYSTEM.TASK exists without any
+ // split policy and also TaskMetaDataEndpoint coprocessor would not
+ // exist
+ if (majorVersion == 4 && minorVersion == 15) {
+ conn = hbaseTestUtil.getConnection();
+ admin = conn.getAdmin();
HTableDescriptor tableDescriptor = admin.getTableDescriptor(
TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME));
- assertEquals("split policy not updated with compatible client version: "
+ assertNull("split policy should be null with compatible client version: "
+ + compatibleClientVersion, tableDescriptor.getRegionSplitPolicyClassName());
+ assertFalse("Coprocessor " + TaskMetaDataEndpoint.class.getName()
+ + " should not have been added with compatible client version: "
+ compatibleClientVersion,
- tableDescriptor.getRegionSplitPolicyClassName(),
- SystemTaskSplitPolicy.class.getName());
+ tableDescriptor.hasCoprocessor(TaskMetaDataEndpoint.class.getName()));
+ }
+
+ executeQueriesWithCurrentVersion(QUERY_CREATE_DIVERGED_VIEW, url, NONE);
+
+ if (conn == null) {
+ conn = hbaseTestUtil.getConnection();
+ admin = conn.getAdmin();
}
+ // connect with client > 4.15, and we have new split policy and new
+ // coprocessor loaded
+ HTableDescriptor tableDescriptor = admin.getTableDescriptor(
+ TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME));
+ assertEquals("split policy not updated with compatible client version: "
+ + compatibleClientVersion,
+ tableDescriptor.getRegionSplitPolicyClassName(),
+ SystemTaskSplitPolicy.class.getName());
+ assertTrue("Coprocessor " + TaskMetaDataEndpoint.class.getName()
+ + " has not been added with compatible client version: "
+ + compatibleClientVersion, tableDescriptor.hasCoprocessor(
+ TaskMetaDataEndpoint.class.getName()));
assertExpectedOutput(QUERY_CREATE_DIVERGED_VIEW);
+ admin.close();
+ conn.close();
}
+
+ @Test
+ public void testSystemTaskCreationWithIndexAsyncRebuild() throws Exception {
+ String[] versionArr = compatibleClientVersion.split("\\.");
+ int majorVersion = Integer.parseInt(versionArr[0]);
+ int minorVersion = Integer.parseInt(versionArr[1]);
+ // index async rebuild support min version check
+ if (majorVersion > 4 || (majorVersion == 4 && minorVersion >= 15)) {
+ executeQueryWithClientVersion(compatibleClientVersion,
+ INDEX_REBUILD_ASYNC, zkQuorum);
+ executeQueriesWithCurrentVersion(QUERY_INDEX_REBUILD_ASYNC, url, NONE);
+ assertExpectedOutput(QUERY_INDEX_REBUILD_ASYNC);
+ }
+ }
+
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
index c79e19b..e5a7d3d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
@@ -84,6 +84,9 @@ public final class BackwardCompatibilityTestUtil {
public static final String QUERY_DELETE = QUERY_PREFIX + DELETE;
public static final String QUERY_SELECT_AND_DROP_TABLE = QUERY_PREFIX + SELECT_AND_DROP_TABLE;
public static final String QUERY_CREATE_DIVERGED_VIEW = QUERY_PREFIX + CREATE_DIVERGED_VIEW;
+ public static final String INDEX_REBUILD_ASYNC = "index_rebuild_async";
+ public static final String QUERY_INDEX_REBUILD_ASYNC = QUERY_PREFIX
+ + INDEX_REBUILD_ASYNC;
public static final String MVN_HOME = "maven.home";
public static final String JAVA_TMP_DIR = "java.io.tmpdir";
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
index 9d97cc5..71f5ae4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
import org.apache.phoenix.schema.task.Task;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.MetaDataUtil;
@@ -147,9 +148,19 @@ public class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
- Task.addTask(conn.unwrap(PhoenixConnection.class), PTable.TaskType.INDEX_REBUILD,
- TENANT1, null, viewName,
- PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true);
+ Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(conn.unwrap(PhoenixConnection.class))
+ .setTaskType(PTable.TaskType.INDEX_REBUILD)
+ .setTenantId(TENANT1)
+ .setSchemaName(null)
+ .setTableName(viewName)
+ .setTaskStatus(PTable.TaskStatus.CREATED.toString())
+ .setData(data)
+ .setPriority(null)
+ .setStartTs(startTs)
+ .setEndTs(null)
+ .setAccessCheckEnabled(true)
+ .build());
task.run();
// Check task status and other column values.
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java
index e38c5e6..503ede4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java
@@ -25,9 +25,11 @@ import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -127,12 +129,12 @@ public class SystemTablesUpgradeIT extends BaseTest {
// SystemTaskSplitPolicy (which is extending DisabledRegionSplitPolicy
// as of this writing)
try (Admin admin = services.getAdmin()) {
- String taskSplitPolicy = admin
- .getTableDescriptor(TableName.valueOf(
- PhoenixDatabaseMetaData.SYSTEM_TASK_NAME))
- .getRegionSplitPolicyClassName();
+ HTableDescriptor td = admin.getTableDescriptor(TableName.valueOf(
+ PhoenixDatabaseMetaData.SYSTEM_TASK_NAME));
+ String taskSplitPolicy = td.getRegionSplitPolicyClassName();
assertEquals(SystemTaskSplitPolicy.class.getName(),
taskSplitPolicy);
+ assertTrue(td.hasCoprocessor(TaskMetaDataEndpoint.class.getName()));
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
index 6352d35..00d9c8c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_SET_OR_ALTER_
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_UPDATE_CACHE_FREQUENCY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -665,9 +666,20 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
conn.createStatement().execute(
"ALTER INDEX " + indexName + " ON " + testTable + " REBUILD ALL ASYNC");
- String
- queryTaskTable =
- "SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME;
+ ResultSet resultSet = conn.createStatement().executeQuery(
+ "SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+ assertTrue(resultSet.next());
+ assertEquals("2", resultSet.getString(1));
+ assertNull(resultSet.getString(3));
+ assertNull(resultSet.getString(4));
+ assertEquals(testTable, resultSet.getString(5));
+ assertEquals("CREATED", resultSet.getString(6));
+ assertEquals("4", resultSet.getString(8));
+ assertEquals(
+ "{\"IndexName\":\"" + indexName + "\",\"RebuildAll\":true}",
+ resultSet.getString(9));
+ String queryTaskTable =
+ "SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME;
ResultSet rs = conn.createStatement().executeQuery(queryTaskTable);
assertTrue(rs.next());
assertEquals(testTable, rs.getString(TABLE_NAME));
diff --git a/phoenix-core/src/it/resources/gold_files/gold_query_index_rebuild_async.txt b/phoenix-core/src/it/resources/gold_files/gold_query_index_rebuild_async.txt
new file mode 100644
index 0000000..ff020c5
--- /dev/null
+++ b/phoenix-core/src/it/resources/gold_files/gold_query_index_rebuild_async.txt
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'TASK_TYPE','TABLE_NAME','TASK_STATUS','TASK_PRIORITY'
+'2','TI','COMPLETED','4'
+'K','V'
+'key1','val2'
+'key3','val3'
diff --git a/phoenix-core/src/it/resources/sql_files/index_rebuild_async.sql b/phoenix-core/src/it/resources/sql_files/index_rebuild_async.sql
new file mode 100644
index 0000000..17133c0
--- /dev/null
+++ b/phoenix-core/src/it/resources/sql_files/index_rebuild_async.sql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS S.TI (K VARCHAR PRIMARY KEY, V VARCHAR);
+UPSERT INTO S.TI VALUES ('key1', 'val1');
+CREATE INDEX R_ASYNCIND_TI ON S.TI (K, V);
+ALTER INDEX R_ASYNCIND_TI ON S.TI DISABLE;
+UPSERT INTO S.TI VALUES ('key1', 'val2');
+ALTER INDEX R_ASYNCIND_TI ON S.TI REBUILD ALL ASYNC;
+UPSERT INTO S.TI VALUES ('key3', 'val3');
+UPSERT INTO S.TI VALUES ('key4', 'val4');
+DELETE FROM S.TI WHERE K = 'key4';
diff --git a/phoenix-core/src/it/resources/sql_files/query_index_rebuild_async.sql b/phoenix-core/src/it/resources/sql_files/query_index_rebuild_async.sql
new file mode 100644
index 0000000..46d073e
--- /dev/null
+++ b/phoenix-core/src/it/resources/sql_files/query_index_rebuild_async.sql
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+SELECT TASK_TYPE, TABLE_NAME, TASK_STATUS, TASK_PRIORITY FROM SYSTEM.TASK;
+SELECT * FROM S.TI;
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
index 1f9e29b..b2ddf3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
@@ -113,4 +113,10 @@ public class BaseMetaDataEndpointObserver implements MetaDataEndpointObserver{
public void preCreateViewAddChildLink(
final ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
final String tableName) throws IOException {}
+
+ @Override
+ public void preUpsertTaskDetails(
+ final ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
+ final String tableName) throws IOException {
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index fde1aea..fc3ddb2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -210,6 +210,7 @@ import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.SequenceNotFoundException;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.task.SystemTaskParams;
import org.apache.phoenix.schema.task.Task;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PBoolean;
@@ -2274,13 +2275,20 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
try (PhoenixConnection conn =
QueryUtil.getConnectionOnServer(env.getConfiguration())
.unwrap(PhoenixConnection.class)) {
- Task.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS,
- Bytes.toString(tenantIdBytes),
- Bytes.toString(schemaName),
- Bytes.toString(tableOrViewName),
- PTable.TaskStatus.CREATED.toString(),
- null, null, null, null,
- this.accessCheckEnabled);
+ Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(conn)
+ .setTaskType(PTable.TaskType.DROP_CHILD_VIEWS)
+ .setTenantId(Bytes.toString(tenantIdBytes))
+ .setSchemaName(Bytes.toString(schemaName))
+ .setTableName(Bytes.toString(tableOrViewName))
+ .setTaskStatus(
+ PTable.TaskStatus.CREATED.toString())
+ .setData(null)
+ .setPriority(null)
+ .setStartTs(null)
+ .setEndTs(null)
+ .setAccessCheckEnabled(this.accessCheckEnabled)
+ .build());
} catch (Throwable t) {
LOGGER.error("Adding a task to drop child views failed!", t);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java
index 629f00b..d569c29 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java
@@ -68,4 +68,7 @@ public interface MetaDataEndpointObserver extends Coprocessor {
void preCreateViewAddChildLink(final ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
final String tableName) throws IOException;
+ void preUpsertTaskDetails(
+ ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
+ String tableName) throws IOException;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 5d0fc85..e6f0bb5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -178,8 +178,9 @@ public abstract class MetaDataProtocol extends MetaDataService {
UNABLE_TO_CREATE_CHILD_LINK,
UNABLE_TO_UPDATE_PARENT_TABLE,
UNABLE_TO_DELETE_CHILD_LINK,
+ UNABLE_TO_UPSERT_TASK,
NO_OP
- };
+ }
public static class SharedTableState {
private PName tenantId;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java
index f7ff05c..ed2c6c3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java
@@ -264,4 +264,18 @@ public class PhoenixMetaDataCoprocessorHost
}
return user;
}
+
+ void preUpsertTaskDetails(final String tableName) throws IOException {
+ execOperation(
+ new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>(
+ getActiveUser()) {
+ @Override
+ public void call(MetaDataEndpointObserver observer,
+ ObserverContext<PhoenixMetaDataControllerEnvironment> ctx)
+ throws IOException {
+ observer.preUpsertTaskDetails(ctx, tableName);
+ }
+ });
+ }
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpoint.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpoint.java
new file mode 100644
index 0000000..60b8f6b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpoint.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.coprocessor;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos
+ .TaskMetaDataService;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos
+ .TaskMutateRequest;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl
+ .mutateRowsWithLocks;
+
+/**
+ * Phoenix metadata mutations for SYSTEM.TASK flow through this co-processor
+ * Endpoint.
+ */
+public class TaskMetaDataEndpoint extends TaskMetaDataService
+ implements CoprocessorService, Coprocessor {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(TaskMetaDataEndpoint.class);
+
+ private RegionCoprocessorEnvironment env;
+ private PhoenixMetaDataCoprocessorHost phoenixAccessCoprocessorHost;
+ private boolean accessCheckEnabled;
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ if (env instanceof RegionCoprocessorEnvironment) {
+ this.env = (RegionCoprocessorEnvironment) env;
+ } else {
+ throw new CoprocessorException("Must be loaded on a table region!");
+ }
+ this.phoenixAccessCoprocessorHost =
+ new PhoenixMetaDataCoprocessorHost(this.env);
+ this.accessCheckEnabled = env.getConfiguration().getBoolean(
+ QueryServices.PHOENIX_ACLS_ENABLED,
+ QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ // no-op
+ }
+
+ @Override
+ public Service getService() {
+ return this;
+ }
+
+ @Override
+ public void upsertTaskDetails(RpcController controller,
+ TaskMutateRequest request, RpcCallback<MetaDataResponse> done) {
+ MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
+ try {
+ List<Mutation> taskMutations = ProtobufUtil.getMutations(request);
+ if (taskMutations.isEmpty()) {
+ done.run(builder.build());
+ return;
+ }
+ byte[][] rowKeyMetaData = new byte[3][];
+ MetaDataUtil.getTenantIdAndSchemaAndTableName(taskMutations,
+ rowKeyMetaData);
+ byte[] schemaName =
+ rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+ byte[] tableName =
+ rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+ String fullTableName = SchemaUtil.getTableName(schemaName,
+ tableName);
+
+ phoenixAccessCoprocessorHost.preUpsertTaskDetails(fullTableName);
+
+ mutateRowsWithLocks(this.accessCheckEnabled, this.env.getRegion(),
+ taskMutations, Collections.<byte[]>emptySet(),
+ HConstants.NO_NONCE, HConstants.NO_NONCE);
+ } catch (Throwable t) {
+ LOGGER.error("Unable to write mutations to {}",
+ PhoenixDatabaseMetaData.SYSTEM_TASK_NAME, t);
+ builder.setReturnCode(
+ MetaDataProtos.MutationCode.UNABLE_TO_UPSERT_TASK);
+ builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+ done.run(builder.build());
+ }
+ }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index e2cc782..3a99d6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -32,6 +32,7 @@ import javax.annotation.concurrent.GuardedBy;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.phoenix.schema.task.SystemTaskParams;
import org.apache.phoenix.util.JacksonUtil;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
@@ -195,10 +196,19 @@ public class TaskRegionObserver extends BaseRegionObserver {
}
// Change task status to STARTED
- Task.addTask(connForTask, taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
- taskRecord.getTableName(), PTable.TaskStatus.STARTED.toString(),
- taskRecord.getData(), taskRecord.getPriority(), taskRecord.getTimeStamp(), null,
- true);
+ Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(connForTask)
+ .setTaskType(taskRecord.getTaskType())
+ .setTenantId(taskRecord.getTenantId())
+ .setSchemaName(taskRecord.getSchemaName())
+ .setTableName(taskRecord.getTableName())
+ .setTaskStatus(PTable.TaskStatus.STARTED.toString())
+ .setData(taskRecord.getData())
+ .setPriority(taskRecord.getPriority())
+ .setStartTs(taskRecord.getTimeStamp())
+ .setEndTs(null)
+ .setAccessCheckEnabled(true)
+ .build());
// invokes the method at runtime
result = (TaskResult) runMethod.invoke(obj, taskRecord);
@@ -251,9 +261,19 @@ public class TaskRegionObserver extends BaseRegionObserver {
data = jsonNode.toString();
Timestamp endTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
- Task.addTask(connForTask, taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
- taskRecord.getTableName(), taskStatus, data, taskRecord.getPriority(),
- taskRecord.getTimeStamp(), endTs, true);
+ Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(connForTask)
+ .setTaskType(taskRecord.getTaskType())
+ .setTenantId(taskRecord.getTenantId())
+ .setSchemaName(taskRecord.getSchemaName())
+ .setTableName(taskRecord.getTableName())
+ .setTaskStatus(taskStatus)
+ .setData(data)
+ .setPriority(taskRecord.getPriority())
+ .setStartTs(taskRecord.getTimeStamp())
+ .setEndTs(endTs)
+ .setAccessCheckEnabled(true)
+ .build());
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
index 28b58c6..38283a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
@@ -117,6 +117,10 @@ public final class MetaDataProtos {
* <code>UNABLE_TO_DELETE_CHILD_LINK = 25;</code>
*/
UNABLE_TO_DELETE_CHILD_LINK(25, 25),
+ /**
+ * <code>UNABLE_TO_UPSERT_TASK = 26;</code>
+ */
+ UNABLE_TO_UPSERT_TASK(26, 26),
;
/**
@@ -223,6 +227,10 @@ public final class MetaDataProtos {
* <code>UNABLE_TO_DELETE_CHILD_LINK = 25;</code>
*/
public static final int UNABLE_TO_DELETE_CHILD_LINK_VALUE = 25;
+ /**
+ * <code>UNABLE_TO_UPSERT_TASK = 26;</code>
+ */
+ public static final int UNABLE_TO_UPSERT_TASK_VALUE = 26;
public final int getNumber() { return value; }
@@ -255,6 +263,7 @@ public final class MetaDataProtos {
case 23: return UNABLE_TO_CREATE_CHILD_LINK;
case 24: return UNABLE_TO_UPDATE_PARENT_TABLE;
case 25: return UNABLE_TO_DELETE_CHILD_LINK;
+ case 26: return UNABLE_TO_UPSERT_TASK;
default: return null;
}
}
@@ -18048,7 +18057,7 @@ public final class MetaDataProtos {
"cheRequest\022\020\n\010tenantId\030\001 \002(\014\022\022\n\nschemaNa" +
"me\030\002 \002(\014\022\021\n\ttableName\030\003 \002(\014\022\027\n\017clientTim" +
"estamp\030\004 \002(\003\022\025\n\rclientVersion\030\005 \001(\005\"\035\n\033C" +
- "learTableFromCacheResponse*\332\005\n\014MutationC" +
+ "learTableFromCacheResponse*\365\005\n\014MutationC" +
"ode\022\030\n\024TABLE_ALREADY_EXISTS\020\000\022\023\n\017TABLE_N" +
"OT_FOUND\020\001\022\024\n\020COLUMN_NOT_FOUND\020\002\022\031\n\025COLU",
"MN_ALREADY_EXISTS\020\003\022\035\n\031CONCURRENT_TABLE_" +
@@ -18066,31 +18075,32 @@ public final class MetaDataProtos {
"#\n\037CANNOT_COERCE_AUTO_PARTITION_ID\020\025\022\024\n\020" +
"TOO_MANY_INDEXES\020\026\022\037\n\033UNABLE_TO_CREATE_C" +
"HILD_LINK\020\027\022!\n\035UNABLE_TO_UPDATE_PARENT_T" +
- "ABLE\020\030\022\037\n\033UNABLE_TO_DELETE_CHILD_LINK\020\0312" +
- "\345\006\n\017MetaDataService\022/\n\010getTable\022\020.GetTab" +
- "leRequest\032\021.MetaDataResponse\0227\n\014getFunct" +
- "ions\022\024.GetFunctionsRequest\032\021.MetaDataRes" +
- "ponse\0221\n\tgetSchema\022\021.GetSchemaRequest\032\021.",
- "MetaDataResponse\0225\n\013createTable\022\023.Create" +
- "TableRequest\032\021.MetaDataResponse\022;\n\016creat" +
- "eFunction\022\026.CreateFunctionRequest\032\021.Meta" +
- "DataResponse\0227\n\014createSchema\022\024.CreateSch" +
- "emaRequest\032\021.MetaDataResponse\0221\n\tdropTab" +
- "le\022\021.DropTableRequest\032\021.MetaDataResponse" +
- "\0223\n\ndropSchema\022\022.DropSchemaRequest\032\021.Met" +
- "aDataResponse\0227\n\014dropFunction\022\024.DropFunc" +
- "tionRequest\032\021.MetaDataResponse\0221\n\taddCol" +
- "umn\022\021.AddColumnRequest\032\021.MetaDataRespons",
- "e\0223\n\ndropColumn\022\022.DropColumnRequest\032\021.Me" +
- "taDataResponse\022?\n\020updateIndexState\022\030.Upd" +
- "ateIndexStateRequest\032\021.MetaDataResponse\022" +
- "5\n\nclearCache\022\022.ClearCacheRequest\032\023.Clea" +
- "rCacheResponse\0225\n\ngetVersion\022\022.GetVersio" +
- "nRequest\032\023.GetVersionResponse\022P\n\023clearTa" +
- "bleFromCache\022\033.ClearTableFromCacheReques" +
- "t\032\034.ClearTableFromCacheResponseBB\n(org.a" +
- "pache.phoenix.coprocessor.generatedB\016Met" +
- "aDataProtosH\001\210\001\001\240\001\001"
+ "ABLE\020\030\022\037\n\033UNABLE_TO_DELETE_CHILD_LINK\020\031\022" +
+ "\031\n\025UNABLE_TO_UPSERT_TASK\020\0322\345\006\n\017MetaDataS" +
+ "ervice\022/\n\010getTable\022\020.GetTableRequest\032\021.M" +
+ "etaDataResponse\0227\n\014getFunctions\022\024.GetFun" +
+ "ctionsRequest\032\021.MetaDataResponse\0221\n\tgetS",
+ "chema\022\021.GetSchemaRequest\032\021.MetaDataRespo" +
+ "nse\0225\n\013createTable\022\023.CreateTableRequest\032" +
+ "\021.MetaDataResponse\022;\n\016createFunction\022\026.C" +
+ "reateFunctionRequest\032\021.MetaDataResponse\022" +
+ "7\n\014createSchema\022\024.CreateSchemaRequest\032\021." +
+ "MetaDataResponse\0221\n\tdropTable\022\021.DropTabl" +
+ "eRequest\032\021.MetaDataResponse\0223\n\ndropSchem" +
+ "a\022\022.DropSchemaRequest\032\021.MetaDataResponse" +
+ "\0227\n\014dropFunction\022\024.DropFunctionRequest\032\021" +
+ ".MetaDataResponse\0221\n\taddColumn\022\021.AddColu",
+ "mnRequest\032\021.MetaDataResponse\0223\n\ndropColu" +
+ "mn\022\022.DropColumnRequest\032\021.MetaDataRespons" +
+ "e\022?\n\020updateIndexState\022\030.UpdateIndexState" +
+ "Request\032\021.MetaDataResponse\0225\n\nclearCache" +
+ "\022\022.ClearCacheRequest\032\023.ClearCacheRespons" +
+ "e\0225\n\ngetVersion\022\022.GetVersionRequest\032\023.Ge" +
+ "tVersionResponse\022P\n\023clearTableFromCache\022" +
+ "\033.ClearTableFromCacheRequest\032\034.ClearTabl" +
+ "eFromCacheResponseBB\n(org.apache.phoenix" +
+ ".coprocessor.generatedB\016MetaDataProtosH\001",
+ "\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/TaskMetaDataProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/TaskMetaDataProtos.java
new file mode 100644
index 0000000..78c1f70
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/TaskMetaDataProtos.java
@@ -0,0 +1,784 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: TaskMetaDataService.proto
+
+package org.apache.phoenix.coprocessor.generated;
+
+public final class TaskMetaDataProtos {
+ private TaskMetaDataProtos() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ }
+ public interface TaskMutateRequestOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // repeated bytes tableMetadataMutations = 1;
+ /**
+ * <code>repeated bytes tableMetadataMutations = 1;</code>
+ */
+ java.util.List<com.google.protobuf.ByteString> getTableMetadataMutationsList();
+ /**
+ * <code>repeated bytes tableMetadataMutations = 1;</code>
+ */
+ int getTableMetadataMutationsCount();
+ /**
+ * <code>repeated bytes tableMetadataMutations = 1;</code>
+ */
+ com.google.protobuf.ByteString getTableMetadataMutations(int index);
+ }
+ /**
+ * Protobuf type {@code TaskMutateRequest}
+ */
+ public static final class TaskMutateRequest extends
+ com.google.protobuf.GeneratedMessage
+ implements TaskMutateRequestOrBuilder {
+ // Use TaskMutateRequest.newBuilder() to construct.
+ private TaskMutateRequest(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private TaskMutateRequest(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final TaskMutateRequest defaultInstance;
+ public static TaskMutateRequest getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public TaskMutateRequest getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private TaskMutateRequest(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 10: {
+ if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+ tableMetadataMutations_ = new java.util.ArrayList<com.google.protobuf.ByteString>();
+ mutable_bitField0_ |= 0x00000001;
+ }
+ tableMetadataMutations_.add(input.readBytes());
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+ tableMetadataMutations_ = java.util.Collections.unmodifiableList(tableMetadataMutations_);
+ }
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.class, org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.Builder.class);
+ }
+
+ public static com.google.protobuf.Parser<TaskMutateRequest> PARSER =
+ new com.google.protobuf.AbstractParser<TaskMutateRequest>() {
+ public TaskMutateRequest parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new TaskMutateRequest(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public com.google.protobuf.Parser<TaskMutateRequest> getParserForType() {
+ return PARSER;
+ }
+
+ // repeated bytes tableMetadataMutations = 1;
+ public static final int TABLEMETADATAMUTATIONS_FIELD_NUMBER = 1;
+ private java.util.List<com.google.protobuf.ByteString> tableMetadataMutations_;
+ /**
+ * <code>repeated bytes tableMetadataMutations = 1;</code>
+ */
+ public java.util.List<com.google.protobuf.ByteString>
+ getTableMetadataMutationsList() {
+ return tableMetadataMutations_;
+ }
+ /**
+ * <code>repeated bytes tableMetadataMutations = 1;</code>
+ */
+ public int getTableMetadataMutationsCount() {
+ return tableMetadataMutations_.size();
+ }
+ /**
+ * <code>repeated bytes tableMetadataMutations = 1;</code>
+ */
+ public com.google.protobuf.ByteString getTableMetadataMutations(int index) {
+ return tableMetadataMutations_.get(index);
+ }
+
+ private void initFields() {
+ tableMetadataMutations_ = java.util.Collections.emptyList();
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ for (int i = 0; i < tableMetadataMutations_.size(); i++) {
+ output.writeBytes(1, tableMetadataMutations_.get(i));
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ {
+ int dataSize = 0;
+ for (int i = 0; i < tableMetadataMutations_.size(); i++) {
+ dataSize += com.google.protobuf.CodedOutputStream
+ .computeBytesSizeNoTag(tableMetadataMutations_.get(i));
+ }
+ size += dataSize;
+ size += 1 * getTableMetadataMutationsList().size();
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest)) {
+ return super.equals(obj);
+ }
+ org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest other = (org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest) obj;
+
+ boolean result = true;
+ result = result && getTableMetadataMutationsList()
+ .equals(other.getTableMetadataMutationsList());
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ private int memoizedHashCode = 0;
+ @java.lang.Override
+ public int hashCode() {
+ if (memoizedHashCode != 0) {
+ return memoizedHashCode;
+ }
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ if (getTableMetadataMutationsCount() > 0) {
+ hash = (37 * hash) + TABLEMETADATAMUTATIONS_FIELD_NUMBER;
+ hash = (53 * hash) + getTableMetadataMutationsList().hashCode();
+ }
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ memoizedHashCode = hash;
+ return hash;
+ }
+
+ public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code TaskMutateRequest}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequestOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.class, org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.Builder.class);
+ }
+
+ // Construct using org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ tableMetadataMutations_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.internal_static_TaskMutateRequest_descriptor;
+ }
+
+ public org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest getDefaultInstanceForType() {
+ return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.getDefaultInstance();
+ }
+
+ public org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest build() {
+ org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest buildPartial() {
+ org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest result = new org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest(this);
+ int from_bitField0_ = bitField0_;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ tableMetadataMutations_ = java.util.Collections.unmodifiableList(tableMetadataMutations_);
+ bitField0_ = (bitField0_ & ~0x00000001);
+ }
+ result.tableMetadataMutations_ = tableMetadataMutations_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest) {
+ return mergeFrom((org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest other) {
+ if (other == org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.getDefaultInstance()) return this;
+ if (!other.tableMetadataMutations_.isEmpty()) {
+ if (tableMetadataMutations_.isEmpty()) {
+ tableMetadataMutations_ = other.tableMetadataMutations_;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ } else {
+ ensureTableMetadataMutationsIsMutable();
+ tableMetadataMutations_.addAll(other.tableMetadataMutations_);
+ }
+ onChanged();
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // repeated bytes tableMetadataMutations = 1;
+ private java.util.List<com.google.protobuf.ByteString> tableMetadataMutations_ = java.util.Collections.emptyList();
+ private void ensureTableMetadataMutationsIsMutable() {
+ if (!((bitField0_ & 0x00000001) == 0x00000001)) {
+ tableMetadataMutations_ = new java.util.ArrayList<com.google.protobuf.ByteString>(tableMetadataMutations_);
+ bitField0_ |= 0x00000001;
+ }
+ }
+ /**
+ * <code>repeated bytes tableMetadataMutations = 1;</code>
+ */
+ public java.util.List<com.google.protobuf.ByteString>
+ getTableMetadataMutationsList() {
+ return java.util.Collections.unmodifiableList(tableMetadataMutations_);
+ }
+ /**
+ * <code>repeated bytes tableMetadataMutations = 1;</code>
+ */
+ public int getTableMetadataMutationsCount() {
+ return tableMetadataMutations_.size();
+ }
+ /**
+ * <code>repeated bytes tableMetadataMutations = 1;</code>
+ */
+ public com.google.protobuf.ByteString getTableMetadataMutations(int index) {
+ return tableMetadataMutations_.get(index);
+ }
+ /**
+ * <code>repeated bytes tableMetadataMutations = 1;</code>
+ */
+ public Builder setTableMetadataMutations(
+ int index, com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureTableMetadataMutationsIsMutable();
+ tableMetadataMutations_.set(index, value);
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>repeated bytes tableMetadataMutations = 1;</code>
+ */
+ public Builder addTableMetadataMutations(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureTableMetadataMutationsIsMutable();
+ tableMetadataMutations_.add(value);
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>repeated bytes tableMetadataMutations = 1;</code>
+ */
+ public Builder addAllTableMetadataMutations(
+ java.lang.Iterable<? extends com.google.protobuf.ByteString> values) {
+ ensureTableMetadataMutationsIsMutable();
+ super.addAll(values, tableMetadataMutations_);
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>repeated bytes tableMetadataMutations = 1;</code>
+ */
+ public Builder clearTableMetadataMutations() {
+ tableMetadataMutations_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000001);
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:TaskMutateRequest)
+ }
+
+ static {
+ defaultInstance = new TaskMutateRequest(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:TaskMutateRequest)
+ }
+
+ /**
+ * Protobuf service {@code TaskMetaDataService}
+ */
+ public static abstract class TaskMetaDataService
+ implements com.google.protobuf.Service {
+ protected TaskMetaDataService() {}
+
+ public interface Interface {
+ /**
+ * <code>rpc upsertTaskDetails(.TaskMutateRequest) returns (.MetaDataResponse);</code>
+ */
+ public abstract void upsertTaskDetails(
+ com.google.protobuf.RpcController controller,
+ org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request,
+ com.google.protobuf.RpcCallback<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse> done);
+
+ }
+
+ public static com.google.protobuf.Service newReflectiveService(
+ final Interface impl) {
+ return new TaskMetaDataService() {
+ @java.lang.Override
+ public void upsertTaskDetails(
+ com.google.protobuf.RpcController controller,
+ org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request,
+ com.google.protobuf.RpcCallback<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse> done) {
+ impl.upsertTaskDetails(controller, request, done);
+ }
+
+ };
+ }
+
+ public static com.google.protobuf.BlockingService
+ newReflectiveBlockingService(final BlockingInterface impl) {
+ return new com.google.protobuf.BlockingService() {
+ public final com.google.protobuf.Descriptors.ServiceDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+
+ public final com.google.protobuf.Message callBlockingMethod(
+ com.google.protobuf.Descriptors.MethodDescriptor method,
+ com.google.protobuf.RpcController controller,
+ com.google.protobuf.Message request)
+ throws com.google.protobuf.ServiceException {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.callBlockingMethod() given method descriptor for " +
+ "wrong service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ return impl.upsertTaskDetails(controller, (org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest)request);
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ public final com.google.protobuf.Message
+ getRequestPrototype(
+ com.google.protobuf.Descriptors.MethodDescriptor method) {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.getRequestPrototype() given method " +
+ "descriptor for wrong service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.getDefaultInstance();
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ public final com.google.protobuf.Message
+ getResponsePrototype(
+ com.google.protobuf.Descriptors.MethodDescriptor method) {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.getResponsePrototype() given method " +
+ "descriptor for wrong service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ return org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance();
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ };
+ }
+
+ /**
+ * <code>rpc upsertTaskDetails(.TaskMutateRequest) returns (.MetaDataResponse);</code>
+ */
+ public abstract void upsertTaskDetails(
+ com.google.protobuf.RpcController controller,
+ org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request,
+ com.google.protobuf.RpcCallback<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse> done);
+
+ public static final
+ com.google.protobuf.Descriptors.ServiceDescriptor
+ getDescriptor() {
+ return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.getDescriptor().getServices().get(0);
+ }
+ public final com.google.protobuf.Descriptors.ServiceDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+
+ public final void callMethod(
+ com.google.protobuf.Descriptors.MethodDescriptor method,
+ com.google.protobuf.RpcController controller,
+ com.google.protobuf.Message request,
+ com.google.protobuf.RpcCallback<
+ com.google.protobuf.Message> done) {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.callMethod() given method descriptor for wrong " +
+ "service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ this.upsertTaskDetails(controller, (org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest)request,
+ com.google.protobuf.RpcUtil.<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse>specializeCallback(
+ done));
+ return;
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ public final com.google.protobuf.Message
+ getRequestPrototype(
+ com.google.protobuf.Descriptors.MethodDescriptor method) {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.getRequestPrototype() given method " +
+ "descriptor for wrong service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ return org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest.getDefaultInstance();
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ public final com.google.protobuf.Message
+ getResponsePrototype(
+ com.google.protobuf.Descriptors.MethodDescriptor method) {
+ if (method.getService() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "Service.getResponsePrototype() given method " +
+ "descriptor for wrong service type.");
+ }
+ switch(method.getIndex()) {
+ case 0:
+ return org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance();
+ default:
+ throw new java.lang.AssertionError("Can't get here.");
+ }
+ }
+
+ public static Stub newStub(
+ com.google.protobuf.RpcChannel channel) {
+ return new Stub(channel);
+ }
+
+ public static final class Stub extends org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMetaDataService implements Interface {
+ private Stub(com.google.protobuf.RpcChannel channel) {
+ this.channel = channel;
+ }
+
+ private final com.google.protobuf.RpcChannel channel;
+
+ public com.google.protobuf.RpcChannel getChannel() {
+ return channel;
+ }
+
+ public void upsertTaskDetails(
+ com.google.protobuf.RpcController controller,
+ org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request,
+ com.google.protobuf.RpcCallback<org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse> done) {
+ channel.callMethod(
+ getDescriptor().getMethods().get(0),
+ controller,
+ request,
+ org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance(),
+ com.google.protobuf.RpcUtil.generalizeCallback(
+ done,
+ org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.class,
+ org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance()));
+ }
+ }
+
+ public static BlockingInterface newBlockingStub(
+ com.google.protobuf.BlockingRpcChannel channel) {
+ return new BlockingStub(channel);
+ }
+
+ public interface BlockingInterface {
+ public org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse upsertTaskDetails(
+ com.google.protobuf.RpcController controller,
+ org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request)
+ throws com.google.protobuf.ServiceException;
+ }
+
+ private static final class BlockingStub implements BlockingInterface {
+ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) {
+ this.channel = channel;
+ }
+
+ private final com.google.protobuf.BlockingRpcChannel channel;
+
+ public org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse upsertTaskDetails(
+ com.google.protobuf.RpcController controller,
+ org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos.TaskMutateRequest request)
+ throws com.google.protobuf.ServiceException {
+ return (org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse) channel.callBlockingMethod(
+ getDescriptor().getMethods().get(0),
+ controller,
+ request,
+ org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse.getDefaultInstance());
+ }
+
+ }
+
+ // @@protoc_insertion_point(class_scope:TaskMetaDataService)
+ }
+
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_TaskMutateRequest_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_TaskMutateRequest_fieldAccessorTable;
+
+ public static com.google.protobuf.Descriptors.FileDescriptor
+ getDescriptor() {
+ return descriptor;
+ }
+ private static com.google.protobuf.Descriptors.FileDescriptor
+ descriptor;
+ static {
+ java.lang.String[] descriptorData = {
+ "\n\031TaskMetaDataService.proto\032\025MetaDataSer" +
+ "vice.proto\"3\n\021TaskMutateRequest\022\036\n\026table" +
+ "MetadataMutations\030\001 \003(\0142Q\n\023TaskMetaDataS" +
+ "ervice\022:\n\021upsertTaskDetails\022\022.TaskMutate" +
+ "Request\032\021.MetaDataResponseBF\n(org.apache" +
+ ".phoenix.coprocessor.generatedB\022TaskMeta" +
+ "DataProtosH\001\210\001\001\240\001\001"
+ };
+ com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+ new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+ public com.google.protobuf.ExtensionRegistry assignDescriptors(
+ com.google.protobuf.Descriptors.FileDescriptor root) {
+ descriptor = root;
+ internal_static_TaskMutateRequest_descriptor =
+ getDescriptor().getMessageTypes().get(0);
+ internal_static_TaskMutateRequest_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_TaskMutateRequest_descriptor,
+ new java.lang.String[] { "TableMetadataMutations", });
+ return null;
+ }
+ };
+ com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ org.apache.phoenix.coprocessor.generated.MetaDataProtos.getDescriptor(),
+ }, assigner);
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
index 4e3b639..b4c99ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
@@ -21,6 +21,7 @@ import com.google.common.base.Strings;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.phoenix.schema.task.SystemTaskParams;
import org.apache.phoenix.util.JacksonUtil;
import org.apache.hadoop.conf.Configuration;
@@ -106,10 +107,19 @@ public class IndexRebuildTask extends BaseTask {
Job job = indexToolRes.getValue();
((ObjectNode) jsonNode).put(JOB_ID, job.getJobID().toString());
- Task.addTask(conn.unwrap(PhoenixConnection.class ), taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
- taskRecord.getTableName(), PTable.TaskStatus.STARTED.toString(),
- jsonNode.toString(), taskRecord.getPriority(),
- taskRecord.getTimeStamp(), null, true);
+ Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(conn.unwrap(PhoenixConnection.class))
+ .setTaskType(taskRecord.getTaskType())
+ .setTenantId(taskRecord.getTenantId())
+ .setSchemaName(taskRecord.getSchemaName())
+ .setTableName(taskRecord.getTableName())
+ .setTaskStatus(PTable.TaskStatus.STARTED.toString())
+ .setData(jsonNode.toString())
+ .setPriority(taskRecord.getPriority())
+ .setStartTs(taskRecord.getTimeStamp())
+ .setEndTs(null)
+ .setAccessCheckEnabled(true)
+ .build());
// It will take some time to finish, so we will check the status in a separate task.
return null;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 9f4c139..788dea5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -433,6 +433,8 @@ public enum SQLExceptionCode {
PTable.LinkType.CHILD_TABLE + ") for view"),
TABLE_NOT_IN_REGION(1145, "XCL45", "No modifications allowed on this table. "
+ "Table not in this region."),
+ UNABLE_TO_UPSERT_TASK(1146, "XCL46",
+ "Error upserting records in SYSTEM.TASK table"),
/**
* Implementation defined class. Phoenix internal error. (errorcode 20, sqlstate INT).
*/
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
index 45f43e1..66528dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java
@@ -34,6 +34,8 @@ import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.CreateVi
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos
+ .TaskMutateRequest;
import org.apache.phoenix.schema.PTableType;
import com.google.protobuf.ByteString;
@@ -108,6 +110,11 @@ public class ProtobufUtil {
return getMutations(request.getTableMetadataMutationsList());
}
+ /**
+ * Returns the HBase mutations carried by a SYSTEM.TASK mutate request.
+ * Delegates to the list-based overload with the request's
+ * tableMetadataMutations entries.
+ *
+ * @param request request received by the SYSTEM.TASK endpoint
+ * @return mutations decoded from the request
+ * @throws IOException propagated from the list-based overload
+ */
+ public static List<Mutation> getMutations(TaskMutateRequest request)
+ throws IOException {
+ return getMutations(request.getTableMetadataMutationsList());
+ }
+
public static List<Mutation> getMutations(CreateViewAddChildLinkRequest request)
throws IOException {
return getMutations(request.getTableMetadataMutationsList());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index d76c647..7d93abd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -168,10 +168,10 @@ import org.apache.phoenix.coprocessor.ScanRegionObserver;
import org.apache.phoenix.coprocessor.SequenceRegionObserver;
import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
import org.apache.phoenix.coprocessor.PhoenixTTLRegionObserver;
+import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.ChildLinkMetaDataService;
-import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.CreateViewAddChildLinkRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
@@ -1095,6 +1095,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if(!descriptor.hasCoprocessor(TaskRegionObserver.class.getName())) {
descriptor.addCoprocessor(TaskRegionObserver.class.getName(), null, priority, null);
}
+ if (!descriptor.hasCoprocessor(
+ TaskMetaDataEndpoint.class.getName())) {
+ descriptor.addCoprocessor(TaskMetaDataEndpoint.class.getName(),
+ null, priority, null);
+ }
} else if (SchemaUtil.isChildLinkTable(tableName)) {
if (!descriptor.hasCoprocessor(ChildLinkMetaDataEndpoint.class.getName())) {
descriptor.addCoprocessor(ChildLinkMetaDataEndpoint.class.getName(), null, priority, null);
@@ -4146,7 +4151,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
TableName tableName = SchemaUtil.getPhysicalTableName(
PhoenixDatabaseMetaData.SYSTEM_TASK_NAME, props);
td = admin.getTableDescriptor(tableName);
+ boolean isTableDescUpdated = false;
if (updateAndConfirmSplitPolicyForTask(td)) {
+ isTableDescUpdated = true;
+ }
+ if (!td.hasCoprocessor(TaskMetaDataEndpoint.class.getName())) {
+ int priority = props.getInt(
+ QueryServices.COPROCESSOR_PRIORITY_ATTRIB,
+ QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
+ td.addCoprocessor(
+ TaskMetaDataEndpoint.class.getName(), null, priority,
+ null);
+ isTableDescUpdated = true;
+ }
+ if (isTableDescUpdated) {
admin.modifyTable(tableName, td);
pollForUpdatedTableDescriptor(admin,
td, tableName.getName());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 4e20997..68b0891 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -79,6 +79,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYNC_INDEX_CREATED
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
@@ -100,6 +101,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_PHOENIX_TTL_HW
import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_UPDATE_STATS_ASYNC;
@@ -170,6 +172,8 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.coprocessor.MetaDataProtocol.SharedTableState;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.util.TaskMetaDataServiceCallBack;
import org.apache.phoenix.util.ViewUtil;
import org.apache.phoenix.util.JacksonUtil;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -4690,11 +4694,32 @@ public class MetaDataClient {
}};
try {
String json = JacksonUtil.getObjectWriter().writeValueAsString(props);
- Task.addTask(connection, PTable.TaskType.INDEX_REBUILD,
- tenantId, schemaName,
- dataTableName, PTable.TaskStatus.CREATED.toString(),
- json, null, ts, null, true);
- connection.commit();
+ List<Mutation> sysTaskUpsertMutations = Task.getMutationsForAddTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(connection)
+ .setTaskType(
+ PTable.TaskType.INDEX_REBUILD)
+ .setTenantId(tenantId)
+ .setSchemaName(schemaName)
+ .setTableName(dataTableName)
+ .setTaskStatus(
+ PTable.TaskStatus.CREATED.toString())
+ .setData(json)
+ .setPriority(null)
+ .setStartTs(ts)
+ .setEndTs(null)
+ .setAccessCheckEnabled(true)
+ .build());
+ byte[] rowKey = sysTaskUpsertMutations
+ .get(0).getRow();
+ MetaDataMutationResult metaDataMutationResult =
+ Task.taskMetaDataCoprocessorExec(connection, rowKey,
+ new TaskMetaDataServiceCallBack(sysTaskUpsertMutations));
+ if (MutationCode.UNABLE_TO_UPSERT_TASK.equals(
+ metaDataMutationResult.getMutationCode())) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_UPSERT_TASK)
+ .setSchemaName(SYSTEM_SCHEMA_NAME)
+ .setTableName(SYSTEM_TASK_TABLE).build().buildException();
+ }
} catch (IOException e) {
throw new SQLException("Exception happened while adding a System.Task" + e.toString());
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/SystemTaskParams.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/SystemTaskParams.java
new file mode 100644
index 0000000..1740256
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/SystemTaskParams.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.task;
+
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
+
+import java.sql.Timestamp;
+
+/**
+ * Task params to be used while upserting records in SYSTEM.TASK table.
+ * This POJO is mainly used while upserting(and committing) or generating
+ * upsert mutations plan in {@link Task} class
+ * <p>
+ * Every field is final and assigned once by the constructor. Callers in
+ * this change assemble instances through {@link SystemTaskParamsBuilder}.
+ */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"},
+ justification = "endTs and startTs are not used for mutation")
+public class SystemTaskParams {
+
+ // Connection handed to Task: addTask prepares the upsert on it, while
+ // getMutationsForAddTask only reads its client info and configuration.
+ private final PhoenixConnection conn;
+ private final PTable.TaskType taskType;
+ private final String tenantId;
+ private final String schemaName;
+ private final String tableName;
+ private final String taskStatus;
+ private final String data;
+ // May be null: callers in this change pass null for priority.
+ private final Integer priority;
+ private final Timestamp startTs;
+ // May be null: callers in this change pass null for endTs.
+ private final Timestamp endTs;
+ // When true, Task runs the SYSTEM.TASK statement as the HBase login
+ // user (see Task#getMutationsForSystemTaskTable).
+ private final boolean accessCheckEnabled;
+
+ /**
+ * Stores the given values as-is; no argument is validated or copied.
+ */
+ public SystemTaskParams(PhoenixConnection conn, PTable.TaskType taskType,
+ String tenantId, String schemaName, String tableName,
+ String taskStatus, String data, Integer priority, Timestamp startTs,
+ Timestamp endTs, boolean accessCheckEnabled) {
+ this.conn = conn;
+ this.taskType = taskType;
+ this.tenantId = tenantId;
+ this.schemaName = schemaName;
+ this.tableName = tableName;
+ this.taskStatus = taskStatus;
+ this.data = data;
+ this.priority = priority;
+ this.startTs = startTs;
+ this.endTs = endTs;
+ this.accessCheckEnabled = accessCheckEnabled;
+ }
+
+ public PhoenixConnection getConn() {
+ return conn;
+ }
+
+ public PTable.TaskType getTaskType() {
+ return taskType;
+ }
+
+ public String getTenantId() {
+ return tenantId;
+ }
+
+ public String getSchemaName() {
+ return schemaName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getTaskStatus() {
+ return taskStatus;
+ }
+
+ public String getData() {
+ return data;
+ }
+
+ public Integer getPriority() {
+ return priority;
+ }
+
+ // Returns the stored Timestamp itself (no defensive copy); this is what
+ // the EI_EXPOSE_REP suppression on the class refers to.
+ public Timestamp getStartTs() {
+ return startTs;
+ }
+
+ // Returns the stored Timestamp itself (no defensive copy).
+ public Timestamp getEndTs() {
+ return endTs;
+ }
+
+ public boolean isAccessCheckEnabled() {
+ return accessCheckEnabled;
+ }
+
+ /**
+ * Fluent builder for {@link SystemTaskParams}. Each setter stores the
+ * value and returns this builder; fields that are never set keep their
+ * Java defaults (null, or false for accessCheckEnabled).
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"},
+ justification = "endTs and startTs are not used for mutation")
+ public static class SystemTaskParamsBuilder {
+
+ private PhoenixConnection conn;
+ private PTable.TaskType taskType;
+ private String tenantId;
+ private String schemaName;
+ private String tableName;
+ private String taskStatus;
+ private String data;
+ private Integer priority;
+ private Timestamp startTs;
+ private Timestamp endTs;
+ private boolean accessCheckEnabled;
+
+ public SystemTaskParamsBuilder setConn(PhoenixConnection conn) {
+ this.conn = conn;
+ return this;
+ }
+
+ public SystemTaskParamsBuilder setTaskType(PTable.TaskType taskType) {
+ this.taskType = taskType;
+ return this;
+ }
+
+ public SystemTaskParamsBuilder setTenantId(String tenantId) {
+ this.tenantId = tenantId;
+ return this;
+ }
+
+ public SystemTaskParamsBuilder setSchemaName(String schemaName) {
+ this.schemaName = schemaName;
+ return this;
+ }
+
+ public SystemTaskParamsBuilder setTableName(String tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ public SystemTaskParamsBuilder setTaskStatus(String taskStatus) {
+ this.taskStatus = taskStatus;
+ return this;
+ }
+
+ public SystemTaskParamsBuilder setData(String data) {
+ this.data = data;
+ return this;
+ }
+
+ public SystemTaskParamsBuilder setPriority(Integer priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ public SystemTaskParamsBuilder setStartTs(Timestamp startTs) {
+ this.startTs = startTs;
+ return this;
+ }
+
+ public SystemTaskParamsBuilder setEndTs(Timestamp endTs) {
+ this.endTs = endTs;
+ return this;
+ }
+
+ public SystemTaskParamsBuilder setAccessCheckEnabled(
+ boolean accessCheckEnabled) {
+ this.accessCheckEnabled = accessCheckEnabled;
+ return this;
+ }
+
+ /**
+ * Creates the params object from the values set so far; no
+ * validation is performed here.
+ */
+ public SystemTaskParams build() {
+ return new SystemTaskParams(conn, taskType, tenantId, schemaName,
+ tableName, taskStatus, data, priority, startTs, endTs,
+ accessCheckEnabled);
+ }
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
index 0fe256b..e146ce0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
@@ -18,14 +18,27 @@
package org.apache.phoenix.schema.task;
import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcUtil;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos
+ .TaskMetaDataService;
import org.apache.phoenix.coprocessor.tasks.DropChildViewsTask;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +51,9 @@ import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
public class Task {
public static final Logger LOGGER = LoggerFactory.getLogger(Task.class);
@@ -75,6 +90,46 @@ public class Task {
}
}
+ /**
+ * Executes the prepared SYSTEM.TASK statement and returns the mutations
+ * it produced on the connection; no commit is issued here.
+ *
+ * @param conn connection whose mutation state is read after execution
+ * @param stmt prepared SYSTEM.TASK statement to execute
+ * @param accessCheckEnabled when true, the statement is executed as the
+ * login user with the RPC context temporarily cleared
+ * @return mutations generated by the statement
+ * @throws IOException wrapping any SQLException raised during execution
+ */
+ private static List<Mutation> getMutationsForSystemTaskTable(
+ final PhoenixConnection conn, final PreparedStatement stmt,
+ final boolean accessCheckEnabled) throws IOException {
+ // we need to mutate SYSTEM.TASK with HBase/login user if access is enabled.
+ if (accessCheckEnabled) {
+ return User.runAsLoginUser(new PrivilegedExceptionAction<List<Mutation>>() {
+ @Override
+ public List<Mutation> run() throws Exception {
+ // remember the caller's RPC context so it can be restored below
+ final RpcServer.Call rpcContext = RpcUtil.getRpcContext();
+ // setting RPC context as null so that user can be reset
+ try {
+ RpcUtil.setRpcContext(null);
+ return executeStatementAndGetTaskMutations(conn, stmt);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ // setting RPC context back to original context of the RPC
+ RpcUtil.setRpcContext(rpcContext);
+ }
+ }
+ });
+ } else {
+ // no user switch needed: execute directly on the calling thread
+ try {
+ return executeStatementAndGetTaskMutations(conn, stmt);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ /**
+ * Executes the statement and returns the mutations of the first table
+ * entry found in the connection's mutation state.
+ *
+ * @param conn connection on which the statement was prepared
+ * @param stmt SYSTEM.TASK upsert statement to execute
+ * @return mutations produced for the SYSTEM.TASK upsert
+ * @throws SQLException if execution fails or no mutation was produced
+ */
+ private static List<Mutation> executeStatementAndGetTaskMutations(
+ PhoenixConnection conn, PreparedStatement stmt)
+ throws SQLException {
+ stmt.execute();
+ // retrieve mutations for SYSTEM.TASK upsert query
+ Iterator<Pair<byte[], List<Mutation>>> iterator =
+ conn.getMutationState().toMutations();
+ // Fail with a checked, descriptive error instead of letting an
+ // unchecked NoSuchElementException escape from next().
+ if (!iterator.hasNext()) {
+ throw new SQLException(
+ "No mutations were generated for the SYSTEM.TASK upsert");
+ }
+ return iterator.next().getSecond();
+ }
+
private static PreparedStatement setValuesToAddTaskPS(PreparedStatement stmt, PTable.TaskType taskType,
String tenantId, String schemaName, String tableName, String taskStatus, String data,
Integer priority, Timestamp startTs, Timestamp endTs) throws SQLException {
@@ -123,13 +178,28 @@ public class Task {
return stmt;
}
- public static void addTask(PhoenixConnection conn, PTable.TaskType taskType, String tenantId, String schemaName,
- String tableName, String taskStatus, String data, Integer priority, Timestamp startTs, Timestamp endTs,
- boolean accessCheckEnabled)
- throws IOException {
+ /**
+ * Execute and commit upsert query on SYSTEM.TASK
+ * This method should be used only from server side. Client should use
+ * {@link #getMutationsForAddTask(SystemTaskParams)} instead of direct
+ * upsert commit.
+ *
+ * @param systemTaskParams Task params with various task related arguments
+ * @throws IOException If something goes wrong while preparing mutations
+ * or committing transactions
+ */
+ public static void addTask(SystemTaskParams systemTaskParams)
+ throws IOException {
+ // shouldCommit = true: the prepared statement is handed on to
+ // mutateSystemTaskTable, using the connection carried by the params.
+ // The returned statement is not needed here.
+ addTaskAndGetStatement(systemTaskParams, systemTaskParams.getConn(),
+ true);
+ }
+
+ private static PreparedStatement addTaskAndGetStatement(
+ SystemTaskParams systemTaskParams, PhoenixConnection connection,
+ boolean shouldCommit) throws IOException {
PreparedStatement stmt;
try {
- stmt = conn.prepareStatement("UPSERT INTO " +
+ stmt = connection.prepareStatement("UPSERT INTO " +
PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " +
PhoenixDatabaseMetaData.TASK_TYPE + ", " +
PhoenixDatabaseMetaData.TENANT_ID + ", " +
@@ -141,12 +211,78 @@ public class Task {
PhoenixDatabaseMetaData.TASK_END_TS + ", " +
PhoenixDatabaseMetaData.TASK_DATA +
" ) VALUES(?,?,?,?,?,?,?,?,?)");
- stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName, tableName, taskStatus, data, priority, startTs, endTs);
- LOGGER.info("Adding task " + taskType + "," +tableName + "," + taskStatus + "," + startTs, ","+endTs);
+ stmt = setValuesToAddTaskPS(stmt, systemTaskParams.getTaskType(),
+ systemTaskParams.getTenantId(),
+ systemTaskParams.getSchemaName(),
+ systemTaskParams.getTableName(),
+ systemTaskParams.getTaskStatus(), systemTaskParams.getData(),
+ systemTaskParams.getPriority(), systemTaskParams.getStartTs(),
+ systemTaskParams.getEndTs());
+ LOGGER.info("Adding task type: "
+ + systemTaskParams.getTaskType() + " , tableName: "
+ + systemTaskParams.getTableName() + " , taskStatus: "
+ + systemTaskParams.getTaskStatus() + " , startTs: "
+ + systemTaskParams.getStartTs() + " , endTs: "
+ + systemTaskParams.getEndTs());
} catch (SQLException e) {
throw new IOException(e);
}
- mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
+ // if query is getting executed by client, do not execute and commit
+ // mutations
+ if (shouldCommit) {
+ mutateSystemTaskTable(connection, stmt,
+ systemTaskParams.isAccessCheckEnabled());
+ }
+ return stmt;
+ }
+
+ /**
+ * Builds the SYSTEM.TASK upsert mutations for the given params without
+ * committing them. The statement is prepared and executed on a separate
+ * connection, created from the client info and configuration of the
+ * connection carried by the params, and closed before returning.
+ *
+ * @param systemTaskParams Task params with various task related arguments
+ * @return mutations generated for the SYSTEM.TASK upsert
+ * @throws IOException If something goes wrong while preparing or
+ * executing the statement
+ * @throws SQLException declared for obtaining and closing the separate
+ * connection
+ */
+ public static List<Mutation> getMutationsForAddTask(
+ SystemTaskParams systemTaskParams)
+ throws IOException, SQLException {
+ PhoenixConnection curConn = systemTaskParams.getConn();
+ Configuration conf = curConn.getQueryServices().getConfiguration();
+ // create new connection as we do not want to mix up mutationState
+ // with existing connection
+ try (PhoenixConnection newConnection =
+ QueryUtil.getConnectionOnServer(curConn.getClientInfo(), conf)
+ .unwrap(PhoenixConnection.class)) {
+ // shouldCommit = false: only prepare the statement here
+ PreparedStatement statement = addTaskAndGetStatement(
+ systemTaskParams, newConnection, false);
+ return getMutationsForSystemTaskTable(newConnection,
+ statement, systemTaskParams.isAccessCheckEnabled());
+ }
+ }
+
+ /**
+ * Invoke SYSTEM.TASK metadata coprocessor endpoint
+ *
+ * @param connection Phoenix Connection
+ * @param rowKey key corresponding to SYSTEM.TASK mutation
+ * @param callable used to invoke the coprocessor endpoint to upsert
+ * records in SYSTEM.TASK
+ * @return result of invoking the coprocessor endpoint
+ * @throws SQLException If something goes wrong while executing the
+ * coprocessor endpoint
+ */
+ public static MetaDataMutationResult taskMetaDataCoprocessorExec(
+ final PhoenixConnection connection, final byte[] rowKey,
+ final Batch.Call<TaskMetaDataService, MetaDataResponse> callable)
+ throws SQLException {
+ TableName tableName = SchemaUtil.getPhysicalName(
+ PhoenixDatabaseMetaData.SYSTEM_TASK_NAME_BYTES,
+ connection.getQueryServices().getProps());
+ try (Table table =
+ connection.getQueryServices().getTable(tableName.getName())) {
+ // rowKey is passed as both the start and the end key of the call
+ final Map<byte[], MetaDataResponse> results =
+ table.coprocessorService(TaskMetaDataService.class, rowKey,
+ rowKey, callable);
+ // NOTE: a Java assert is only evaluated when assertions are enabled
+ // (-ea). Otherwise an empty map makes next() below throw, and the
+ // Throwable handler wraps that in a SQLException.
+ assert results.size() == 1;
+ MetaDataResponse result = results.values().iterator().next();
+ return MetaDataMutationResult.constructFromProto(result);
+ } catch (IOException e) {
+ throw ServerUtil.parseServerException(e);
+ } catch (Throwable t) {
+ // anything else becomes the cause of a generic SQLException
+ throw new SQLException(t);
+ }
}
public static void deleteTask(PhoenixConnection conn, PTable.TaskType taskType, Timestamp ts, String tenantId,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TaskMetaDataServiceCallBack.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TaskMetaDataServiceCallBack.java
new file mode 100644
index 0000000..b7630a8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TaskMetaDataServiceCallBack.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos
+ .TaskMetaDataService;
+import org.apache.phoenix.protobuf.ProtobufUtil;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link Batch.Call} implementation for the coprocessor endpoint associated
+ * with SYSTEM.TASK: packs the supplied mutations into a single
+ * TaskMutateRequest and invokes upsertTaskDetails with it.
+ */
+public class TaskMetaDataServiceCallBack
+        implements Batch.Call<TaskMetaDataService, MetaDataResponse> {
+
+    private final List<Mutation> taskMutations;
+
+    public TaskMetaDataServiceCallBack(List<Mutation> taskMutations) {
+        this.taskMutations = taskMutations;
+    }
+
+    /**
+     * Sends all mutations in one request and returns the endpoint response.
+     *
+     * @param instance service stub to invoke
+     * @return response delivered to the blocking callback
+     * @throws IOException if serialization fails or the controller reports
+     *     a failure after the call
+     */
+    @Override
+    public MetaDataResponse call(TaskMetaDataService instance)
+            throws IOException {
+        // Serialize every mutation into the repeated bytes field.
+        final TaskMetaDataProtos.TaskMutateRequest.Builder requestBuilder =
+                TaskMetaDataProtos.TaskMutateRequest.newBuilder();
+        for (final Mutation taskMutation : taskMutations) {
+            final ClientProtos.MutationProto serialized =
+                    ProtobufUtil.toProto(taskMutation);
+            requestBuilder.addTableMetadataMutations(
+                    serialized.toByteString());
+        }
+
+        final ServerRpcController rpcController = new ServerRpcController();
+        final BlockingRpcCallback<MetaDataResponse> responseCallback =
+                new BlockingRpcCallback<>();
+        instance.upsertTaskDetails(rpcController, requestBuilder.build(),
+                responseCallback);
+
+        // Rethrow a failure recorded on the controller, if any.
+        final IOException failure = rpcController.getFailedOn();
+        if (failure != null) {
+            throw failure;
+        }
+        return responseCallback.get();
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpointTest.java b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpointTest.java
new file mode 100644
index 0000000..ae19bc6
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/TaskMetaDataEndpointTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.coprocessor;
+
+
+import com.google.protobuf.RpcController;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
+import org.apache.phoenix.coprocessor.generated.TaskMetaDataProtos;
+import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for TaskMetaDataEndpoint, run against a hand-written
+ * RegionCoprocessorEnvironment that exposes a mocked Region.
+ */
+public class TaskMetaDataEndpointTest {
+
+    private TaskMetaDataEndpoint taskMetaDataEndpoint;
+    private Configuration configuration;
+
+    @Mock
+    private Region region;
+
+    @Mock
+    private HRegionInfo regionInfo;
+
+    @Mock
+    private RpcController controller;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        configuration = new Configuration();
+        final RegionCoprocessorEnvironment environment = stubEnvironment();
+        taskMetaDataEndpoint = new TaskMetaDataEndpoint();
+        taskMetaDataEndpoint.start(environment);
+    }
+
+    @Test
+    public void testUpsertTaskDetails() throws Exception {
+        final TaskMetaDataProtos.TaskMutateRequest request =
+                singlePutRequest("row1");
+        final BlockingRpcCallback<MetaDataProtos.MetaDataResponse> done =
+                new BlockingRpcCallback<>();
+        Mockito.doNothing().when(region).mutateRowsWithLocks(
+                Mockito.anyCollectionOf(Mutation.class),
+                Mockito.<Collection<byte[]>>anyObject(),
+                Mockito.anyLong(), Mockito.anyLong());
+
+        taskMetaDataEndpoint.upsertTaskDetails(controller, request, done);
+
+        verifyRegionMutatedOnce();
+    }
+
+    @Test
+    public void testUpsertTaskDetailsFailure() throws Exception {
+        final TaskMetaDataProtos.TaskMutateRequest request =
+                singlePutRequest("row2");
+        final BlockingRpcCallback<MetaDataProtos.MetaDataResponse> done =
+                new BlockingRpcCallback<>();
+        Mockito.doThrow(new IOException()).when(region).mutateRowsWithLocks(
+                Mockito.anyCollectionOf(Mutation.class),
+                Mockito.<Collection<byte[]>>anyObject(),
+                Mockito.anyLong(), Mockito.anyLong());
+
+        taskMetaDataEndpoint.upsertTaskDetails(controller, request, done);
+
+        verifyRegionMutatedOnce();
+        assertEquals(MetaDataProtos.MutationCode.UNABLE_TO_UPSERT_TASK,
+                done.get().getReturnCode());
+    }
+
+    /** Builds a request holding one serialized Put for the given row. */
+    private static TaskMetaDataProtos.TaskMutateRequest singlePutRequest(
+            String row) throws IOException {
+        final Mutation put = new Put(Bytes.toBytes(row));
+        final ClientProtos.MutationProto serialized =
+                ProtobufUtil.toProto(put);
+        return TaskMetaDataProtos.TaskMutateRequest.newBuilder()
+                .addTableMetadataMutations(serialized.toByteString())
+                .build();
+    }
+
+    /** Checks that the mocked region received exactly one mutate call. */
+    private void verifyRegionMutatedOnce() throws IOException {
+        Mockito.verify(region, Mockito.times(1)).mutateRowsWithLocks(
+                Mockito.anyCollectionOf(Mutation.class),
+                Mockito.<Collection<byte[]>>anyObject(),
+                Mockito.anyLong(), Mockito.anyLong());
+    }
+
+    /**
+     * Environment stub: returns the test configuration and the mocked
+     * region / region info; every other accessor returns 0 or null.
+     */
+    private RegionCoprocessorEnvironment stubEnvironment() {
+        return new RegionCoprocessorEnvironment() {
+
+            @Override
+            public int getVersion() {
+                return 0;
+            }
+
+            @Override
+            public String getHBaseVersion() {
+                return null;
+            }
+
+            @Override
+            public Coprocessor getInstance() {
+                return null;
+            }
+
+            @Override
+            public int getPriority() {
+                return 0;
+            }
+
+            @Override
+            public int getLoadSequence() {
+                return 0;
+            }
+
+            @Override
+            public Configuration getConfiguration() {
+                return configuration;
+            }
+
+            @Override
+            public HTableInterface getTable(
+                    TableName tableName) throws IOException {
+                return null;
+            }
+
+            @Override
+            public HTableInterface getTable(TableName tableName,
+                    ExecutorService service) throws IOException {
+                return null;
+            }
+
+            @Override
+            public ClassLoader getClassLoader() {
+                return null;
+            }
+
+            @Override
+            public Region getRegion() {
+                return region;
+            }
+
+            @Override
+            public HRegionInfo getRegionInfo() {
+                return regionInfo;
+            }
+
+            @Override
+            public RegionServerServices getRegionServerServices() {
+                return null;
+            }
+
+            @Override
+            public ConcurrentMap<String, Object> getSharedData() {
+                return null;
+            }
+        };
+    }
+
+}
\ No newline at end of file
diff --git a/phoenix-protocol/src/main/MetaDataService.proto b/phoenix-protocol/src/main/MetaDataService.proto
index a30fad7..cbc53d6 100644
--- a/phoenix-protocol/src/main/MetaDataService.proto
+++ b/phoenix-protocol/src/main/MetaDataService.proto
@@ -53,6 +53,7 @@ enum MutationCode {
UNABLE_TO_CREATE_CHILD_LINK = 23;
UNABLE_TO_UPDATE_PARENT_TABLE = 24;
UNABLE_TO_DELETE_CHILD_LINK = 25;
+ UNABLE_TO_UPSERT_TASK = 26;
};
message SharedTableState {
diff --git a/phoenix-protocol/src/main/TaskMetaDataService.proto b/phoenix-protocol/src/main/TaskMetaDataService.proto
new file mode 100644
index 0000000..d6868bf
--- /dev/null
+++ b/phoenix-protocol/src/main/TaskMetaDataService.proto
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.phoenix.coprocessor.generated";
+option java_outer_classname = "TaskMetaDataProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "MetaDataService.proto";
+
+// Request carrying the SYSTEM.TASK mutations to be applied by the endpoint.
+message TaskMutateRequest {
+ // One entry per mutation; the client in this change fills each entry with
+ // the bytes of a serialized MutationProto.
+ repeated bytes tableMetadataMutations = 1;
+}
+
+// Coprocessor endpoint service associated with SYSTEM.TASK.
+service TaskMetaDataService {
+ // Upserts the supplied mutations; the outcome is reported through
+ // MetaDataResponse, declared in the imported MetaDataService.proto.
+ rpc upsertTaskDetails(TaskMutateRequest)
+ returns (MetaDataResponse);
+}