You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/02/13 09:46:56 UTC

[incubator-iotdb] branch cluster_node_deletion updated: add time intervals

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_node_deletion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_node_deletion by this push:
     new 970f582  add time intervals
970f582 is described below

commit 970f5824479050b840455f40a72e3715dcc0b79c
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Feb 13 17:46:38 2020 +0800

    add time intervals
---
 .../handlers/caller/AppendNodeEntryHandler.java    |   1 +
 .../iotdb/cluster/server/member/RaftMember.java    |  70 +++----
 .../apache/iotdb/cluster/utils/PartitionUtils.java | 225 ++++++++++++++++++++-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   4 +-
 .../iotdb/tsfile/read/filter/GroupByFilter.java    |   8 +
 .../iotdb/tsfile/read/filter/operator/In.java      |   4 +
 6 files changed, 268 insertions(+), 44 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index dad3493..db81efd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -48,6 +48,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
 
   @Override
   public void onComplete(Long response) {
+    logger.debug("Append response {} from {}", response, receiver);
     if (leaderShipStale.get()) {
       // someone has rejected this log because the leadership is stale
       return;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 214ae19..335a721 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -76,6 +76,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
 
   ClusterConfig config = ClusterDescriptor.getINSTANCE().getConfig();
   private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
+  private static final int SEND_LOG_RETRY = 3;
 
   String name;
 
@@ -198,7 +199,6 @@ public abstract class RaftMember implements RaftService.AsyncIface {
         return;
       }
 
-      long thisTerm = term.get();
       if (character != NodeCharacter.ELECTOR) {
         // only elector votes
         resultHandler.onComplete(Response.RESPONSE_LEADER_STILL_ONLINE);
@@ -295,6 +295,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
     try {
       Log log = LogParser.getINSTANCE().parse(request.entry);
       resultHandler.onComplete(appendEntry(log));
+      logger.debug("{} AppendEntryRequest completed", name);
     } catch (UnknownLogTypeException e) {
       resultHandler.onError(e);
     }
@@ -322,7 +323,6 @@ public abstract class RaftMember implements RaftService.AsyncIface {
   }
 
   // synchronized: logs are serialized
-  //TODO why synchronized?
   private synchronized AppendLogResult sendLogToFollowers(Log log, AtomicInteger quorum) {
     if (allNodes.size() == 1) {
       // single node group, does not need the agreement of others
@@ -342,12 +342,9 @@ public abstract class RaftMember implements RaftService.AsyncIface {
     }
 
     synchronized (quorum) {//this synchronized codes are just for calling quorum.wait.
-      //TODO As we have used synchronized (), do we really need to use AtomicInteger?
 
       // synchronized: avoid concurrent modification
       synchronized (allNodes) {
-        //TODO allNodes.sync is only used here. Is that needed?
-        //By the way, readLock is ok for this case.
         for (Node node : allNodes) {
           AsyncClient client = connectNode(node);
           if (client != null) {
@@ -359,6 +356,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
             handler.setReceiverTerm(newLeaderTerm);
             try {
               client.appendEntry(request, handler);
+              logger.debug("{} sending a log to {}: {}", name, node, log);
             } catch (Exception e) {
               logger.warn("{} cannot append log to node {}", name, node, e);
             }
@@ -408,12 +406,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
     allNodes.add(thisNode);
   }
 
-  public void setLastCatchUpResponseTime(
-      Map<Node, Long> lastCatchUpResponseTime) {
-    this.lastCatchUpResponseTime = lastCatchUpResponseTime;
-  }
-
-  public void /**/setCharacter(NodeCharacter character) {
+  public void setCharacter(NodeCharacter character) {
     logger.info("{} has become a {}", name, character);
     this.character = character;
   }
@@ -672,29 +665,34 @@ public abstract class RaftMember implements RaftService.AsyncIface {
       log.setPlan(plan);
       logManager.appendLog(log);
 
-      logger.debug("{}: Send plan {} to other nodes", name, plan);
-      AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2);
-
-      switch (result) {
-        case OK:
-          logger.debug("{}: Plan {} is accepted", name, plan);
-          try {
-            logManager.commitLog(log);
-          } catch (QueryProcessException e) {
-            logger.info("{}: The log {} is not successfully applied, reverting", name, log, e);
+      retry:
+      for (int i = SEND_LOG_RETRY; i >= 0; i--) {
+        logger.debug("{}: Send plan {} to other nodes, retry remaining {}", name, plan, i);
+        AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2);
+        switch (result) {
+          case OK:
+            logger.debug("{}: Plan {} is accepted", name, plan);
+            try {
+              logManager.commitLog(log);
+            } catch (QueryProcessException e) {
+              logger.info("{}: The log {} is not successfully applied, reverting", name, log, e);
+              logManager.removeLastLog();
+              TSStatus status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
+              status.getStatusType().setMessage(e.getMessage());
+              return status;
+            }
+            return StatusUtils.OK;
+          case TIME_OUT:
+            logger.debug("{}: Plan {} timed out", name, plan);
+            if (i == 1) {
+              return StatusUtils.TIME_OUT;
+            }
+            break;
+          case LEADERSHIP_STALE:
+          default:
             logManager.removeLastLog();
-            TSStatus status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
-            status.getStatusType().setMessage(e.getMessage());
-            return status;
-          }
-          return StatusUtils.OK;
-        case TIME_OUT:
-          logger.debug("{}: Plan {} timed out", name, plan);
-          logManager.removeLastLog();
-          return StatusUtils.TIME_OUT;
-        case LEADERSHIP_STALE:
-        default:
-          logManager.removeLastLog();
+            break retry;
+        }
       }
     }
     return null;
@@ -798,11 +796,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
       byte[] bytes = new byte[length];
       ByteBuffer result = ByteBuffer.wrap(bytes);
       int len = bufferedInputStream.read(bytes);
-      if (len > 0) {
-        result.limit(len);
-      } else {
-        result.limit(0);
-      }
+      result.limit(Math.max(len, 0));
 
       resultHandler.onComplete(result);
     } catch (IOException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index 6bf8b89..c700931 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -21,16 +21,14 @@ package org.apache.iotdb.cluster.utils;
 
 import static org.apache.iotdb.cluster.config.ClusterConstant.HASH_SALT;
 
+import java.util.ArrayList;
 import org.apache.iotdb.cluster.config.ClusterConstant;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
@@ -41,6 +39,18 @@ import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
 import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeEq;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGt;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGtEq;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeIn;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLt;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLtEq;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeNotEq;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
+import org.apache.iotdb.tsfile.read.filter.operator.NotFilter;
+import org.apache.iotdb.tsfile.read.filter.operator.OrFilter;
 import org.apache.iotdb.tsfile.utils.Murmur128Hash;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,5 +124,214 @@ public class PartitionUtils {
     return newPlan;
   }
 
+  public static TimeIntervals extractTimeInterval(Filter filter) {
+    // and, or, not, value, time, group by
+    // eq, neq, gt, gteq, lt, lteq, in
+    if (filter instanceof AndFilter) {
+      AndFilter andFilter = ((AndFilter) filter);
+      TimeIntervals leftIntervals = extractTimeInterval(andFilter.getLeft());
+      TimeIntervals rightIntervals = extractTimeInterval(andFilter.getRight());
+      return leftIntervals.intersection(rightIntervals);
+    } else if (filter instanceof OrFilter) {
+      OrFilter orFilter = ((OrFilter) filter);
+      TimeIntervals leftIntervals = extractTimeInterval(orFilter.getLeft());
+      TimeIntervals rightIntervals = extractTimeInterval(orFilter.getRight());
+      return leftIntervals.union(rightIntervals);
+    } else if (filter instanceof NotFilter) {
+      NotFilter notFilter = ((NotFilter) filter);
+      return extractTimeInterval(notFilter.getFilter()).not();
+    } else if (filter instanceof TimeGt) {
+      TimeGt timeGt = ((TimeGt) filter);
+      return new TimeIntervals(((long) timeGt.getValue()) + 1, Long.MAX_VALUE);
+    } else if (filter instanceof TimeGtEq) {
+      TimeGtEq timeGtEq = ((TimeGtEq) filter);
+      return new TimeIntervals(((long) timeGtEq.getValue()), Long.MAX_VALUE);
+    } else if (filter instanceof TimeEq) {
+      TimeEq timeEq = ((TimeEq) filter);
+      return new TimeIntervals(((long) timeEq.getValue()), ((long) timeEq.getValue()));
+    } else if (filter instanceof TimeNotEq) {
+      TimeNotEq timeNotEq = ((TimeNotEq) filter);
+      TimeIntervals intervals = new TimeIntervals();
+      intervals.addInterval(Long.MIN_VALUE, (long) timeNotEq.getValue() - 1);
+      intervals.addInterval((long) timeNotEq.getValue() + 1, Long.MAX_VALUE);
+      return intervals;
+    } else if (filter instanceof TimeLt) {
+      TimeLt timeLt = ((TimeLt) filter);
+      return new TimeIntervals(Long.MIN_VALUE, (long) timeLt.getValue() - 1);
+    } else if (filter instanceof TimeLtEq) {
+      TimeLtEq timeLtEq = ((TimeLtEq) filter);
+      return new TimeIntervals(Long.MIN_VALUE, (long) timeLtEq.getValue());
+    } else if (filter instanceof TimeIn) {
+      TimeIn timeIn = ((TimeIn) filter);
+      TimeIntervals intervals = new TimeIntervals();
+      for (Object value : timeIn.getValues()) {
+        long time = ((long) value);
+        intervals.addInterval(time, time);
+      }
+      return intervals;
+    } else if (filter instanceof GroupByFilter) {
+      GroupByFilter groupByFilter = ((GroupByFilter) filter);
+      return new TimeIntervals(groupByFilter.getStartTime(), groupByFilter.getEndTime() + 1);
+    }
+    logger.warn("Unrecognized filter class: {}", filter.getClass());
+    return TimeIntervals.ALL_INTERVAL;
+  }
+
+  public static class TimeIntervals extends ArrayList<Long> {
+
+    public static final TimeIntervals ALL_INTERVAL = new TimeIntervals(Long.MIN_VALUE,
+        Long.MAX_VALUE);
+
+    public TimeIntervals() {
+      super();
+    }
+
+    public TimeIntervals(long lowerBound, long upperBound) {
+      super();
+      addInterval(lowerBound, upperBound);
+    }
+
+    public int getIntervalSize() {
+      return size() / 2;
+    }
+
+    public long getLowerBound(int index) {
+      return get(index * 2);
+    }
+
+    public long getUpperBound(int index) {
+      return get(index * 2 + 1);
+    }
 
+    public void addInterval(long lowerBound, long upperBound) {
+      add(lowerBound);
+      add(upperBound);
+    }
+
+    public TimeIntervals intersection(TimeIntervals that) {
+      TimeIntervals result = new TimeIntervals();
+      int thisSize = this.getIntervalSize();
+      int thatSize = that.getIntervalSize();
+      for (int i = 0; i < thisSize; i++) {
+        for (int j = 0; j < thatSize; j++) {
+          long thisLB = this.getLowerBound(i);
+          long thisUB = this.getUpperBound(i);
+          long thatLB = that.getLowerBound(i);
+          long thatUB = that.getUpperBound(i);
+          if (thisUB >= thatLB) {
+            if (thisUB <= thatUB) {
+              result.addInterval(Math.max(thisLB, thatLB), thisUB);
+            } else if (thisLB <= thatUB) {
+              result.addInterval(Math.max(thisLB, thatLB), thatUB);
+            }
+          }
+        }
+      }
+      return result;
+    }
+
+    /**
+     * The union is implemented by merge, so the two intervals must be ordered.
+     * @param that
+     * @return
+     */
+    public TimeIntervals union(TimeIntervals that) {
+      TimeIntervals result = new TimeIntervals();
+      if (this.isEmpty()) {
+        return that;
+      } else if (that.isEmpty()) {
+        return this;
+      }
+
+      int thisSize = this.getIntervalSize();
+      int thatSize = that.getIntervalSize();
+      int thisIndex = 0;
+      int thatIndex = 0;
+      long lastLowerBound = 0;
+      long lastUpperBound = 0;
+      boolean lastBoundSet = false;
+      // merge the heads of the two intervals
+      while (thisIndex < thisSize && thatIndex < thatSize) {
+        long thisLB = this.getLowerBound(thisIndex);
+        long thisUB = this.getUpperBound(thisIndex);
+        long thatLB = that.getLowerBound(thatIndex);
+        long thatUB = that.getUpperBound(thatIndex);
+        if (!lastBoundSet) {
+          lastBoundSet = true;
+          if (thisLB <= thatLB) {
+            lastLowerBound = thisLB;
+            lastUpperBound = thisUB;
+            thisIndex ++;
+          } else {
+            lastLowerBound = thatLB;
+            lastUpperBound = thatUB;
+            thatIndex ++;
+          }
+        } else {
+          if (thisLB <= lastUpperBound + 1 && thisUB >= lastLowerBound - 1) {
+            // the next interval from this can merge with last interval
+            lastLowerBound = Math.min(thisLB, lastLowerBound);
+            lastUpperBound = Math.max(thisUB, lastUpperBound);
+            thisIndex ++;
+          } else if (thatLB <= lastUpperBound + 1 && thatUB >= lastLowerBound - 1) {
+            // the next interval from that can merge with last interval
+            lastLowerBound = Math.min(thatLB, lastLowerBound);
+            lastUpperBound = Math.max(thatUB, lastUpperBound);
+            thatIndex ++;
+          } else {
+            // neither intervals can merge, add the last interval to the result and select a new
+            // one as base
+            result.addInterval(lastLowerBound, lastUpperBound);
+            lastBoundSet = false;
+          }
+        }
+      }
+      // merge the remaining intervals
+      TimeIntervals remainingIntervals = thisIndex < thisSize ? this : that;
+      int remainingIndex = thisIndex < thisSize ? thisIndex : thatIndex;
+      for (int i = remainingIndex; i < remainingIntervals.getIntervalSize(); i++) {
+        long lb = remainingIntervals.getLowerBound(i);
+        long ub = remainingIntervals.getUpperBound(i);
+        if (lb <= lastUpperBound && ub >= lastLowerBound) {
+          // the next interval can merge with last interval
+          lastLowerBound = Math.min(lb, lastLowerBound);
+          lastUpperBound = Math.max(ub, lastUpperBound);
+        } else {
+          // the two interval does not intersect, add the previous interval to the result
+          result.addInterval(lastLowerBound, lastUpperBound);
+          lastLowerBound = lb;
+          lastUpperBound = ub;
+        }
+      }
+      // add the last interval
+      result.addInterval(lastLowerBound, lastLowerBound);
+      return result;
+    }
+
+    public TimeIntervals not() {
+      if (isEmpty()) {
+        return ALL_INTERVAL;
+      }
+      TimeIntervals result = new TimeIntervals();
+      long firstLB = getLowerBound(0);
+      if (firstLB != Long.MIN_VALUE) {
+        result.addInterval(Long.MIN_VALUE, firstLB - 1);
+      }
+
+      int intervalSize = getIntervalSize();
+      for (int i = 0; i < intervalSize - 1; i++) {
+        long currentUB = getUpperBound(i);
+        long nextLB = getLowerBound(i + 1);
+        if (currentUB + 1 <= nextLB -1) {
+          result.addInterval(currentUB + 1, nextLB -1);
+        }
+      }
+
+      long lastUB = getUpperBound(result.getIntervalSize() - 1);
+      if (lastUB != Long.MAX_VALUE) {
+        result.addInterval(lastUB + 1, Long.MAX_VALUE);
+      }
+      return result;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 9dcb5c9..c832da8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -869,15 +869,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
   }
 
-
-
   /**
    * create QueryDataSet and buffer it for fetchResults
    */
   private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan physicalPlan) throws
           QueryProcessException, QueryFilterOptimizationException, StorageEngineException, IOException, MetadataException, SQLException {
 
-    QueryContext context = new QueryContext(queryId);
+    QueryContext context = genQueryContext(queryId);
     QueryDataSet queryDataSet = processor.getExecutor().processQuery(physicalPlan, context);
     queryId2DataSet.put(queryId, queryDataSet);
     return queryDataSet;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
index b1cf764..8de0118 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
@@ -146,4 +146,12 @@ public class GroupByFilter implements Filter, Serializable {
   public int hashCode() {
     return Objects.hash(unit, slidingStep, startTime, endTime);
   }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getEndTime() {
+    return endTime;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java
index 1bb76d4..3767f01 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java
@@ -118,4 +118,8 @@ public class In<T extends Comparable<T>> implements Filter {
   public FilterSerializeId getSerializeId() {
     return FilterSerializeId.IN;
   }
+
+  public Set<T> getValues() {
+    return values;
+  }
 }