You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/02/12 08:49:13 UTC
[incubator-iotdb] branch cluster_node_deletion updated: fix
insertion
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_node_deletion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_node_deletion by this push:
new ce7fba9 fix insertion
ce7fba9 is described below
commit ce7fba9d259df1f17f3ef543c0ca9b550ec162d5
Author: jt2594838 <jt...@163.com>
AuthorDate: Wed Feb 12 16:48:57 2020 +0800
fix insertion
---
.gitignore | 3 +
.../java/org/apache/iotdb/cluster/ClientMain.java | 42 +++++++------
.../iotdb/cluster/partition/PartitionTable.java | 8 +++
.../cluster/server/member/DataGroupMember.java | 2 +-
.../cluster/server/member/MetaGroupMember.java | 35 ++++++++---
.../iotdb/cluster/server/member/RaftMember.java | 68 +++++++++++-----------
.../cluster/server/member/MetaGroupMemberTest.java | 3 +-
cluster/src/test/resources/logback.xml | 42 +++++++++++++
pom.xml | 6 ++
9 files changed, 147 insertions(+), 62 deletions(-)
diff --git a/.gitignore b/.gitignore
index 70913bb..1fba3e4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -80,3 +80,6 @@ tsfile/src/test/resources/*.ts
local-snapshots-dir/
venv/
+node_identifier
+partitions
+partitions.tmp
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
index b6649cc..6cf5fb1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
@@ -43,8 +43,12 @@ import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ClientMain {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClientMain.class);
public static void main(String[] args)
throws TException, InterruptedException, SQLException, IoTDBRPCException {
@@ -84,17 +88,17 @@ public class ClientMain {
private static void executeQuery(Client client, long sessionId, String query, long statementId)
throws TException, SQLException, IoTDBRPCException {
- System.out.println(query);
+ logger.info(query);
TSExecuteStatementResp resp = client
.executeQueryStatement(new TSExecuteStatementReq(sessionId, query, statementId));
long queryId = resp.getQueryId();
- System.out.println(resp.columns);
+ logger.info(resp.columns.toString());
SessionDataSet dataSet = new SessionDataSet(query, resp.getColumns(),
resp.getDataTypeList(), queryId, client, sessionId, resp.queryDataSet);
while (dataSet.hasNext()) {
- System.out.println(dataSet.next());
+ logger.info(dataSet.next().toString());
}
TSCloseOperationReq tsCloseOperationReq = new TSCloseOperationReq(sessionId);
@@ -106,10 +110,10 @@ public class ClientMain {
private static void testInsertion(Client client, long sessionId) throws TException,
InterruptedException {
- System.out.println(client.setStorageGroup(sessionId, "root.beijing"));
- System.out.println(client.setStorageGroup(sessionId, "root.shanghai"));
- System.out.println(client.setStorageGroup(sessionId, "root.guangzhou"));
- System.out.println(client.setStorageGroup(sessionId, "root.shenzhen"));
+ logger.info(client.setStorageGroup(sessionId, "root.beijing").toString());
+ logger.info(client.setStorageGroup(sessionId, "root.shanghai").toString());
+ logger.info(client.setStorageGroup(sessionId, "root.guangzhou").toString());
+ logger.info(client.setStorageGroup(sessionId, "root.shenzhen").toString());
// wait until the storage group creations are committed
Thread.sleep(3000);
@@ -120,13 +124,13 @@ public class ClientMain {
req.setEncoding(TSEncoding.GORILLA.ordinal());
req.setCompressor(CompressionType.SNAPPY.ordinal());
req.setPath("root.beijing.d1.s1");
- System.out.println(client.createTimeseries(req));
+ logger.info(client.createTimeseries(req).toString());
req.setPath("root.shanghai.d1.s1");
- System.out.println(client.createTimeseries(req));
+ logger.info(client.createTimeseries(req).toString());
req.setPath("root.guangzhou.d1.s1");
- System.out.println(client.createTimeseries(req));
+ logger.info(client.createTimeseries(req).toString());
req.setPath("root.shenzhen.d1.s1");
- System.out.println(client.createTimeseries(req));
+ logger.info(client.createTimeseries(req).toString());
// wait until the timeseries creations are committed
Thread.sleep(3000);
@@ -138,17 +142,17 @@ public class ClientMain {
insertReq.setTimestamp(i * 24 * 3600 * 1000L);
insertReq.setValues(Collections.singletonList(Double.toString(i * 0.1)));
insertReq.setDeviceId("root.beijing.d1");
- System.out.println(insertReq);
- System.out.println(client.insert(insertReq));
+ logger.info(insertReq.toString());
+ logger.info(client.insert(insertReq).toString());
insertReq.setDeviceId("root.shanghai.d1");
- System.out.println(insertReq);
- System.out.println(client.insert(insertReq));
+ logger.info(insertReq.toString());
+ logger.info(client.insert(insertReq).toString());
insertReq.setDeviceId("root.guangzhou.d1");
- System.out.println(insertReq);
- System.out.println(client.insert(insertReq));
+ logger.info(insertReq.toString());
+ logger.info(client.insert(insertReq).toString());
insertReq.setDeviceId("root.shenzhen.d1");
- System.out.println(insertReq);
- System.out.println(client.insert(insertReq));
+ logger.info(insertReq.toString());
+ logger.info(client.insert(insertReq).toString());
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 676693a..2f9900f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -211,6 +211,8 @@ public interface PartitionTable {
return splitAndRoutePlan((ShowDevicesPlan) plan);
} else if (plan instanceof CreateTimeSeriesPlan) {
return splitAndRoutePlan((CreateTimeSeriesPlan) plan);
+ } else if (plan instanceof InsertPlan) {
+ return splitAndRoutePlan((InsertPlan) plan);
}
//the if clause can be removed after the program is stable
if (PartitionUtils.isLocalPlan(plan)) {
@@ -224,6 +226,12 @@ public interface PartitionTable {
throw new UnsupportedPlanException(plan);
}
+ default Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertPlan plan)
+ throws StorageGroupNotSetException {
+ PartitionGroup partitionGroup = partitionByPathTime(plan.getDeviceId(), plan.getTime());
+ return Collections.singletonMap(plan, partitionGroup);
+ }
+
default Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(CreateTimeSeriesPlan plan)
throws StorageGroupNotSetException {
PartitionGroup partitionGroup = partitionByPathTime(plan.getPath().getFullPath(), 0);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index cb01ec5..1a0c9e9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -545,7 +545,7 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
}
}
- return forwardPlan(plan, leader);
+ return forwardPlan(plan, leader, getHeader());
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 523a546..b75c747 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -100,7 +100,6 @@ import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
import org.apache.iotdb.cluster.utils.PartitionUtils;
import org.apache.iotdb.cluster.utils.SerializeUtils;
import org.apache.iotdb.cluster.utils.StatusUtils;
-import org.apache.iotdb.cluster.utils.nodetool.function.Partition;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -908,7 +907,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
return status;
}
}
- return forwardPlan(plan, leader);
+ return forwardPlan(plan, leader, null);
}
private TSStatus processPartitionedPlan(PhysicalPlan plan) throws UnsupportedPlanException {
@@ -931,7 +930,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
if (character != NodeCharacter.LEADER) {
logger
.debug("{}: Cannot found partition groups for {}, forwarding to {}", name, plan, leader);
- return forwardPlan(plan, leader);
+ return forwardPlan(plan, leader, null);
} else {
logger.debug("{}: Cannot found storage groups for {}", name, plan);
return StatusUtils.NO_STORAGE_GROUP;
@@ -941,7 +940,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
TSStatus status;
List<Entry<PhysicalPlan, PartitionGroup>> succeededEntries = new ArrayList<>();
- List<Integer> errorCodes = new ArrayList<>();
+ List<String> errorCodePartitionGroups = new ArrayList<>();
for(Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
TSStatus subStatus;
if (entry.getValue().contains(thisNode)) {
@@ -953,22 +952,44 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
subStatus = forwardPlan(entry.getKey(), entry.getValue());
}
if (subStatus.getStatusType().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- errorCodes.add(subStatus.getStatusType().getCode());
+ errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
+ subStatus.getStatusType().getCode(), entry.getValue().getHeader(),
+ subStatus.getStatusType().getMessage()));
} else {
succeededEntries.add(entry);
}
}
- if (errorCodes.isEmpty()) {
+ if (errorCodePartitionGroups.isEmpty()) {
status = StatusUtils.OK;
} else {
status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
status.getStatusType().setMessage("The following errors occurred when executing the query, "
- + "please retry or contact the DBA: " + errorCodes.toString());
+ + "please retry or contact the DBA: " + errorCodePartitionGroups.toString());
//TODO-Cluster: abort the succeeded ones if necessary.
}
return status;
}
+ TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) {
+ // if a plan is partitioned to any group, it must be processed by its data server instead of
+ // meta server
+ for (Node node : group) {
+ TSStatus status;
+ try {
+ status = forwardPlan(plan, getDataClientPool().getClient(node), node, group.getHeader());
+ } catch (IOException e) {
+ status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
+ status.getStatusType().setMessage(e.getMessage());
+ }
+ if (status != StatusUtils.TIME_OUT) {
+ return status;
+ }
+ }
+ return StatusUtils.TIME_OUT;
+ }
+
+
+
/**
* Pull the all timeseries schemas of a given prefixPath from a remote node.
*/
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index ea021b2..214ae19 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -600,16 +600,6 @@ public abstract class RaftMember implements RaftService.AsyncIface {
return name;
}
- TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) {
- for (Node node : group) {
- TSStatus status = forwardPlan(plan, node);
- if (status != StatusUtils.TIME_OUT) {
- return status;
- }
- }
- return StatusUtils.TIME_OUT;
- }
-
/**
*
* @return the header of the data raft group or null if this is in a meta group.
@@ -618,7 +608,15 @@ public abstract class RaftMember implements RaftService.AsyncIface {
return null;
}
- TSStatus forwardPlan(PhysicalPlan plan, Node node) {
+ /**
+ * Forward a plan to a node using the default client.
+ * @param plan
+ * @param node
+ * @param header must be set for data group communication, set to null for meta group
+ * communication
+ * @return
+ */
+ TSStatus forwardPlan(PhysicalPlan plan, Node node, Node header) {
if (node == thisNode || node == null) {
logger.debug("{}: plan {} has no where to be forwarded", name, plan);
return StatusUtils.NO_LEADER;
@@ -628,32 +626,36 @@ public abstract class RaftMember implements RaftService.AsyncIface {
AsyncClient client = connectNode(node);
if (client != null) {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
- try {
- plan.serializeTo(dataOutputStream);
- AtomicReference<TSStatus> status = new AtomicReference<>();
- ExecutNonQueryReq req = new ExecutNonQueryReq();
- req.setPlanBytes(byteArrayOutputStream.toByteArray());
- if (getHeader() != null) {
- req.setHeader(getHeader());
- }
- synchronized (status) {
- client.executeNonQueryPlan(req, new ForwardPlanHandler(status, plan, node));
- status.wait(RaftServer.connectionTimeoutInMS);
- }
- return status.get() == null ? StatusUtils.TIME_OUT : status.get();
- } catch (IOException | TException e) {
- TSStatus status = StatusUtils.INTERNAL_ERROR.deepCopy();
- status.getStatusType().setMessage(e.getMessage());
- return status;
- } catch (InterruptedException e) {
- return StatusUtils.TIME_OUT;
- }
+ return forwardPlan(plan, client, node, header);
}
return StatusUtils.TIME_OUT;
}
+ TSStatus forwardPlan(PhysicalPlan plan, AsyncClient client, Node receiver, Node header) {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+ try {
+ plan.serializeTo(dataOutputStream);
+ AtomicReference<TSStatus> status = new AtomicReference<>();
+ ExecutNonQueryReq req = new ExecutNonQueryReq();
+ req.setPlanBytes(byteArrayOutputStream.toByteArray());
+ if (header != null) {
+ req.setHeader(header);
+ }
+ synchronized (status) {
+ client.executeNonQueryPlan(req, new ForwardPlanHandler(status, plan, receiver));
+ status.wait(RaftServer.connectionTimeoutInMS);
+ }
+ return status.get() == null ? StatusUtils.TIME_OUT : status.get();
+ } catch (IOException | TException e) {
+ TSStatus status = StatusUtils.INTERNAL_ERROR.deepCopy();
+ status.getStatusType().setMessage(e.getMessage());
+ return status;
+ } catch (InterruptedException e) {
+ return StatusUtils.TIME_OUT;
+ }
+ }
+
TSStatus processPlanLocally(PhysicalPlan plan) {
logger.debug("{}: Processing plan {}", name, plan);
synchronized (logManager) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 767ea81..fe03605 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -166,11 +166,10 @@ public class MetaGroupMemberTest extends MemberTest {
}
@Override
- TSStatus forwardPlan(PhysicalPlan plan, Node node) {
+ TSStatus forwardPlan(PhysicalPlan plan, Node node, Node header) {
return executeNonQuery(plan);
}
- @Override
TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) {
return executeNonQuery(plan);
}
diff --git a/cluster/src/test/resources/logback.xml b/cluster/src/test/resources/logback.xml
new file mode 100644
index 0000000..d372192
--- /dev/null
+++ b/cluster/src/test/resources/logback.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration debug="false">
+ <property name="LOG_PATH" value="target/logs"/>
+ <!-- prevent logback from outputting its own status at the start of every log -->
+ <statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
+ <appender class="ch.qos.logback.core.ConsoleAppender" name="stdout">
+ <Target>System.out</Target>
+ <encoder>
+ <pattern>%-5p [%d] [%thread] %C:%L - %m %n</pattern>
+ <charset>utf-8</charset>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>INFO</level>
+ </filter>
+ </appender>
+ <!--<logger name="org.apache.iotdb.db.utils.OpenFileNumUtil" level="debug" />-->
+ <!--<logger name="org.apache.iotdb.db.utils.OpenFileNumUtilTest" level="debug" />-->
+ <logger name="org.apache.iotdb.db.integration.IoTDBMergeTest" level="INFO"/>
+ <root level="ERROR">
+ <appender-ref ref="stdout"/>
+ </root>
+</configuration>
diff --git a/pom.xml b/pom.xml
index bf32513..f775db6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1187,4 +1187,10 @@
</modules>
</profile>
</profiles>
+ <repositories>
+ <repository>
+ <id>spring-libs</id>
+ <url>http://repo.spring.io/libs-milestone/</url>
+ </repository>
+ </repositories>
</project>