You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/02/13 09:46:56 UTC
[incubator-iotdb] branch cluster_node_deletion updated: add time
intervals
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_node_deletion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_node_deletion by this push:
new 970f582 add time intervals
970f582 is described below
commit 970f5824479050b840455f40a72e3715dcc0b79c
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Feb 13 17:46:38 2020 +0800
add time intervals
---
.../handlers/caller/AppendNodeEntryHandler.java | 1 +
.../iotdb/cluster/server/member/RaftMember.java | 70 +++----
.../apache/iotdb/cluster/utils/PartitionUtils.java | 225 ++++++++++++++++++++-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 4 +-
.../iotdb/tsfile/read/filter/GroupByFilter.java | 8 +
.../iotdb/tsfile/read/filter/operator/In.java | 4 +
6 files changed, 268 insertions(+), 44 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index dad3493..db81efd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -48,6 +48,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
@Override
public void onComplete(Long response) {
+ logger.debug("Append response {} from {}", response, receiver);
if (leaderShipStale.get()) {
// someone has rejected this log because the leadership is stale
return;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 214ae19..335a721 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -76,6 +76,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
ClusterConfig config = ClusterDescriptor.getINSTANCE().getConfig();
private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
+ private static final int SEND_LOG_RETRY = 3;
String name;
@@ -198,7 +199,6 @@ public abstract class RaftMember implements RaftService.AsyncIface {
return;
}
- long thisTerm = term.get();
if (character != NodeCharacter.ELECTOR) {
// only elector votes
resultHandler.onComplete(Response.RESPONSE_LEADER_STILL_ONLINE);
@@ -295,6 +295,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
try {
Log log = LogParser.getINSTANCE().parse(request.entry);
resultHandler.onComplete(appendEntry(log));
+ logger.debug("{} AppendEntryRequest completed", name);
} catch (UnknownLogTypeException e) {
resultHandler.onError(e);
}
@@ -322,7 +323,6 @@ public abstract class RaftMember implements RaftService.AsyncIface {
}
// synchronized: logs are serialized
- //TODO why synchronized?
private synchronized AppendLogResult sendLogToFollowers(Log log, AtomicInteger quorum) {
if (allNodes.size() == 1) {
// single node group, does not need the agreement of others
@@ -342,12 +342,9 @@ public abstract class RaftMember implements RaftService.AsyncIface {
}
synchronized (quorum) {//this synchronized codes are just for calling quorum.wait.
- //TODO As we have used synchronized (), do we really need to use AtomicInteger?
// synchronized: avoid concurrent modification
synchronized (allNodes) {
- //TODO allNodes.sync is only used here. Is that needed?
- //By the way, readLock is ok for this case.
for (Node node : allNodes) {
AsyncClient client = connectNode(node);
if (client != null) {
@@ -359,6 +356,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
handler.setReceiverTerm(newLeaderTerm);
try {
client.appendEntry(request, handler);
+ logger.debug("{} sending a log to {}: {}", name, node, log);
} catch (Exception e) {
logger.warn("{} cannot append log to node {}", name, node, e);
}
@@ -408,12 +406,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
allNodes.add(thisNode);
}
- public void setLastCatchUpResponseTime(
- Map<Node, Long> lastCatchUpResponseTime) {
- this.lastCatchUpResponseTime = lastCatchUpResponseTime;
- }
-
- public void /**/setCharacter(NodeCharacter character) {
+ public void setCharacter(NodeCharacter character) {
logger.info("{} has become a {}", name, character);
this.character = character;
}
@@ -672,29 +665,34 @@ public abstract class RaftMember implements RaftService.AsyncIface {
log.setPlan(plan);
logManager.appendLog(log);
- logger.debug("{}: Send plan {} to other nodes", name, plan);
- AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2);
-
- switch (result) {
- case OK:
- logger.debug("{}: Plan {} is accepted", name, plan);
- try {
- logManager.commitLog(log);
- } catch (QueryProcessException e) {
- logger.info("{}: The log {} is not successfully applied, reverting", name, log, e);
+ retry:
+ for (int i = SEND_LOG_RETRY; i >= 0; i--) {
+ logger.debug("{}: Send plan {} to other nodes, retry remaining {}", name, plan, i);
+ AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2);
+ switch (result) {
+ case OK:
+ logger.debug("{}: Plan {} is accepted", name, plan);
+ try {
+ logManager.commitLog(log);
+ } catch (QueryProcessException e) {
+ logger.info("{}: The log {} is not successfully applied, reverting", name, log, e);
+ logManager.removeLastLog();
+ TSStatus status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
+ status.getStatusType().setMessage(e.getMessage());
+ return status;
+ }
+ return StatusUtils.OK;
+ case TIME_OUT:
+ logger.debug("{}: Plan {} timed out", name, plan);
+ if (i == 1) {
+ return StatusUtils.TIME_OUT;
+ }
+ break;
+ case LEADERSHIP_STALE:
+ default:
logManager.removeLastLog();
- TSStatus status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
- status.getStatusType().setMessage(e.getMessage());
- return status;
- }
- return StatusUtils.OK;
- case TIME_OUT:
- logger.debug("{}: Plan {} timed out", name, plan);
- logManager.removeLastLog();
- return StatusUtils.TIME_OUT;
- case LEADERSHIP_STALE:
- default:
- logManager.removeLastLog();
+ break retry;
+ }
}
}
return null;
@@ -798,11 +796,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
byte[] bytes = new byte[length];
ByteBuffer result = ByteBuffer.wrap(bytes);
int len = bufferedInputStream.read(bytes);
- if (len > 0) {
- result.limit(len);
- } else {
- result.limit(0);
- }
+ result.limit(Math.max(len, 0));
resultHandler.onComplete(result);
} catch (IOException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index 6bf8b89..c700931 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -21,16 +21,14 @@ package org.apache.iotdb.cluster.utils;
import static org.apache.iotdb.cluster.config.ClusterConstant.HASH_SALT;
+import java.util.ArrayList;
import org.apache.iotdb.cluster.config.ClusterConstant;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
@@ -41,6 +39,18 @@ import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
+import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeEq;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGt;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGtEq;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeIn;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLt;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLtEq;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeNotEq;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
+import org.apache.iotdb.tsfile.read.filter.operator.NotFilter;
+import org.apache.iotdb.tsfile.read.filter.operator.OrFilter;
import org.apache.iotdb.tsfile.utils.Murmur128Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,5 +124,214 @@ public class PartitionUtils {
return newPlan;
}
+ /**
+  * Computes a set of time intervals that covers every timestamp which may satisfy the given
+  * filter. The result is a superset: filters this method does not recognize (e.g. value
+  * filters) are treated as "any time".
+  *
+  * <p>Negation is pushed down to the leaves (De Morgan) instead of complementing the result of
+  * a sub-filter. Complementing a superset would yield a subset, e.g. {@code NOT (unrecognized)}
+  * would become the empty set and wrongly exclude all data.
+  *
+  * @param filter the filter to analyze; {@code null} is treated as "no restriction"
+  * @return intervals (inclusive bounds, ordered by lower bound) covering all matching timestamps
+  */
+ public static TimeIntervals extractTimeInterval(Filter filter) {
+   return extractTimeInterval(filter, false);
+ }
+
+ /**
+  * Implementation of {@link #extractTimeInterval(Filter)} that tracks whether the current
+  * sub-filter is under an odd number of NOTs.
+  */
+ private static TimeIntervals extractTimeInterval(Filter filter, boolean negated) {
+   if (filter == null) {
+     // no filter means no restriction on time
+     return TimeIntervals.ALL_INTERVAL;
+   }
+   if (filter instanceof AndFilter) {
+     AndFilter andFilter = ((AndFilter) filter);
+     TimeIntervals leftIntervals = extractTimeInterval(andFilter.getLeft(), negated);
+     TimeIntervals rightIntervals = extractTimeInterval(andFilter.getRight(), negated);
+     // not (a and b) == (not a) or (not b)
+     return negated ? leftIntervals.union(rightIntervals)
+         : leftIntervals.intersection(rightIntervals);
+   } else if (filter instanceof OrFilter) {
+     OrFilter orFilter = ((OrFilter) filter);
+     TimeIntervals leftIntervals = extractTimeInterval(orFilter.getLeft(), negated);
+     TimeIntervals rightIntervals = extractTimeInterval(orFilter.getRight(), negated);
+     // not (a or b) == (not a) and (not b)
+     return negated ? leftIntervals.intersection(rightIntervals)
+         : leftIntervals.union(rightIntervals);
+   } else if (filter instanceof NotFilter) {
+     NotFilter notFilter = ((NotFilter) filter);
+     return extractTimeInterval(notFilter.getFilter(), !negated);
+   } else if (filter instanceof TimeGt) {
+     long value = ((long) ((TimeGt) filter).getValue());
+     return negated ? new TimeIntervals(Long.MIN_VALUE, value) : timeGreaterThan(value);
+   } else if (filter instanceof TimeGtEq) {
+     long value = ((long) ((TimeGtEq) filter).getValue());
+     return negated ? timeLessThan(value) : new TimeIntervals(value, Long.MAX_VALUE);
+   } else if (filter instanceof TimeEq) {
+     long value = ((long) ((TimeEq) filter).getValue());
+     return negated ? timeExceptPoints(new long[]{value}) : new TimeIntervals(value, value);
+   } else if (filter instanceof TimeNotEq) {
+     long value = ((long) ((TimeNotEq) filter).getValue());
+     return negated ? new TimeIntervals(value, value) : timeExceptPoints(new long[]{value});
+   } else if (filter instanceof TimeLt) {
+     long value = ((long) ((TimeLt) filter).getValue());
+     return negated ? new TimeIntervals(value, Long.MAX_VALUE) : timeLessThan(value);
+   } else if (filter instanceof TimeLtEq) {
+     long value = ((long) ((TimeLtEq) filter).getValue());
+     return negated ? timeGreaterThan(value) : new TimeIntervals(Long.MIN_VALUE, value);
+   } else if (filter instanceof TimeIn) {
+     TimeIn timeIn = ((TimeIn) filter);
+     // the values come from a Set, so they must be sorted before building ordered intervals
+     // NOTE(review): if In also supports a "NOT IN" mode, it is not visible here - confirm
+     long[] times = new long[timeIn.getValues().size()];
+     int count = 0;
+     for (Object value : timeIn.getValues()) {
+       times[count++] = ((long) value);
+     }
+     java.util.Arrays.sort(times);
+     return negated ? timeExceptPoints(times) : timePoints(times);
+   } else if (filter instanceof GroupByFilter) {
+     if (negated) {
+       // the range below is only a covering range, so its complement cannot be trusted
+       return TimeIntervals.ALL_INTERVAL;
+     }
+     GroupByFilter groupByFilter = ((GroupByFilter) filter);
+     long endTime = groupByFilter.getEndTime();
+     // keep the original "+ 1" widening but do not let it overflow
+     long upperBound = endTime == Long.MAX_VALUE ? Long.MAX_VALUE : endTime + 1;
+     return new TimeIntervals(groupByFilter.getStartTime(), upperBound);
+   }
+   // an unrecognized filter may match any time, whether it is negated or not
+   logger.warn("Unrecognized filter class: {}", filter.getClass());
+   return TimeIntervals.ALL_INTERVAL;
+ }
+
+ /** Intervals of {@code time > value}; empty when no such time exists. */
+ private static TimeIntervals timeGreaterThan(long value) {
+   return value == Long.MAX_VALUE ? new TimeIntervals()
+       : new TimeIntervals(value + 1, Long.MAX_VALUE);
+ }
+
+ /** Intervals of {@code time < value}; empty when no such time exists. */
+ private static TimeIntervals timeLessThan(long value) {
+   return value == Long.MIN_VALUE ? new TimeIntervals()
+       : new TimeIntervals(Long.MIN_VALUE, value - 1);
+ }
+
+ /** One single-point interval per distinct value; {@code sortedTimes} must be ascending. */
+ private static TimeIntervals timePoints(long[] sortedTimes) {
+   TimeIntervals intervals = new TimeIntervals();
+   for (int i = 0; i < sortedTimes.length; i++) {
+     if (i == 0 || sortedTimes[i] != sortedTimes[i - 1]) {
+       intervals.addInterval(sortedTimes[i], sortedTimes[i]);
+     }
+   }
+   return intervals;
+ }
+
+ /** All time except the given points; {@code sortedTimes} must be ascending. */
+ private static TimeIntervals timeExceptPoints(long[] sortedTimes) {
+   TimeIntervals intervals = new TimeIntervals();
+   long nextLowerBound = Long.MIN_VALUE;
+   for (long time : sortedTimes) {
+     if (time > nextLowerBound) {
+       // time > nextLowerBound >= Long.MIN_VALUE, so time - 1 cannot overflow
+       intervals.addInterval(nextLowerBound, time - 1);
+     }
+     if (time == Long.MAX_VALUE) {
+       // nothing is left above the largest possible timestamp
+       return intervals;
+     }
+     nextLowerBound = time + 1;
+   }
+   intervals.addInterval(nextLowerBound, Long.MAX_VALUE);
+   return intervals;
+ }
+
+ /**
+  * A list of time intervals stored as a flat sequence of bounds: element {@code 2k} is the
+  * lower bound and element {@code 2k + 1} the upper bound of the k-th interval. Both bounds are
+  * inclusive. {@link #union(TimeIntervals)} and {@link #not()} require the intervals to be
+  * ordered by lower bound and non-overlapping.
+  */
+ public static class TimeIntervals extends ArrayList<Long> {
+
+   /**
+    * The interval covering all possible timestamps. This instance is shared and, being a list,
+    * technically mutable: callers must not modify it.
+    */
+   public static final TimeIntervals ALL_INTERVAL = new TimeIntervals(Long.MIN_VALUE,
+       Long.MAX_VALUE);
+
+   /** Creates an empty set of intervals (matches no time). */
+   public TimeIntervals() {
+     super();
+   }
+
+   /** Creates a set holding the single interval {@code [lowerBound, upperBound]}. */
+   public TimeIntervals(long lowerBound, long upperBound) {
+     super();
+     addInterval(lowerBound, upperBound);
+   }
+
+   /** @return the number of intervals (half the number of stored bounds) */
+   public int getIntervalSize() {
+     return size() / 2;
+   }
+
+   /** @return the inclusive lower bound of the interval at {@code index} */
+   public long getLowerBound(int index) {
+     return get(index * 2);
+   }
+
+   /** @return the inclusive upper bound of the interval at {@code index} */
+   public long getUpperBound(int index) {
+     return get(index * 2 + 1);
+   }
+
+   /** Appends the interval {@code [lowerBound, upperBound]} to the end of this list. */
+   public void addInterval(long lowerBound, long upperBound) {
+     add(lowerBound);
+     add(upperBound);
+   }
+
+   /**
+    * Computes the pairwise intersection of the intervals of this and that.
+    *
+    * @param that the other intervals
+    * @return a new instance holding every non-empty overlap; ordered if both inputs are ordered
+    */
+   public TimeIntervals intersection(TimeIntervals that) {
+     TimeIntervals result = new TimeIntervals();
+     int thisSize = this.getIntervalSize();
+     int thatSize = that.getIntervalSize();
+     for (int i = 0; i < thisSize; i++) {
+       long thisLB = this.getLowerBound(i);
+       long thisUB = this.getUpperBound(i);
+       for (int j = 0; j < thatSize; j++) {
+         // "that" must be indexed by j: indexing it by i compared the wrong interval
+         long lowerBound = Math.max(thisLB, that.getLowerBound(j));
+         long upperBound = Math.min(thisUB, that.getUpperBound(j));
+         if (lowerBound <= upperBound) {
+           result.addInterval(lowerBound, upperBound);
+         }
+       }
+     }
+     return result;
+   }
+
+   /**
+    * The union is implemented by merge, so the two intervals must be ordered. Overlapping and
+    * adjacent intervals (e.g. [0, 5] and [6, 10]) are merged into one.
+    *
+    * @param that the other intervals
+    * @return {@code that} or {@code this} itself if the other one is empty, otherwise a new
+    *     instance holding the merged intervals
+    */
+   public TimeIntervals union(TimeIntervals that) {
+     if (this.isEmpty()) {
+       return that;
+     } else if (that.isEmpty()) {
+       return this;
+     }
+
+     TimeIntervals result = new TimeIntervals();
+     int thisSize = this.getIntervalSize();
+     int thatSize = that.getIntervalSize();
+     int thisIndex = 0;
+     int thatIndex = 0;
+     long lastLowerBound = 0;
+     long lastUpperBound = 0;
+     boolean lastBoundSet = false;
+     while (thisIndex < thisSize || thatIndex < thatSize) {
+       // always consume the remaining interval with the smaller lower bound
+       boolean takeThis = thatIndex >= thatSize || (thisIndex < thisSize
+           && this.getLowerBound(thisIndex) <= that.getLowerBound(thatIndex));
+       long lb;
+       long ub;
+       if (takeThis) {
+         lb = this.getLowerBound(thisIndex);
+         ub = this.getUpperBound(thisIndex);
+         thisIndex++;
+       } else {
+         lb = that.getLowerBound(thatIndex);
+         ub = that.getUpperBound(thatIndex);
+         thatIndex++;
+       }
+
+       if (!lastBoundSet) {
+         lastLowerBound = lb;
+         lastUpperBound = ub;
+         lastBoundSet = true;
+       } else if (lastUpperBound == Long.MAX_VALUE || lb <= lastUpperBound + 1) {
+         // overlapping or adjacent: extend the pending interval; the MAX_VALUE check keeps
+         // "lastUpperBound + 1" from overflowing
+         lastUpperBound = Math.max(ub, lastUpperBound);
+       } else {
+         // a gap: the pending interval is complete
+         result.addInterval(lastLowerBound, lastUpperBound);
+         lastLowerBound = lb;
+         lastUpperBound = ub;
+       }
+     }
+     // add the last interval
+     result.addInterval(lastLowerBound, lastUpperBound);
+     return result;
+   }
+
+   /**
+    * Computes the complement of these intervals over the whole range of long. The intervals
+    * must be ordered.
+    *
+    * @return a new instance holding all timestamps not covered by this
+    */
+   public TimeIntervals not() {
+     if (isEmpty()) {
+       // return a fresh instance so that callers cannot corrupt the shared ALL_INTERVAL
+       return new TimeIntervals(Long.MIN_VALUE, Long.MAX_VALUE);
+     }
+     TimeIntervals result = new TimeIntervals();
+     long firstLB = getLowerBound(0);
+     if (firstLB != Long.MIN_VALUE) {
+       result.addInterval(Long.MIN_VALUE, firstLB - 1);
+     }
+
+     int intervalSize = getIntervalSize();
+     for (int i = 0; i < intervalSize - 1; i++) {
+       long currentUB = getUpperBound(i);
+       long nextLB = getLowerBound(i + 1);
+       // the boundary checks prevent overflow of "+ 1" / "- 1"
+       if (currentUB != Long.MAX_VALUE && nextLB != Long.MIN_VALUE
+           && currentUB + 1 <= nextLB - 1) {
+         result.addInterval(currentUB + 1, nextLB - 1);
+       }
+     }
+
+     // the last upper bound of THIS list, not of the result being built
+     long lastUB = getUpperBound(intervalSize - 1);
+     if (lastUB != Long.MAX_VALUE) {
+       result.addInterval(lastUB + 1, Long.MAX_VALUE);
+     }
+     return result;
+   }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 9dcb5c9..c832da8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -869,15 +869,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
-
-
/**
* create QueryDataSet and buffer it for fetchResults
*/
private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan physicalPlan) throws
QueryProcessException, QueryFilterOptimizationException, StorageEngineException, IOException, MetadataException, SQLException {
- QueryContext context = new QueryContext(queryId);
+ QueryContext context = genQueryContext(queryId);
QueryDataSet queryDataSet = processor.getExecutor().processQuery(physicalPlan, context);
queryId2DataSet.put(queryId, queryDataSet);
return queryDataSet;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
index b1cf764..8de0118 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
@@ -146,4 +146,12 @@ public class GroupByFilter implements Filter, Serializable {
public int hashCode() {
return Objects.hash(unit, slidingStep, startTime, endTime);
}
+
+ /**
+  * @return the value of this filter's {@code startTime} field
+  */
+ public long getStartTime() {
+   return this.startTime;
+ }
+
+ /**
+  * @return the value of this filter's {@code endTime} field
+  */
+ public long getEndTime() {
+   return this.endTime;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java
index 1bb76d4..3767f01 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java
@@ -118,4 +118,8 @@ public class In<T extends Comparable<T>> implements Filter {
public FilterSerializeId getSerializeId() {
return FilterSerializeId.IN;
}
+
+ /**
+  * @return the {@code values} set held by this filter; the set itself is returned, not a copy
+  */
+ public Set<T> getValues() {
+   return this.values;
+ }
}