Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/03/19 11:00:14 UTC

[GitHub] [iotdb] wangchao316 opened a new pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

wangchao316 opened a new pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875


   …ader
   
   ## Description
   
   
   ### Content1 ...
   
   ### Content2 ...
   
   ### Content3 ...
   
   
   <hr>
   
   This PR has:
   - [ ] been self-reviewed.
       - [ ] concurrent read
       - [ ] concurrent write
       - [ ] concurrent read and write 
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. 
   - [ ] added or updated version, __license__, or notice information
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious 
     for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold 
     for code coverage.
   - [ ] added integration tests.
   - [ ] been tested in a test IoTDB cluster.
   
   
   <hr>
   
   ##### Key changed/added classes (or packages if there are too many classes) in this PR
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] wangchao316 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r598082474



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultManagedMergeReader.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+public class MultManagedMergeReader extends MultPriorityMergeReader implements ManagedSeriesReader {
+

Review comment:
       done







[GitHub] [iotdb] wangchao316 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r600255785



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultPriorityMergeReader.java
##########
@@ -18,15 +18,17 @@
  */
 package org.apache.iotdb.cluster.query.reader.mult;
 
+import org.apache.iotdb.db.query.reader.universal.Element;
 import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
 import java.io.IOException;
-import java.util.Set;
 
-/** This class implements {@link IPointReader} for data sources with different priorities. */
-public class MultPriorityMergeReader extends PriorityMergeReader implements IMultPointReader {
+/**
+ * This class extends {@link extends PriorityMergeReader} for data sources with different
+ * priorities.
+ */
+public class MultPriorityMergeReader extends PriorityMergeReader {

Review comment:
       Thanks, done.







[GitHub] [iotdb] jt2594838 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r597597884



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -201,6 +212,139 @@ private IReaderByTimestamp getRemoteReaderByTimestamp(
         new RequestTimeOutException("Query by timestamp: " + path + " in " + partitionGroup));
   }
 
+  /**
+   * Create a ManagedSeriesReader that can read the data of "path" with filters in the whole
+   * cluster. The data groups that should be queried will be determined by the timeFilter, then for
+   * each group a series reader will be created, and finally all such readers will be merged into
+   * one.
+   *
+   * @param timeFilter nullable, when null, all data groups will be queried
+   * @param valueFilter nullable
+   */
+  public List<IMultPointReader> getMultSeriesReader(

Review comment:
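       The Javadoc does not correspond to the method: it describes creating a single `ManagedSeriesReader` for one "path", while the method takes a list of paths and returns a list of `IMultPointReader`s.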

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -201,6 +212,139 @@ private IReaderByTimestamp getRemoteReaderByTimestamp(
         new RequestTimeOutException("Query by timestamp: " + path + " in " + partitionGroup));
   }
 
+  /**
+   * Create a ManagedSeriesReader that can read the data of "path" with filters in the whole
+   * cluster. The data groups that should be queried will be determined by the timeFilter, then for
+   * each group a series reader will be created, and finally all such readers will be merged into
+   * one.
+   *
+   * @param timeFilter nullable, when null, all data groups will be queried
+   * @param valueFilter nullable
+   */
+  public List<IMultPointReader> getMultSeriesReader(
+      List<PartialPath> paths,
+      Map<String, Set<String>> deviceMeasurements,
+      List<TSDataType> dataTypes,
+      Filter timeFilter,
+      Filter valueFilter,
+      QueryContext context,
+      boolean ascending)
+      throws StorageEngineException, EmptyIntervalException {
+    // find the groups that should be queried using the timeFilter
+
+    Map<PartitionGroup, List<PartialPath>> partitionGroupListMap = Maps.newHashMap();
+    for (PartialPath partialPath : paths) {
+      List<PartitionGroup> partitionGroups = metaGroupMember.routeFilter(timeFilter, partialPath);
+      partitionGroups.forEach(
+          partitionGroup -> {
+            partitionGroupListMap
+                .computeIfAbsent(partitionGroup, n -> new ArrayList<>())
+                .add(partialPath);
+          });
+    }
+
+    List<IMultPointReader> multPointReaders = Lists.newArrayList();
+
+    partitionGroupListMap
+        .keySet()
+        .forEach(
+            partitionGroup -> {
+              List<PartialPath> partialPaths = partitionGroupListMap.get(partitionGroup);

Review comment:
       Maybe you can iterate over the `entrySet` instead, which avoids the extra `get` for every key.
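
       A minimal sketch of the `entrySet` suggestion, assuming plain `String`s in place of `PartitionGroup` and `PartialPath`. The class and method names are hypothetical and not part of the PR:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-alone demo; String stands in for PartitionGroup and PartialPath. */
final class EntrySetIterationSketch {

  /** Visits each group together with its paths, without a second lookup per key. */
  static List<String> describeGroups(Map<String, List<String>> pathsByGroup) {
    List<String> descriptions = new ArrayList<>();
    for (Map.Entry<String, List<String>> entry : pathsByGroup.entrySet()) {
      String group = entry.getKey();
      List<String> paths = entry.getValue(); // no pathsByGroup.get(group) needed
      descriptions.add(group + " -> " + paths);
    }
    return descriptions;
  }

  public static void main(String[] args) {
    Map<String, List<String>> pathsByGroup = new LinkedHashMap<>();
    pathsByGroup.computeIfAbsent("group-1", g -> new ArrayList<>()).add("root.sg.d1.s1");
    pathsByGroup.computeIfAbsent("group-1", g -> new ArrayList<>()).add("root.sg.d1.s2");
    pathsByGroup.computeIfAbsent("group-2", g -> new ArrayList<>()).add("root.sg.d2.s1");
    describeGroups(pathsByGroup).forEach(System.out::println);
  }
}
```

       Each entry carries both the key and its value, so the per-key lookup inside the lambda is no longer needed.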

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultBatchReader.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class MultBatchReader implements IMultBatchReader {
+
+  private Map<String, IBatchReader> pathBatchReaders;
+
+  public MultBatchReader(Map<String, IBatchReader> pathBatchReaders) {
+    this.pathBatchReaders = pathBatchReaders;
+  }
+
+  /**
+   * reader has next batch data
+   *
+   * @return true if only one reader has next batch data, otherwise false
+   * @throws IOException
+   */
+  @Override
+  public boolean hasNextBatch() throws IOException {
+    for (IBatchReader reader : pathBatchReaders.values()) {
+      if (reader.hasNextBatch()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean hasNextBatch(String fullPath) throws IOException {
+    return pathBatchReaders.get(fullPath).hasNextBatch();
+  }
+
+  @Override
+  public BatchData nextBatch(String fullPath) throws IOException {
+    return pathBatchReaders.get(fullPath).nextBatch();
+  }
+
+  @Override
+  public BatchData nextBatch() throws IOException {
+    return null;

Review comment:
       Returning `null` here could be dangerous. I suggest throwing an exception instead.
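
       A rough sketch of what that could look like, assuming `UnsupportedOperationException` is an acceptable choice here. The class is a hypothetical stand-in for `MultBatchReader`, with `String` replacing `BatchData` so that it compiles on its own:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for MultBatchReader; String replaces BatchData. */
final class PathOnlyBatchReaderSketch {

  private final Map<String, String> batchesByPath = new HashMap<>();

  PathOnlyBatchReaderSketch(Map<String, String> batchesByPath) {
    this.batchesByPath.putAll(batchesByPath);
  }

  /** Path-aware variant: the only supported way to read from this reader. */
  String nextBatch(String fullPath) {
    return batchesByPath.get(fullPath);
  }

  /** Path-less variant: fail fast rather than hand the caller a null. */
  String nextBatch() {
    throw new UnsupportedOperationException(
        "nextBatch() needs a full path; use nextBatch(String) instead");
  }

  public static void main(String[] args) {
    Map<String, String> batches = new HashMap<>();
    batches.put("root.sg.d1.s1", "batch-1");
    PathOnlyBatchReaderSketch reader = new PathOnlyBatchReaderSketch(batches);
    System.out.println(reader.nextBatch("root.sg.d1.s1"));
    try {
      reader.nextBatch();
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

       A caller that uses the path-less method by mistake then fails at the call site instead of hitting a `NullPointerException` somewhere later.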

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.db.utils.SerializeUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * RemoteSimpleSeriesReader is a reader without value filter that reads points from a remote side.
+ */
+public class RemoteMultSeriesReader implements IMultPointReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(RemoteMultSeriesReader.class);
+  private static final int FETCH_BATCH_DATA_SIZE = 10;
+
+  private MultDataSourceInfo sourceInfo;
+
+  private Map<String, Queue<BatchData>> cachedBatchs;
+
+  private AtomicReference<Map<String, ByteBuffer>> fetchResult = new AtomicReference<>();
+  private GenericHandler<Map<String, ByteBuffer>> handler;
+
+  private BatchStrategy batchStrategy;
+
+  private Map<String, BatchData> currentBatchDatas;
+
+  public RemoteMultSeriesReader(MultDataSourceInfo sourceInfo) {
+    this.sourceInfo = sourceInfo;
+    this.handler = new GenericHandler<>(sourceInfo.getCurrentNode(), fetchResult);
+    this.cachedBatchs = Maps.newHashMap();
+    this.sourceInfo
+        .getPartialPaths()
+        .forEach(
+            partialPath -> {
+              this.cachedBatchs.put(partialPath.getFullPath(), new ConcurrentLinkedQueue<>());
+            });
+    this.cachedBatchs = Maps.newHashMap();
+    this.batchStrategy = new DefaultBatchStrategy();
+  }
+
+  @Override
+  public boolean hasNextTimeValuePair(String fullPath) throws IOException {
+    BatchData batchData = currentBatchDatas.get(fullPath);
+    if (batchData != null && batchData.hasCurrent()) {
+      return true;
+    }
+    fetchBatch();
+    return checkPathBatchData(fullPath);
+  }
+
+  private boolean checkPathBatchData(String fullPath) {
+    BatchData batchData = cachedBatchs.get(fullPath).peek();
+    if (!batchData.isEmpty()) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public TimeValuePair nextTimeValuePair(String fullPath) throws IOException {
+    BatchData batchData = currentBatchDatas.get(fullPath);
+    if ((batchData == null || !batchData.hasCurrent()) && checkPathBatchData(fullPath)) {
+      batchData = cachedBatchs.get(fullPath).poll();
+      currentBatchDatas.put(fullPath, batchData);
+    }
+
+    if (!hasNextTimeValuePair(fullPath)) {
+      throw new NoSuchElementException();
+    }
+
+    TSDataType dataType = null;
+    for (int i = 0; i < sourceInfo.getPartialPaths().size(); i++) {
+      if (fullPath.equals(sourceInfo.getPartialPaths().get(i).getFullPath())) {
+        dataType = sourceInfo.getDataTypes().get(i);
+      }
+    }

Review comment:
       This lookup should be backed by an index, for example a map from full path to data type. Comparing strings on every call could be slow.
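
       One possible shape for such an index, as a sketch only. The class name is hypothetical, and `String` stands in for both `PartialPath` and `TSDataType`; in the PR the map would presumably be built once in the constructor from `sourceInfo`:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical index from full path to data type, built once and reused per call. */
final class PathTypeIndexSketch {

  private final Map<String, String> dataTypeByPath = new HashMap<>();

  PathTypeIndexSketch(List<String> fullPaths, List<String> dataTypes) {
    if (fullPaths.size() != dataTypes.size()) {
      throw new IllegalArgumentException("paths and data types must have the same size");
    }
    for (int i = 0; i < fullPaths.size(); i++) {
      // A later duplicate overwrites an earlier one, like the original loop without a break.
      dataTypeByPath.put(fullPaths.get(i), dataTypes.get(i));
    }
  }

  /** One hash lookup instead of a linear scan with string comparisons. */
  String dataTypeOf(String fullPath) {
    return dataTypeByPath.get(fullPath);
  }

  public static void main(String[] args) {
    PathTypeIndexSketch index =
        new PathTypeIndexSketch(
            Arrays.asList("root.sg.d1.s1", "root.sg.d1.s2"), Arrays.asList("INT32", "DOUBLE"));
    System.out.println(index.dataTypeOf("root.sg.d1.s2"));
  }
}
```

       The scan over all paths then happens once at construction time rather than on every `nextTimeValuePair(fullPath)` call.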

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -201,6 +212,139 @@ private IReaderByTimestamp getRemoteReaderByTimestamp(
         new RequestTimeOutException("Query by timestamp: " + path + " in " + partitionGroup));
   }
 
+  /**
+   * Create a ManagedSeriesReader that can read the data of "path" with filters in the whole
+   * cluster. The data groups that should be queried will be determined by the timeFilter, then for
+   * each group a series reader will be created, and finally all such readers will be merged into
+   * one.
+   *
+   * @param timeFilter nullable, when null, all data groups will be queried
+   * @param valueFilter nullable
+   */
+  public List<IMultPointReader> getMultSeriesReader(
+      List<PartialPath> paths,
+      Map<String, Set<String>> deviceMeasurements,
+      List<TSDataType> dataTypes,
+      Filter timeFilter,
+      Filter valueFilter,
+      QueryContext context,
+      boolean ascending)
+      throws StorageEngineException, EmptyIntervalException {
+    // find the groups that should be queried using the timeFilter
+
+    Map<PartitionGroup, List<PartialPath>> partitionGroupListMap = Maps.newHashMap();
+    for (PartialPath partialPath : paths) {
+      List<PartitionGroup> partitionGroups = metaGroupMember.routeFilter(timeFilter, partialPath);
+      partitionGroups.forEach(
+          partitionGroup -> {
+            partitionGroupListMap
+                .computeIfAbsent(partitionGroup, n -> new ArrayList<>())
+                .add(partialPath);
+          });
+    }
+
+    List<IMultPointReader> multPointReaders = Lists.newArrayList();
+
+    partitionGroupListMap
+        .keySet()
+        .forEach(
+            partitionGroup -> {
+              List<PartialPath> partialPaths = partitionGroupListMap.get(partitionGroup);
+              Map<String, Set<String>> partitionGroupDeviceMeasurements = Maps.newHashMap();
+              List<TSDataType> partitionGroupTSDataType = Lists.newArrayList();
+              partialPaths.forEach(
+                  partialPath -> {
+                    Set<String> measurements =
+                        deviceMeasurements.getOrDefault(
+                            partialPath.getDevice(), Collections.emptySet());
+                    partitionGroupDeviceMeasurements.put(partialPath.getFullPath(), measurements);
+                    partitionGroupTSDataType.add(dataTypes.get(paths.lastIndexOf(partialPath)));
+                  });
+
+              try {
+                IMultPointReader iMultPointReader =
+                    (IMultPointReader)
+                        getMultSeriesReader(
+                            partitionGroup,
+                            partialPaths,
+                            partitionGroupTSDataType,
+                            partitionGroupDeviceMeasurements,
+                            timeFilter,
+                            valueFilter,
+                            context,
+                            ascending);
+                multPointReaders.add(iMultPointReader);
+              } catch (Exception e) {
+

Review comment:
       Add a log or explain why it can be ignored.
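
       For illustration, a self-contained sketch of logging the failure instead of swallowing it. It uses `java.util.logging` only to stay dependency-free (the PR itself uses SLF4J), and the class name, the `Callable` factories, and the message text are assumptions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical demo: create several readers, logging the ones that fail. */
final class LoggedFailureSketch {

  private static final Logger LOGGER = Logger.getLogger(LoggedFailureSketch.class.getName());

  /** Returns the readers that could be created; failures are logged, not silently dropped. */
  static List<String> createAll(List<Callable<String>> factories) {
    List<String> created = new ArrayList<>();
    for (Callable<String> factory : factories) {
      try {
        created.add(factory.call());
      } catch (Exception e) {
        // Keep going, but leave a trace explaining why this reader is missing.
        LOGGER.log(Level.WARNING, "Cannot create a reader, skipping it", e);
      }
    }
    return created;
  }

  public static void main(String[] args) {
    List<Callable<String>> factories = new ArrayList<>();
    factories.add(() -> "reader-1");
    factories.add(() -> {
      throw new IllegalStateException("partition group unreachable");
    });
    System.out.println(createAll(factories));
  }
}
```

       Whether skipping a failed group is acceptable at all is a separate question. A query that silently misses one partition group may return incomplete results, so rethrowing could be the safer choice.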

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/IMultBatchReader.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+
+import java.io.IOException;
+
+public interface IMultBatchReader extends IBatchReader {
+
+  boolean hasNextBatch(String fullPath) throws IOException;
+
+  BatchData nextBatch(String fullPath) throws IOException;

Review comment:
       What are the semantics when the methods of `IBatchReader` are called on an `IMultBatchReader`?
   For example, what will an `IMultBatchReader` return if `nextBatch` is called without providing the full path?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultPriorityMergeReader.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+
+import java.io.IOException;
+import java.util.Set;
+
+/** This class implements {@link IPointReader} for data sources with different priorities. */
+public class MultPriorityMergeReader extends PriorityMergeReader implements IMultPointReader {
+
+  private String fullPath;
+
+  public MultPriorityMergeReader(String fullPath) {
+    super();
+    this.fullPath = fullPath;
+  }
+
+  public void addReader(IMultPointReader reader, long priority) throws IOException {
+    if (reader.hasNextTimeValuePair(fullPath)) {
+      heap.add(
+          new MultElement(
+              reader, reader.nextTimeValuePair(fullPath), new MergeReaderPriority(priority, 0)));
+    } else {
+      reader.close();
+    }
+  }
+
+  @Override
+  public boolean hasNextTimeValuePair(String fullPath) throws IOException {
+    return false;
+  }
+
+  @Override
+  public TimeValuePair nextTimeValuePair(String fullPath) throws IOException {
+    return null;
+  }

Review comment:
       Seeing this, I do not think this class should be an `IMultPointReader`.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultManagedMergeReader.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+public class MultManagedMergeReader extends MultPriorityMergeReader implements ManagedSeriesReader {
+

Review comment:
       Can we find a better name? This reader only concerns one time series, so `Mult` in the name is misleading.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.db.utils.SerializeUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * RemoteSimpleSeriesReader is a reader without value filter that reads points from a remote side.
+ */
+public class RemoteMultSeriesReader implements IMultPointReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(RemoteMultSeriesReader.class);
+  private static final int FETCH_BATCH_DATA_SIZE = 10;
+
+  private MultDataSourceInfo sourceInfo;
+
+  private Map<String, Queue<BatchData>> cachedBatchs;
+
+  private AtomicReference<Map<String, ByteBuffer>> fetchResult = new AtomicReference<>();
+  private GenericHandler<Map<String, ByteBuffer>> handler;
+
+  private BatchStrategy batchStrategy;
+
+  private Map<String, BatchData> currentBatchDatas;
+
+  public RemoteMultSeriesReader(MultDataSourceInfo sourceInfo) {
+    this.sourceInfo = sourceInfo;
+    this.handler = new GenericHandler<>(sourceInfo.getCurrentNode(), fetchResult);
+    this.cachedBatchs = Maps.newHashMap();
+    this.sourceInfo
+        .getPartialPaths()
+        .forEach(
+            partialPath -> {
+              this.cachedBatchs.put(partialPath.getFullPath(), new ConcurrentLinkedQueue<>());
+            });
+    this.cachedBatchs = Maps.newHashMap();

Review comment:
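       The second `this.cachedBatchs = Maps.newHashMap();` should be `this.currentBatchDatas = Maps.newHashMap();`. As written, it discards the queues that were just added and leaves `currentBatchDatas` uninitialized.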







[GitHub] [iotdb] jt2594838 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r600220603



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultPriorityMergeReader.java
##########
@@ -18,15 +18,17 @@
  */
 package org.apache.iotdb.cluster.query.reader.mult;
 
+import org.apache.iotdb.db.query.reader.universal.Element;
 import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
 import java.io.IOException;
-import java.util.Set;
 
-/** This class implements {@link IPointReader} for data sources with different priorities. */
-public class MultPriorityMergeReader extends PriorityMergeReader implements IMultPointReader {
+/**
+ * This class extends {@link extends PriorityMergeReader} for data sources with different
+ * priorities.
+ */
+public class MultPriorityMergeReader extends PriorityMergeReader {

Review comment:
       `Mult` in the name should be replaced.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AbstractMultPointReader.java
##########
@@ -24,11 +24,26 @@
 import java.io.IOException;
 import java.util.Set;
 
-public interface IMultPointReader extends IPointReader {
+public abstract class AbstractMultPointReader implements IPointReader {
 
-  boolean hasNextTimeValuePair(String fullPath) throws IOException;
+  public abstract boolean hasNextTimeValuePair(String fullPath) throws IOException;
 
-  TimeValuePair nextTimeValuePair(String fullPath) throws IOException;
+  public abstract TimeValuePair nextTimeValuePair(String fullPath) throws IOException;
 
-  Set<String> getAllPaths();
+  public abstract Set<String> getAllPaths();
+
+  @Override
+  public boolean hasNextTimeValuePair() throws IOException {
+    return false;
+  }
+
+  @Override
+  public TimeValuePair nextTimeValuePair() throws IOException {
+    return null;
+  }
+
+  @Override
+  public TimeValuePair currentTimeValuePair() throws IOException {
+    return null;
+  }

Review comment:
       These methods should throw exceptions rather than silently return `false` or `null`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] wangchao316 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r598077599



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -201,6 +212,139 @@ private IReaderByTimestamp getRemoteReaderByTimestamp(
         new RequestTimeOutException("Query by timestamp: " + path + " in " + partitionGroup));
   }
 
+  /**
+   * Create a ManagedSeriesReader that can read the data of "path" with filters in the whole
+   * cluster. The data groups that should be queried will be determined by the timeFilter, then for
+   * each group a series reader will be created, and finally all such readers will be merged into
+   * one.
+   *
+   * @param timeFilter nullable, when null, all data groups will be queried
+   * @param valueFilter nullable
+   */
+  public List<IMultPointReader> getMultSeriesReader(
+      List<PartialPath> paths,
+      Map<String, Set<String>> deviceMeasurements,
+      List<TSDataType> dataTypes,
+      Filter timeFilter,
+      Filter valueFilter,
+      QueryContext context,
+      boolean ascending)
+      throws StorageEngineException, EmptyIntervalException {
+    // find the groups that should be queried using the timeFilter
+
+    Map<PartitionGroup, List<PartialPath>> partitionGroupListMap = Maps.newHashMap();
+    for (PartialPath partialPath : paths) {
+      List<PartitionGroup> partitionGroups = metaGroupMember.routeFilter(timeFilter, partialPath);
+      partitionGroups.forEach(
+          partitionGroup -> {
+            partitionGroupListMap
+                .computeIfAbsent(partitionGroup, n -> new ArrayList<>())
+                .add(partialPath);
+          });
+    }
+
+    List<IMultPointReader> multPointReaders = Lists.newArrayList();
+
+    partitionGroupListMap
+        .keySet()
+        .forEach(
+            partitionGroup -> {
+              List<PartialPath> partialPaths = partitionGroupListMap.get(partitionGroup);

Review comment:
       done







[GitHub] [iotdb] jxlgzwh commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
jxlgzwh commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r600990578



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
##########
@@ -236,6 +280,105 @@ public long querySingleSeries(SingleSeriesQueryRequest request)
     }
   }
 
+  /**
+   * Create an IBatchReader of a path, register it in the query manager to get a reader id for it
+   * and send the id back to the requester. If the reader does not have any data, an id of -1 will
+   * be returned.
+   *
+   * @param request
+   */
+  public long queryMultSeries(MultSeriesQueryRequest request)
+      throws CheckConsistencyException, QueryProcessException, StorageEngineException, IOException {
+    logger.debug(
+        "{}: {} is querying {}, queryId: {}",
+        name,
+        request.getRequester(),
+        request.getPath(),
+        request.getQueryId());
+    dataGroupMember.syncLeaderWithConsistencyCheck(false);
+
+    List<PartialPath> paths = Lists.newArrayList();
+    request
+        .getPath()
+        .forEach(
+            fullPath -> {
+              try {
+                paths.add(new PartialPath(fullPath));
+              } catch (IllegalPathException e) {
+                // ignore

Review comment:
       I suggest logging the exception information through the logger instead of ignoring it.
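
One possible shape for this, sketched with `java.util.logging` so that it is self-contained; the PR itself uses an slf4j `logger`. `parsePath` and `InvalidPathException` are hypothetical stand-ins for `new PartialPath(fullPath)` and `IllegalPathException`, and the validation rule inside `parsePath` is invented purely for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LogInvalidPathSketch {

  private static final Logger logger = Logger.getLogger(LogInvalidPathSketch.class.getName());

  /** Hypothetical stand-in for IllegalPathException. */
  static final class InvalidPathException extends Exception {
    InvalidPathException(String message) {
      super(message);
    }
  }

  /** Hypothetical stand-in for new PartialPath(fullPath): accepts only paths under "root.". */
  static String parsePath(String fullPath) throws InvalidPathException {
    if (fullPath == null || !fullPath.startsWith("root.")) {
      throw new InvalidPathException("illegal path: " + fullPath);
    }
    return fullPath;
  }

  /** Parses every path and logs, rather than silently drops, the ones that fail. */
  static List<String> parseAll(List<String> fullPaths) {
    List<String> paths = new ArrayList<>();
    for (String fullPath : fullPaths) {
      try {
        paths.add(parsePath(fullPath));
      } catch (InvalidPathException e) {
        // Keep going, but leave a trace that includes the offending path and the cause.
        logger.log(Level.WARNING, "Failed to create partial path for " + fullPath, e);
      }
    }
    return paths;
  }

  public static void main(String[] args) {
    System.out.println(parseAll(Arrays.asList("root.sg.d1.s1", "bad path", "root.sg.d1.s2")));
  }
}
```

Whether an illegal path should only be logged or should abort the whole request is a separate design decision that this sketch does not settle.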







[GitHub] [iotdb] mychaow commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
mychaow commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r601008516



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
##########
@@ -58,6 +64,78 @@
     this.readerFactory = new ClusterReaderFactory(metaGroupMember);
   }
 
+  /**
+   * use mult batch query for without value filter
+   *
+   * @param context query context
+   * @return query data set
+   * @throws StorageEngineException
+   */
+  @Override
+  public QueryDataSet executeWithoutValueFilter(QueryContext context)
+      throws StorageEngineException {
+    try {
+      List<ManagedSeriesReader> readersOfSelectedSeries = initMultSeriesReader(context);
+      return new RawQueryDataSetWithoutValueFilter(
+          context.getQueryId(),
+          queryPlan.getDeduplicatedPaths(),
+          queryPlan.getDeduplicatedDataTypes(),
+          readersOfSelectedSeries,
+          queryPlan.isAscending());
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new StorageEngineException(e.getMessage());
+    } catch (IOException | EmptyIntervalException | QueryProcessException e) {
+      throw new StorageEngineException(e.getMessage());
+    }
+  }
+
+  private List<ManagedSeriesReader> initMultSeriesReader(QueryContext context)
+      throws StorageEngineException, IOException, EmptyIntervalException, QueryProcessException {
+    Filter timeFilter = null;
+    if (queryPlan.getExpression() != null) {
+      timeFilter = ((GlobalTimeExpression) queryPlan.getExpression()).getFilter();
+    }
+
+    // make sure the partition table is new
+    try {
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
+    } catch (CheckConsistencyException e) {
+      throw new StorageEngineException(e);
+    }
+    List<ManagedSeriesReader> readersOfSelectedSeries = Lists.newArrayList();
+    List<AbstractMultPointReader> multPointReaders = Lists.newArrayList();
+
+    multPointReaders =
+        readerFactory.getMultSeriesReader(
+            queryPlan.getDeduplicatedPaths(),
+            queryPlan.getDeviceToMeasurements(),
+            queryPlan.getDeduplicatedDataTypes(),
+            timeFilter,
+            null,
+            context,
+            queryPlan.isAscending());
+
+    // combine reader of different partition group of the same path
+    // into a MultManagedMergeReader
+    for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
+      PartialPath partialPath = queryPlan.getDeduplicatedPaths().get(i);
+      TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
+      AssignPathManagedMergeReader assignPathManagedMergeReader =
+          new AssignPathManagedMergeReader(partialPath.getFullPath(), dataType);
+      for (AbstractMultPointReader multPointReader : multPointReaders) {

Review comment:
       I think renaming `AbstractMultPointReader` to `AbstractMultiPointReader` would be better.







[GitHub] [iotdb] wangchao316 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r600256137



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AbstractMultPointReader.java
##########
@@ -24,11 +24,26 @@
 import java.io.IOException;
 import java.util.Set;
 
-public interface IMultPointReader extends IPointReader {
+public abstract class AbstractMultPointReader implements IPointReader {
 
-  boolean hasNextTimeValuePair(String fullPath) throws IOException;
+  public abstract boolean hasNextTimeValuePair(String fullPath) throws IOException;
 
-  TimeValuePair nextTimeValuePair(String fullPath) throws IOException;
+  public abstract TimeValuePair nextTimeValuePair(String fullPath) throws IOException;
 
-  Set<String> getAllPaths();
+  public abstract Set<String> getAllPaths();
+
+  @Override
+  public boolean hasNextTimeValuePair() throws IOException {
+    return false;
+  }
+
+  @Override
+  public TimeValuePair nextTimeValuePair() throws IOException {
+    return null;
+  }
+
+  @Override
+  public TimeValuePair currentTimeValuePair() throws IOException {
+    return null;
+  }

Review comment:
       Thanks, done.







[GitHub] [iotdb] wangchao316 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r601004233



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
##########
@@ -236,6 +280,105 @@ public long querySingleSeries(SingleSeriesQueryRequest request)
     }
   }
 
+  /**
+   * Create an IBatchReader of a path, register it in the query manager to get a reader id for it
+   * and send the id back to the requester. If the reader does not have any data, an id of -1 will
+   * be returned.
+   *
+   * @param request
+   */
+  public long queryMultSeries(MultSeriesQueryRequest request)
+      throws CheckConsistencyException, QueryProcessException, StorageEngineException, IOException {
+    logger.debug(
+        "{}: {} is querying {}, queryId: {}",
+        name,
+        request.getRequester(),
+        request.getPath(),
+        request.getQueryId());
+    dataGroupMember.syncLeaderWithConsistencyCheck(false);
+
+    List<PartialPath> paths = Lists.newArrayList();
+    request
+        .getPath()
+        .forEach(
+            fullPath -> {
+              try {
+                paths.add(new PartialPath(fullPath));
+              } catch (IllegalPathException e) {
+                // ignore

Review comment:
       done







[GitHub] [iotdb] wangchao316 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r598077672



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.db.utils.SerializeUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * RemoteSimpleSeriesReader is a reader without value filter that reads points from a remote side.
+ */
+public class RemoteMultSeriesReader implements IMultPointReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(RemoteMultSeriesReader.class);
+  private static final int FETCH_BATCH_DATA_SIZE = 10;
+
+  private MultDataSourceInfo sourceInfo;
+
+  private Map<String, Queue<BatchData>> cachedBatchs;
+
+  private AtomicReference<Map<String, ByteBuffer>> fetchResult = new AtomicReference<>();
+  private GenericHandler<Map<String, ByteBuffer>> handler;
+
+  private BatchStrategy batchStrategy;
+
+  private Map<String, BatchData> currentBatchDatas;
+
+  public RemoteMultSeriesReader(MultDataSourceInfo sourceInfo) {
+    this.sourceInfo = sourceInfo;
+    this.handler = new GenericHandler<>(sourceInfo.getCurrentNode(), fetchResult);
+    this.cachedBatchs = Maps.newHashMap();
+    this.sourceInfo
+        .getPartialPaths()
+        .forEach(
+            partialPath -> {
+              this.cachedBatchs.put(partialPath.getFullPath(), new ConcurrentLinkedQueue<>());
+            });
+    this.cachedBatchs = Maps.newHashMap();

Review comment:
       Thanks, done.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.db.utils.SerializeUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * RemoteSimpleSeriesReader is a reader without value filter that reads points from a remote side.
+ */
+public class RemoteMultSeriesReader implements IMultPointReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(RemoteMultSeriesReader.class);
+  private static final int FETCH_BATCH_DATA_SIZE = 10;
+
+  private MultDataSourceInfo sourceInfo;
+
+  private Map<String, Queue<BatchData>> cachedBatchs;
+
+  private AtomicReference<Map<String, ByteBuffer>> fetchResult = new AtomicReference<>();
+  private GenericHandler<Map<String, ByteBuffer>> handler;
+
+  private BatchStrategy batchStrategy;
+
+  private Map<String, BatchData> currentBatchDatas;
+
+  public RemoteMultSeriesReader(MultDataSourceInfo sourceInfo) {
+    this.sourceInfo = sourceInfo;
+    this.handler = new GenericHandler<>(sourceInfo.getCurrentNode(), fetchResult);
+    this.cachedBatchs = Maps.newHashMap();
+    this.sourceInfo
+        .getPartialPaths()
+        .forEach(
+            partialPath -> {
+              this.cachedBatchs.put(partialPath.getFullPath(), new ConcurrentLinkedQueue<>());
+            });
+    this.cachedBatchs = Maps.newHashMap();
+    this.batchStrategy = new DefaultBatchStrategy();
+  }
+
+  @Override
+  public boolean hasNextTimeValuePair(String fullPath) throws IOException {
+    BatchData batchData = currentBatchDatas.get(fullPath);
+    if (batchData != null && batchData.hasCurrent()) {
+      return true;
+    }
+    fetchBatch();
+    return checkPathBatchData(fullPath);
+  }
+
+  private boolean checkPathBatchData(String fullPath) {
+    BatchData batchData = cachedBatchs.get(fullPath).peek();
+    if (!batchData.isEmpty()) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public TimeValuePair nextTimeValuePair(String fullPath) throws IOException {
+    BatchData batchData = currentBatchDatas.get(fullPath);
+    if ((batchData == null || !batchData.hasCurrent()) && checkPathBatchData(fullPath)) {
+      batchData = cachedBatchs.get(fullPath).poll();
+      currentBatchDatas.put(fullPath, batchData);
+    }
+
+    if (!hasNextTimeValuePair(fullPath)) {
+      throw new NoSuchElementException();
+    }
+
+    TSDataType dataType = null;
+    for (int i = 0; i < sourceInfo.getPartialPaths().size(); i++) {
+      if (fullPath.equals(sourceInfo.getPartialPaths().get(i).getFullPath())) {
+        dataType = sourceInfo.getDataTypes().get(i);
+      }
+    }

Review comment:
       thanks, done
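
The comment being answered is not quoted in this message, so the following is not a record of the actual change. It is only a sketch of one way the per-path bookkeeping in the hunk above could be made safer, given that `Queue.peek()` returns `null` on an empty queue and that the constructor replaces `cachedBatchs` with a new empty map after filling it. `long[]` stands in for `BatchData` and plain strings for `TSDataType`; both are simplifications, and all names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class PerPathCacheSketch {

  // One queue of cached batches per full path; long[] is a stand-in for BatchData.
  private final Map<String, Queue<long[]>> cachedBatches = new HashMap<>();
  // Data type per full path, built once instead of scanning two parallel lists per call.
  private final Map<String, String> dataTypeByPath = new HashMap<>();

  PerPathCacheSketch(List<String> fullPaths, List<String> dataTypes) {
    for (int i = 0; i < fullPaths.size(); i++) {
      cachedBatches.put(fullPaths.get(i), new ConcurrentLinkedQueue<>());
      dataTypeByPath.put(fullPaths.get(i), dataTypes.get(i));
    }
  }

  void cache(String fullPath, long[] batch) {
    cachedBatches.get(fullPath).add(batch);
  }

  /** True only if the path is known and its first cached batch holds at least one point. */
  boolean hasCachedData(String fullPath) {
    Queue<long[]> queue = cachedBatches.get(fullPath);
    if (queue == null) {
      return false; // unknown path
    }
    long[] head = queue.peek(); // null when the queue is empty
    return head != null && head.length > 0;
  }

  String dataTypeOf(String fullPath) {
    return dataTypeByPath.get(fullPath);
  }

  public static void main(String[] args) {
    PerPathCacheSketch cache =
        new PerPathCacheSketch(
            Arrays.asList("root.sg.d1.s1", "root.sg.d1.s2"), Arrays.asList("INT32", "DOUBLE"));
    cache.cache("root.sg.d1.s1", new long[] {1L, 2L});
    System.out.println(cache.hasCachedData("root.sg.d1.s1"));
    System.out.println(cache.hasCachedData("root.sg.d1.s2"));
    System.out.println(cache.dataTypeOf("root.sg.d1.s2"));
  }
}
```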







[GitHub] [iotdb] neuyilan merged pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
neuyilan merged pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875


   





[GitHub] [iotdb] wangchao316 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r598079287



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/IMultBatchReader.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+
+import java.io.IOException;
+
+public interface IMultBatchReader extends IBatchReader {
+
+  boolean hasNextBatch(String fullPath) throws IOException;
+
+  BatchData nextBatch(String fullPath) throws IOException;

Review comment:
       1. `queryManager.registerReader(IBatchReader)` takes an `IBatchReader`, and I want to reuse that registration logic, so this interface needs to extend `IBatchReader`.
   2. I throw `UnsupportedOperationException` in `nextBatch()`.







[GitHub] [iotdb] wangchao316 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r598080013



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -201,6 +212,139 @@ private IReaderByTimestamp getRemoteReaderByTimestamp(
         new RequestTimeOutException("Query by timestamp: " + path + " in " + partitionGroup));
   }
 
+  /**
+   * Create a ManagedSeriesReader that can read the data of "path" with filters in the whole
+   * cluster. The data groups that should be queried will be determined by the timeFilter, then for
+   * each group a series reader will be created, and finally all such readers will be merged into
+   * one.
+   *
+   * @param timeFilter nullable, when null, all data groups will be queried
+   * @param valueFilter nullable
+   */
+  public List<IMultPointReader> getMultSeriesReader(
+      List<PartialPath> paths,
+      Map<String, Set<String>> deviceMeasurements,
+      List<TSDataType> dataTypes,
+      Filter timeFilter,
+      Filter valueFilter,
+      QueryContext context,
+      boolean ascending)
+      throws StorageEngineException, EmptyIntervalException {
+    // find the groups that should be queried using the timeFilter
+
+    Map<PartitionGroup, List<PartialPath>> partitionGroupListMap = Maps.newHashMap();
+    for (PartialPath partialPath : paths) {
+      List<PartitionGroup> partitionGroups = metaGroupMember.routeFilter(timeFilter, partialPath);
+      partitionGroups.forEach(
+          partitionGroup -> {
+            partitionGroupListMap
+                .computeIfAbsent(partitionGroup, n -> new ArrayList<>())
+                .add(partialPath);
+          });
+    }
+
+    List<IMultPointReader> multPointReaders = Lists.newArrayList();
+
+    partitionGroupListMap
+        .keySet()
+        .forEach(
+            partitionGroup -> {
+              List<PartialPath> partialPaths = partitionGroupListMap.get(partitionGroup);
+              Map<String, Set<String>> partitionGroupDeviceMeasurements = Maps.newHashMap();
+              List<TSDataType> partitionGroupTSDataType = Lists.newArrayList();
+              partialPaths.forEach(
+                  partialPath -> {
+                    Set<String> measurements =
+                        deviceMeasurements.getOrDefault(
+                            partialPath.getDevice(), Collections.emptySet());
+                    partitionGroupDeviceMeasurements.put(partialPath.getFullPath(), measurements);
+                    partitionGroupTSDataType.add(dataTypes.get(paths.lastIndexOf(partialPath)));
+                  });
+
+              try {
+                IMultPointReader iMultPointReader =
+                    (IMultPointReader)
+                        getMultSeriesReader(
+                            partitionGroup,
+                            partialPaths,
+                            partitionGroupTSDataType,
+                            partitionGroupDeviceMeasurements,
+                            timeFilter,
+                            valueFilter,
+                            context,
+                            ascending);
+                multPointReaders.add(iMultPointReader);
+              } catch (Exception e) {
+

Review comment:
       done
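
The grouping step in the hunk above can be illustrated in isolation. This sketch assumes a hypothetical `route` function in place of `metaGroupMember.routeFilter(timeFilter, partialPath)` and uses plain strings for both partition groups and paths. It iterates over `entrySet()` so that no second map lookup per group is needed; that is a stylistic choice of the sketch, not something taken from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class GroupPathsSketch {

  /**
   * Collects, for every partition group, the paths that must be queried from it.
   * A path routed to several groups appears in each of their lists.
   */
  static Map<String, List<String>> groupByPartition(
      List<String> paths, Function<String, List<String>> route) {
    Map<String, List<String>> pathsByGroup = new LinkedHashMap<>();
    for (String path : paths) {
      for (String group : route.apply(path)) {
        pathsByGroup.computeIfAbsent(group, g -> new ArrayList<>()).add(path);
      }
    }
    return pathsByGroup;
  }

  public static void main(String[] args) {
    // Hypothetical routing: d1 lives in one group, everything else spans two groups.
    Function<String, List<String>> route =
        path ->
            path.contains(".d1.")
                ? Arrays.asList("group-A")
                : Arrays.asList("group-A", "group-B");

    Map<String, List<String>> grouped =
        groupByPartition(Arrays.asList("root.sg.d1.s1", "root.sg.d2.s1"), route);

    // One multi-series reader would be created per entry.
    for (Map.Entry<String, List<String>> entry : grouped.entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
  }
}
```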

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -201,6 +212,139 @@ private IReaderByTimestamp getRemoteReaderByTimestamp(
         new RequestTimeOutException("Query by timestamp: " + path + " in " + partitionGroup));
   }
 
+  /**
+   * Create a ManagedSeriesReader that can read the data of "path" with filters in the whole
+   * cluster. The data groups that should be queried will be determined by the timeFilter, then for
+   * each group a series reader will be created, and finally all such readers will be merged into
+   * one.
+   *
+   * @param timeFilter nullable, when null, all data groups will be queried
+   * @param valueFilter nullable
+   */
+  public List<IMultPointReader> getMultSeriesReader(

Review comment:
       done







[GitHub] [iotdb] wangchao316 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r601403040



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
##########
@@ -58,6 +64,78 @@
     this.readerFactory = new ClusterReaderFactory(metaGroupMember);
   }
 
+  /**
+   * use mult batch query for without value filter
+   *
+   * @param context query context
+   * @return query data set
+   * @throws StorageEngineException
+   */
+  @Override
+  public QueryDataSet executeWithoutValueFilter(QueryContext context)
+      throws StorageEngineException {
+    try {
+      List<ManagedSeriesReader> readersOfSelectedSeries = initMultSeriesReader(context);
+      return new RawQueryDataSetWithoutValueFilter(
+          context.getQueryId(),
+          queryPlan.getDeduplicatedPaths(),
+          queryPlan.getDeduplicatedDataTypes(),
+          readersOfSelectedSeries,
+          queryPlan.isAscending());
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new StorageEngineException(e.getMessage());
+    } catch (IOException | EmptyIntervalException | QueryProcessException e) {
+      throw new StorageEngineException(e.getMessage());
+    }
+  }
+
+  private List<ManagedSeriesReader> initMultSeriesReader(QueryContext context)
+      throws StorageEngineException, IOException, EmptyIntervalException, QueryProcessException {
+    Filter timeFilter = null;
+    if (queryPlan.getExpression() != null) {
+      timeFilter = ((GlobalTimeExpression) queryPlan.getExpression()).getFilter();
+    }
+
+    // make sure the partition table is new
+    try {
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
+    } catch (CheckConsistencyException e) {
+      throw new StorageEngineException(e);
+    }
+    List<ManagedSeriesReader> readersOfSelectedSeries = Lists.newArrayList();
+    List<AbstractMultPointReader> multPointReaders = Lists.newArrayList();
+
+    multPointReaders =
+        readerFactory.getMultSeriesReader(
+            queryPlan.getDeduplicatedPaths(),
+            queryPlan.getDeviceToMeasurements(),
+            queryPlan.getDeduplicatedDataTypes(),
+            timeFilter,
+            null,
+            context,
+            queryPlan.isAscending());
+
+    // combine reader of different partition group of the same path
+    // into a MultManagedMergeReader
+    for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
+      PartialPath partialPath = queryPlan.getDeduplicatedPaths().get(i);
+      TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
+      AssignPathManagedMergeReader assignPathManagedMergeReader =
+          new AssignPathManagedMergeReader(partialPath.getFullPath(), dataType);
+      for (AbstractMultPointReader multPointReader : multPointReaders) {

Review comment:
       `Mult` can already mean "multiple".
   With `Multi`, the name would contain `iPointReader`, which is easy to confuse with `IPointReader`,
   so `Mult` expresses the abstract class better here.







[GitHub] [iotdb] wangchao316 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r598080556



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultPriorityMergeReader.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+
+import java.io.IOException;
+import java.util.Set;
+
+/** This class implements {@link IPointReader} for data sources with different priorities. */
+public class MultPriorityMergeReader extends PriorityMergeReader implements IMultPointReader {
+
+  private String fullPath;
+
+  public MultPriorityMergeReader(String fullPath) {
+    super();
+    this.fullPath = fullPath;
+  }
+
+  public void addReader(IMultPointReader reader, long priority) throws IOException {
+    if (reader.hasNextTimeValuePair(fullPath)) {
+      heap.add(
+          new MultElement(
+              reader, reader.nextTimeValuePair(fullPath), new MergeReaderPriority(priority, 0)));
+    } else {
+      reader.close();
+    }
+  }
+
+  @Override
+  public boolean hasNextTimeValuePair(String fullPath) throws IOException {
+    return false;
+  }
+
+  @Override
+  public TimeValuePair nextTimeValuePair(String fullPath) throws IOException {
+    return null;
+  }

Review comment:
       I have refactored the code: MultPriorityMergeReader now extends PriorityMergeReader, so it reuses the PriorityMergeReader logic instead of duplicating it.
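
A minimal sketch of the reuse described above, under stated assumptions: `SketchMergeReader`, `SketchMultMergeReader`, and the `long[] {time, value}` point representation are hypothetical stand-ins, not the IoTDB classes. Each source is assumed to yield strictly increasing timestamps. The base class owns the heap and the merge logic; the subclass only binds a fixed path and adapts multi-path sources.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical stand-in for a priority merge reader; a point is {time, value}.
class SketchMergeReader {
  // One heap entry: the source iterator, its current point, and its priority.
  static final class Element {
    final Iterator<long[]> source;
    long[] current;
    final long priority;

    Element(Iterator<long[]> source, long[] current, long priority) {
      this.source = source;
      this.current = current;
      this.priority = priority;
    }
  }

  // Order by timestamp ascending, then by priority descending.
  protected final PriorityQueue<Element> heap =
      new PriorityQueue<>(
          (a, b) ->
              a.current[0] != b.current[0]
                  ? Long.compare(a.current[0], b.current[0])
                  : Long.compare(b.priority, a.priority));

  public void addReader(Iterator<long[]> source, long priority) {
    if (source.hasNext()) {
      heap.add(new Element(source, source.next(), priority));
    }
  }

  public boolean hasNext() {
    return !heap.isEmpty();
  }

  // Return the highest-priority point at the smallest timestamp and
  // drop lower-priority points that share that timestamp.
  public long[] next() {
    Element top = heap.poll();
    long[] result = top.current;
    advance(top);
    while (!heap.isEmpty() && heap.peek().current[0] == result[0]) {
      advance(heap.poll());
    }
    return result;
  }

  // Move a source to its next point and put it back on the heap, if any.
  private void advance(Element e) {
    if (e.source.hasNext()) {
      e.current = e.source.next();
      heap.add(e);
    }
  }
}

// The subclass only adds the path binding; heap handling is inherited.
class SketchMultMergeReader extends SketchMergeReader {
  private final String fullPath;

  SketchMultMergeReader(String fullPath) {
    this.fullPath = fullPath;
  }

  // A multi-path source is modelled as a map from path to that path's points.
  public void addReader(Map<String, List<long[]>> multSource, long priority) {
    List<long[]> points = multSource.getOrDefault(fullPath, new ArrayList<>());
    addReader(points.iterator(), priority);
  }
}
```

With this shape, the subclass needs no merge logic of its own: when two sources hold a point at the same timestamp, the inherited `next()` returns the one from the higher-priority source.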




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] wangchao316 commented on pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#issuecomment-805398383


   @mychaow @neuyilan @LebronAl Hi, could you please review this PR?
   Thanks.





[GitHub] [iotdb] wangchao316 commented on pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#issuecomment-803275582


   > Good job, but there are still some small issues. I think the definition and usage of `IMultReader` are confusing, especially that some classes are defined as `Mult` but only have `Single` behavior.
   
   I have refactored the code. Please review it again.





[GitHub] [iotdb] wangchao316 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r598077644



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultBatchReader.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class MultBatchReader implements IMultBatchReader {
+
+  private Map<String, IBatchReader> pathBatchReaders;
+
+  public MultBatchReader(Map<String, IBatchReader> pathBatchReaders) {
+    this.pathBatchReaders = pathBatchReaders;
+  }
+
+  /**
+   * Checks whether any of the underlying readers has another batch.
+   *
+   * @return true if at least one reader has a next batch, otherwise false
+   * @throws IOException if an underlying reader fails
+   */
+  @Override
+  public boolean hasNextBatch() throws IOException {
+    for (IBatchReader reader : pathBatchReaders.values()) {
+      if (reader.hasNextBatch()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean hasNextBatch(String fullPath) throws IOException {
+    return pathBatchReaders.get(fullPath).hasNextBatch();
+  }
+
+  @Override
+  public BatchData nextBatch(String fullPath) throws IOException {
+    return pathBatchReaders.get(fullPath).nextBatch();
+  }
+
+  @Override
+  public BatchData nextBatch() throws IOException {
+    return null;

Review comment:
       Thanks, done.
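
For readers following the diff above, the per-path delegation can be sketched roughly as follows. This is an illustration under assumptions, not the code that was merged: `SketchMultBatchReader` is a hypothetical name, a batch is modelled as a `List<Long>`, and throwing from the pathless `nextBatch()` is only one possible alternative to returning `null`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a multi-path batch reader; a "batch" is a List<Long>.
class SketchMultBatchReader {
  private final Map<String, Iterator<List<Long>>> pathReaders;

  SketchMultBatchReader(Map<String, Iterator<List<Long>>> pathReaders) {
    this.pathReaders = pathReaders;
  }

  // True if at least one path still has a batch to return.
  public boolean hasNextBatch() {
    for (Iterator<List<Long>> reader : pathReaders.values()) {
      if (reader.hasNext()) {
        return true;
      }
    }
    return false;
  }

  // An unknown path is treated as "no data" rather than a NullPointerException.
  public boolean hasNextBatch(String fullPath) {
    Iterator<List<Long>> reader = pathReaders.get(fullPath);
    return reader != null && reader.hasNext();
  }

  public List<Long> nextBatch(String fullPath) {
    Iterator<List<Long>> reader = pathReaders.get(fullPath);
    if (reader == null) {
      throw new IllegalArgumentException("no reader registered for path " + fullPath);
    }
    return reader.next();
  }

  // Assumption: a pathless call has no meaningful result for a multi-path
  // reader, so this sketch fails loudly instead of returning null.
  public List<Long> nextBatch() {
    throw new UnsupportedOperationException("use nextBatch(String fullPath)");
  }
}
```

Failing loudly on the pathless call keeps a caller from silently treating `null` as an empty batch. Whether that fits the existing `IBatchReader` contract is a separate design question.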



