You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/02/12 08:49:13 UTC

[incubator-iotdb] branch cluster_node_deletion updated: fix insertion

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_node_deletion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_node_deletion by this push:
     new ce7fba9  fix insertion
ce7fba9 is described below

commit ce7fba9d259df1f17f3ef543c0ca9b550ec162d5
Author: jt2594838 <jt...@163.com>
AuthorDate: Wed Feb 12 16:48:57 2020 +0800

    fix insertion
---
 .gitignore                                         |  3 +
 .../java/org/apache/iotdb/cluster/ClientMain.java  | 42 +++++++------
 .../iotdb/cluster/partition/PartitionTable.java    |  8 +++
 .../cluster/server/member/DataGroupMember.java     |  2 +-
 .../cluster/server/member/MetaGroupMember.java     | 35 ++++++++---
 .../iotdb/cluster/server/member/RaftMember.java    | 68 +++++++++++-----------
 .../cluster/server/member/MetaGroupMemberTest.java |  3 +-
 cluster/src/test/resources/logback.xml             | 42 +++++++++++++
 pom.xml                                            |  6 ++
 9 files changed, 147 insertions(+), 62 deletions(-)

diff --git a/.gitignore b/.gitignore
index 70913bb..1fba3e4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -80,3 +80,6 @@ tsfile/src/test/resources/*.ts
 local-snapshots-dir/
 venv/
 
+node_identifier
+partitions
+partitions.tmp
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
index b6649cc..6cf5fb1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
@@ -43,8 +43,12 @@ import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ClientMain {
+  
+  private static final Logger logger = LoggerFactory.getLogger(ClientMain.class);
 
   public static void main(String[] args)
       throws TException, InterruptedException, SQLException, IoTDBRPCException {
@@ -84,17 +88,17 @@ public class ClientMain {
 
   private static void executeQuery(Client client, long sessionId, String query, long statementId)
       throws TException, SQLException, IoTDBRPCException {
-    System.out.println(query);
+    logger.info(query);
     TSExecuteStatementResp resp = client
         .executeQueryStatement(new TSExecuteStatementReq(sessionId, query, statementId));
     long queryId = resp.getQueryId();
-    System.out.println(resp.columns);
+    logger.info(resp.columns.toString());
 
     SessionDataSet dataSet = new SessionDataSet(query, resp.getColumns(),
         resp.getDataTypeList(), queryId, client, sessionId, resp.queryDataSet);
 
     while (dataSet.hasNext()) {
-      System.out.println(dataSet.next());
+      logger.info(dataSet.next().toString());
     }
 
     TSCloseOperationReq tsCloseOperationReq = new TSCloseOperationReq(sessionId);
@@ -106,10 +110,10 @@ public class ClientMain {
 
   private static void testInsertion(Client client, long sessionId) throws TException,
       InterruptedException {
-    System.out.println(client.setStorageGroup(sessionId, "root.beijing"));
-    System.out.println(client.setStorageGroup(sessionId, "root.shanghai"));
-    System.out.println(client.setStorageGroup(sessionId, "root.guangzhou"));
-    System.out.println(client.setStorageGroup(sessionId, "root.shenzhen"));
+    logger.info(client.setStorageGroup(sessionId, "root.beijing").toString());
+    logger.info(client.setStorageGroup(sessionId, "root.shanghai").toString());
+    logger.info(client.setStorageGroup(sessionId, "root.guangzhou").toString());
+    logger.info(client.setStorageGroup(sessionId, "root.shenzhen").toString());
 
     // wait until the storage group creations are committed
     Thread.sleep(3000);
@@ -120,13 +124,13 @@ public class ClientMain {
     req.setEncoding(TSEncoding.GORILLA.ordinal());
     req.setCompressor(CompressionType.SNAPPY.ordinal());
     req.setPath("root.beijing.d1.s1");
-    System.out.println(client.createTimeseries(req));
+    logger.info(client.createTimeseries(req).toString());
     req.setPath("root.shanghai.d1.s1");
-    System.out.println(client.createTimeseries(req));
+    logger.info(client.createTimeseries(req).toString());
     req.setPath("root.guangzhou.d1.s1");
-    System.out.println(client.createTimeseries(req));
+    logger.info(client.createTimeseries(req).toString());
     req.setPath("root.shenzhen.d1.s1");
-    System.out.println(client.createTimeseries(req));
+    logger.info(client.createTimeseries(req).toString());
 
     // wait until the timeseries creations are committed
     Thread.sleep(3000);
@@ -138,17 +142,17 @@ public class ClientMain {
       insertReq.setTimestamp(i * 24 * 3600 * 1000L);
       insertReq.setValues(Collections.singletonList(Double.toString(i * 0.1)));
       insertReq.setDeviceId("root.beijing.d1");
-      System.out.println(insertReq);
-      System.out.println(client.insert(insertReq));
+      logger.info(insertReq.toString());
+      logger.info(client.insert(insertReq).toString());
       insertReq.setDeviceId("root.shanghai.d1");
-      System.out.println(insertReq);
-      System.out.println(client.insert(insertReq));
+      logger.info(insertReq.toString());
+      logger.info(client.insert(insertReq).toString());
       insertReq.setDeviceId("root.guangzhou.d1");
-      System.out.println(insertReq);
-      System.out.println(client.insert(insertReq));
+      logger.info(insertReq.toString());
+      logger.info(client.insert(insertReq).toString());
       insertReq.setDeviceId("root.shenzhen.d1");
-      System.out.println(insertReq);
-      System.out.println(client.insert(insertReq));
+      logger.info(insertReq.toString());
+      logger.info(client.insert(insertReq).toString());
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 676693a..2f9900f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -211,6 +211,8 @@ public interface PartitionTable {
       return splitAndRoutePlan((ShowDevicesPlan) plan);
     } else if (plan instanceof CreateTimeSeriesPlan) {
       return splitAndRoutePlan((CreateTimeSeriesPlan) plan);
+    } else if (plan instanceof InsertPlan) {
+      return splitAndRoutePlan((InsertPlan) plan);
     }
     //the if clause can be removed after the program is stable
     if (PartitionUtils.isLocalPlan(plan)) {
@@ -224,6 +226,12 @@ public interface PartitionTable {
     throw new UnsupportedPlanException(plan);
   }
 
+  default Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertPlan plan)
+      throws StorageGroupNotSetException {
+    PartitionGroup partitionGroup = partitionByPathTime(plan.getDeviceId(), plan.getTime());
+    return Collections.singletonMap(plan, partitionGroup);
+  }
+
   default Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(CreateTimeSeriesPlan plan)
       throws StorageGroupNotSetException {
     PartitionGroup partitionGroup = partitionByPathTime(plan.getPath().getFullPath(), 0);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index cb01ec5..1a0c9e9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -545,7 +545,7 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       }
     }
 
-    return forwardPlan(plan, leader);
+    return forwardPlan(plan, leader, getHeader());
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 523a546..b75c747 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -100,7 +100,6 @@ import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
 import org.apache.iotdb.cluster.utils.SerializeUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
-import org.apache.iotdb.cluster.utils.nodetool.function.Partition;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.StorageEngineException;
@@ -908,7 +907,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
         return status;
       }
     }
-    return forwardPlan(plan, leader);
+    return forwardPlan(plan, leader, null);
   }
 
   private TSStatus processPartitionedPlan(PhysicalPlan plan) throws UnsupportedPlanException {
@@ -931,7 +930,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
       if (character != NodeCharacter.LEADER) {
         logger
             .debug("{}: Cannot found partition groups for {}, forwarding to {}", name, plan, leader);
-        return forwardPlan(plan, leader);
+        return forwardPlan(plan, leader, null);
       } else {
         logger.debug("{}: Cannot found storage groups for {}", name, plan);
         return StatusUtils.NO_STORAGE_GROUP;
@@ -941,7 +940,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
 
     TSStatus status;
     List<Entry<PhysicalPlan, PartitionGroup>> succeededEntries = new ArrayList<>();
-    List<Integer> errorCodes = new ArrayList<>();
+    List<String> errorCodePartitionGroups = new ArrayList<>();
     for(Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
       TSStatus subStatus;
       if (entry.getValue().contains(thisNode)) {
@@ -953,22 +952,44 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
         subStatus = forwardPlan(entry.getKey(), entry.getValue());
       }
       if (subStatus.getStatusType().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        errorCodes.add(subStatus.getStatusType().getCode());
+        errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
+            subStatus.getStatusType().getCode(), entry.getValue().getHeader(),
+            subStatus.getStatusType().getMessage()));
       } else {
         succeededEntries.add(entry);
       }
     }
-    if (errorCodes.isEmpty()) {
+    if (errorCodePartitionGroups.isEmpty()) {
       status = StatusUtils.OK;
     } else {
       status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
       status.getStatusType().setMessage("The following errors occurred when executing the query, "
-          + "please retry or contact the DBA: " + errorCodes.toString());
+          + "please retry or contact the DBA: " + errorCodePartitionGroups.toString());
       //TODO-Cluster: abort the succeeded ones if necessary.
     }
     return status;
   }
 
+  TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) {
+    // if a plan is partitioned to any group, it must be processed by the data server of that
+    // group instead of the meta server
+    for (Node node : group) {
+      TSStatus status;
+      try {
+        status = forwardPlan(plan, getDataClientPool().getClient(node), node, group.getHeader());
+      } catch (IOException e) {
+        status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
+        status.getStatusType().setMessage(e.getMessage());
+      }
+      if (status != StatusUtils.TIME_OUT) {
+        return status;
+      }
+    }
+    return StatusUtils.TIME_OUT;
+  }
+
+
+
   /**
    * Pull the all timeseries schemas of a given prefixPath from a remote node.
    */
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index ea021b2..214ae19 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -600,16 +600,6 @@ public abstract class RaftMember implements RaftService.AsyncIface {
     return name;
   }
 
-  TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) {
-    for (Node node : group) {
-      TSStatus status = forwardPlan(plan, node);
-      if (status != StatusUtils.TIME_OUT) {
-        return status;
-      }
-    }
-    return StatusUtils.TIME_OUT;
-  }
-
   /**
    *
    * @return the header of the data raft group or null if this is in a meta group.
@@ -618,7 +608,15 @@ public abstract class RaftMember implements RaftService.AsyncIface {
     return null;
   }
 
-  TSStatus forwardPlan(PhysicalPlan plan, Node node) {
+  /**
+   * Forward a plan to a node using the default client.
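+   * @param plan the plan to be forwarded
+   * @param node the node that the plan is forwarded to
+   * @param header must be set for data group communication, set to null for meta group
+   *               communication
+   * @return the status of the forwarding, or NO_LEADER if node is null or is this node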
+   */
+  TSStatus forwardPlan(PhysicalPlan plan, Node node, Node header) {
     if (node == thisNode || node == null) {
       logger.debug("{}: plan {} has no where to be forwarded", name, plan);
       return StatusUtils.NO_LEADER;
@@ -628,32 +626,36 @@ public abstract class RaftMember implements RaftService.AsyncIface {
 
     AsyncClient client = connectNode(node);
     if (client != null) {
-      ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-      DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
-      try {
-        plan.serializeTo(dataOutputStream);
-        AtomicReference<TSStatus> status = new AtomicReference<>();
-        ExecutNonQueryReq req = new ExecutNonQueryReq();
-        req.setPlanBytes(byteArrayOutputStream.toByteArray());
-        if (getHeader() != null) {
-          req.setHeader(getHeader());
-        }
-        synchronized (status) {
-          client.executeNonQueryPlan(req, new ForwardPlanHandler(status, plan, node));
-          status.wait(RaftServer.connectionTimeoutInMS);
-        }
-        return status.get() == null ? StatusUtils.TIME_OUT : status.get();
-      } catch (IOException | TException e) {
-        TSStatus status = StatusUtils.INTERNAL_ERROR.deepCopy();
-        status.getStatusType().setMessage(e.getMessage());
-        return status;
-      } catch (InterruptedException e) {
-        return StatusUtils.TIME_OUT;
-      }
+     return forwardPlan(plan, client, node, header);
     }
     return StatusUtils.TIME_OUT;
   }
 
+  TSStatus forwardPlan(PhysicalPlan plan, AsyncClient client, Node receiver, Node header) {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+    try {
+      plan.serializeTo(dataOutputStream);
+      AtomicReference<TSStatus> status = new AtomicReference<>();
+      ExecutNonQueryReq req = new ExecutNonQueryReq();
+      req.setPlanBytes(byteArrayOutputStream.toByteArray());
+      if (header != null) {
+        req.setHeader(header);
+      }
+      synchronized (status) {
+        client.executeNonQueryPlan(req, new ForwardPlanHandler(status, plan, receiver));
+        status.wait(RaftServer.connectionTimeoutInMS);
+      }
+      return status.get() == null ? StatusUtils.TIME_OUT : status.get();
+    } catch (IOException | TException e) {
+      TSStatus status = StatusUtils.INTERNAL_ERROR.deepCopy();
+      status.getStatusType().setMessage(e.getMessage());
+      return status;
+    } catch (InterruptedException e) {
+      return StatusUtils.TIME_OUT;
+    }
+  }
+
   TSStatus processPlanLocally(PhysicalPlan plan) {
     logger.debug("{}: Processing plan {}", name, plan);
     synchronized (logManager) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 767ea81..fe03605 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -166,11 +166,10 @@ public class MetaGroupMemberTest extends MemberTest {
       }
 
       @Override
-      TSStatus forwardPlan(PhysicalPlan plan, Node node) {
+      TSStatus forwardPlan(PhysicalPlan plan, Node node, Node header) {
         return executeNonQuery(plan);
       }
 
-      @Override
       TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) {
         return executeNonQuery(plan);
       }
diff --git a/cluster/src/test/resources/logback.xml b/cluster/src/test/resources/logback.xml
new file mode 100644
index 0000000..d372192
--- /dev/null
+++ b/cluster/src/test/resources/logback.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration debug="false">
+    <property name="LOG_PATH" value="target/logs"/>
+    <!-- prevent logback from outputting its own status at the start of every log -->
+    <statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
+    <appender class="ch.qos.logback.core.ConsoleAppender" name="stdout">
+        <Target>System.out</Target>
+        <encoder>
+            <pattern>%-5p [%d] [%thread] %C:%L - %m %n</pattern>
+            <charset>utf-8</charset>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+    </appender>
+    <!--<logger name="org.apache.iotdb.db.utils.OpenFileNumUtil" level="debug" />-->
+    <!--<logger name="org.apache.iotdb.db.utils.OpenFileNumUtilTest" level="debug" />-->
+    <logger name="org.apache.iotdb.db.integration.IoTDBMergeTest" level="INFO"/>
+    <root level="ERROR">
+        <appender-ref ref="stdout"/>
+    </root>
+</configuration>
diff --git a/pom.xml b/pom.xml
index bf32513..f775db6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1187,4 +1187,10 @@
             </modules>
         </profile>
     </profiles>
+    <repositories>
+        <repository>
+            <id>spring-libs</id>
+            <url>http://repo.spring.io/libs-milestone/</url>
+        </repository>
+    </repositories>
 </project>