You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ne...@apache.org on 2021/06/28 03:55:14 UTC
[iotdb] branch master updated: cluster bug - fix privilege check
(#3275)
This is an automated email from the ASF dual-hosted git repository.
neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9ae9642 cluster bug - fix privilege check (#3275)
9ae9642 is described below
commit 9ae96421d50790cd265ae8c38334a52c812437e7
Author: Haimei Guo <68...@users.noreply.github.com>
AuthorDate: Mon Jun 28 11:54:54 2021 +0800
cluster bug - fix privilege check (#3275)
* fix authority check
* fix
* add test
* extract conclude result
* add privilege check
---
.../iotdb/cluster/coordinator/Coordinator.java | 85 +++++++++++++++-------
.../apache/iotdb/cluster/metadata/CMManager.java | 33 ++++-----
.../iotdb/cluster/integration/SingleNodeTest.java | 35 +++++++++
.../org/apache/iotdb/db/qp/physical/BatchPlan.java | 11 +++
.../physical/crud/InsertRowsOfOneDevicePlan.java | 7 ++
5 files changed, 125 insertions(+), 46 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 24e746d..b46273a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -69,6 +70,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -227,6 +229,11 @@ public class Coordinator {
return StatusUtils.PARTITION_TABLE_NOT_READY;
}
+ if (!checkPrivilegeForBatchExecution(plan)) {
+ return concludeFinalStatus(
+ plan, plan.getPaths().size(), true, null, false, null, Collections.emptyList());
+ }
+
// split the plan into sub-plans that each only involve one data group
Map<PhysicalPlan, PartitionGroup> planGroupMap;
try {
@@ -243,6 +250,7 @@ public class Coordinator {
|| plan instanceof CreateAlignedTimeSeriesPlan
|| plan instanceof CreateMultiTimeSeriesPlan)
&& ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+
logger.debug("{}: No associated storage group found for {}, auto-creating", name, plan);
try {
((CMManager) IoTDB.metaManager).createSchema(plan);
@@ -260,6 +268,20 @@ public class Coordinator {
}
/**
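+ * Checks whether a batch plan still has paths without a recorded result.
+ *
+ * @param plan the plan to check; a plan that is not a BatchPlan always passes
+ * @return false only if a BatchPlan has as many recorded results as paths, true otherwise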
+ */
+ private boolean checkPrivilegeForBatchExecution(PhysicalPlan plan) {
+ if (plan instanceof BatchPlan) {
+ return ((BatchPlan) plan).getResults().size() != plan.getPaths().size();
+ } else {
+ return true;
+ }
+ }
+
+ /**
* Forward a plan to all DataGroupMember groups. Only when all nodes time out, will a TIME_OUT be
* returned. The error messages from each group (if any) will be compacted into one string.
*
@@ -419,25 +441,25 @@ public class Coordinator {
private TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) {
// the error codes from the groups that cannot execute the plan
TSStatus status;
- if (planGroupMap.size() == 1) {
+ // multi-row plans need per-row sub-statuses, so check for them before the single-group case
+
+ // InsertTabletPlan, InsertMultiTabletPlan, InsertRowsPlan and CreateMultiTimeSeriesPlan
+ // contain many rows,
+ // each will correspond to a TSStatus as its execution result,
+ // as the plan is split and the sub-plans may have interleaving ranges,
+ // we must ensure that each TSStatus is placed to the right position
+ // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to NodeA and row2
+ // belongs to NodeB, when NodeA returns a success while NodeB returns a failure, the
+ // failure and success should be placed into proper positions in TSStatus.subStatus
+ if (plan instanceof InsertTabletPlan
+ || plan instanceof InsertMultiTabletPlan
+ || plan instanceof CreateMultiTimeSeriesPlan
+ || plan instanceof InsertRowsPlan) {
+ status = forwardMultiSubPlan(planGroupMap, plan);
+ } else if (planGroupMap.size() == 1) {
status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
} else {
- if (plan instanceof InsertTabletPlan
- || plan instanceof InsertMultiTabletPlan
- || plan instanceof CreateMultiTimeSeriesPlan
- || plan instanceof InsertRowsPlan) {
- // InsertTabletPlan, InsertMultiTabletPlan, InsertRowsPlan and CreateMultiTimeSeriesPlan
- // contains many rows,
- // each will correspond to a TSStatus as its execution result,
- // as the plan is split and the sub-plans may have interleaving ranges,
- // we must assure that each TSStatus is placed to the right position
- // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to NodeA and row2
- // belongs to NodeB, when NodeA returns a success while NodeB returns a failure, the
- // failure and success should be placed into proper positions in TSStatus.subStatus
- status = forwardMultiSubPlan(planGroupMap, plan);
- } else {
- status = forwardToMultipleGroup(planGroupMap);
- }
+ status = forwardToMultipleGroup(planGroupMap);
}
boolean hasCreated = false;
try {
@@ -554,7 +576,7 @@ public class Coordinator {
boolean noFailure = true;
boolean isBatchFailure = false;
EndPoint endPoint = null;
- int totalRowNum = 0;
+ int totalRowNum = parentPlan.getPaths().size();
// send sub-plans to each belonging data group and collect results
for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
tmpStatus = forwardToSingleGroup(entry);
@@ -646,7 +668,24 @@ public class Coordinator {
}
}
}
+ return concludeFinalStatus(
+ parentPlan,
+ totalRowNum,
+ noFailure,
+ endPoint,
+ isBatchFailure,
+ subStatus,
+ errorCodePartitionGroups);
+ }
+ private TSStatus concludeFinalStatus(
+ PhysicalPlan parentPlan,
+ int totalRowNum,
+ boolean noFailure,
+ EndPoint endPoint,
+ boolean isBatchFailure,
+ TSStatus[] subStatus,
+ List<String> errorCodePartitionGroups) {
if (parentPlan instanceof InsertMultiTabletPlan
&& !((InsertMultiTabletPlan) parentPlan).getResults().isEmpty()) {
if (subStatus == null) {
@@ -689,16 +728,6 @@ public class Coordinator {
}
}
- return concludeFinalStatus(
- noFailure, endPoint, isBatchFailure, subStatus, errorCodePartitionGroups);
- }
-
- private TSStatus concludeFinalStatus(
- boolean noFailure,
- EndPoint endPoint,
- boolean isBatchFailure,
- TSStatus[] subStatus,
- List<String> errorCodePartitionGroups) {
TSStatus status;
if (noFailure) {
status = StatusUtils.OK;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 6f74504..e097ac6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -48,11 +48,11 @@ import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
@@ -559,25 +559,11 @@ public class CMManager extends MManager {
// for CreateTimeSeriesPlan, use getPath() to get timeseries to get related storage group,
// for CreateMultiTimeSeriesPlan, use getPaths() to get all timeseries to get related storage
// groups.
- if (plan instanceof InsertRowPlan
- || plan instanceof InsertRowsOfOneDevicePlan
- || plan instanceof InsertTabletPlan) {
+ if (plan instanceof BatchPlan) {
+ storageGroups.addAll(getStorageGroups(getValidStorageGroups(plan)));
+ } else if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) {
storageGroups.addAll(
getStorageGroups(Collections.singletonList(((InsertPlan) plan).getPrefixPath())));
- } else if (plan instanceof InsertRowsPlan) {
- storageGroups.addAll(
- getStorageGroups(
- ((InsertRowsPlan) plan)
- .getInsertRowPlanList().stream()
- .map(InsertPlan::getPrefixPath)
- .collect(Collectors.toList())));
- } else if (plan instanceof InsertMultiTabletPlan) {
- storageGroups.addAll(
- getStorageGroups(
- ((InsertMultiTabletPlan) plan)
- .getInsertTabletPlanList().stream()
- .map(InsertPlan::getPrefixPath)
- .collect(Collectors.toList())));
} else if (plan instanceof CreateTimeSeriesPlan) {
storageGroups.addAll(
getStorageGroups(Collections.singletonList(((CreateTimeSeriesPlan) plan).getPath())));
@@ -606,6 +592,17 @@ public class CMManager extends MManager {
}
}
+ private List<PartialPath> getValidStorageGroups(PhysicalPlan plan) {
+ List<PartialPath> paths = new ArrayList<>();
+ for (int i = 0; i < plan.getPaths().size(); i++) {
+ // no result recorded for this index; presumably it passed the check (has permission to create sg)
+ if (!((BatchPlan) plan).getResults().containsKey(i)) {
+ paths.add(plan.getPaths().get(i));
+ }
+ }
+ return paths;
+ }
+
/** return storage groups paths for given deviceIds or timeseries. */
private List<PartialPath> getStorageGroups(List<PartialPath> paths) throws MetadataException {
Set<PartialPath> storageGroups = new HashSet<>();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
index bc36e47..690dd48 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
@@ -20,17 +20,23 @@
package org.apache.iotdb.cluster.integration;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -93,4 +99,33 @@ public class SingleNodeTest extends BaseSingleNodeTest {
assertFalse(session.checkTimeseriesExists("root.sg1.d1.t2"));
assertFalse(session.checkTimeseriesExists("root.sg1.d1.t3"));
}
+
+ @Test
+ public void testUserPrivilege() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("create user user1 \"1234\"");
+ try (Connection connection1 =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "user1", "1234");
+ Statement userStatement = connection1.createStatement()) {
+ userStatement.addBatch("create timeseries root.sg1.d1.s1 with datatype=int32");
+ userStatement.addBatch("create timeseries root.sg2.d1.s1 with datatype=int32");
+ userStatement.executeBatch();
+ } catch (Exception e) {
+ assertEquals(
+ System.lineSeparator()
+ + "No permissions for this operation CREATE_TIMESERIES for SQL: \"create timeseries root.sg1.d1.s1 with datatype=int32\""
+ + System.lineSeparator()
+ + "No permissions for this operation CREATE_TIMESERIES for SQL: \"create timeseries root.sg2.d1.s1 with datatype=int32\""
+ + System.lineSeparator(),
+ e.getMessage());
+ }
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
index 4da45b8..318b628 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.db.qp.physical;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import java.util.Map;
+
/** BatchPlan contains multiple sub-plans. */
public interface BatchPlan {
@@ -50,4 +54,11 @@ public interface BatchPlan {
* @return how many sub-plans are in the plan.
*/
int getBatchSize();
+
+ /**
+ * Return execution status for each path
+ *
+ * @return execution status for each path
+ */
+ Map<Integer, TSStatus> getResults();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 0791f39..7fc2e55 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -23,13 +23,16 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.BatchPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
@@ -179,6 +182,10 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
return isExecuted[i];
}
+ public Map<Integer, TSStatus> getResults() {
+ return Collections.emptyMap();
+ }
+
@Override
public int getBatchSize() {
return rowPlans.length;