You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/03/19 12:05:06 UTC

[GitHub] [iotdb] jt2594838 commented on a change in pull request #2875: [IOTDB-1117][Distributed]Batched creation and fetch of RemoteSeriesRe…

jt2594838 commented on a change in pull request #2875:
URL: https://github.com/apache/iotdb/pull/2875#discussion_r597597884



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -201,6 +212,139 @@ private IReaderByTimestamp getRemoteReaderByTimestamp(
         new RequestTimeOutException("Query by timestamp: " + path + " in " + partitionGroup));
   }
 
+  /**
+   * Create a ManagedSeriesReader that can read the data of "path" with filters in the whole
+   * cluster. The data groups that should be queried will be determined by the timeFilter, then for
+   * each group a series reader will be created, and finally all such readers will be merged into
+   * one.
+   *
+   * @param timeFilter nullable, when null, all data groups will be queried
+   * @param valueFilter nullable
+   */
+  public List<IMultPointReader> getMultSeriesReader(

Review comment:
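      The Javadoc does not correspond to the method: it describes creating a single merged ManagedSeriesReader for one "path", but this method takes a list of paths and returns a list of `IMultPointReader`s, one per partition group.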

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -201,6 +212,139 @@ private IReaderByTimestamp getRemoteReaderByTimestamp(
         new RequestTimeOutException("Query by timestamp: " + path + " in " + partitionGroup));
   }
 
+  /**
+   * Create a ManagedSeriesReader that can read the data of "path" with filters in the whole
+   * cluster. The data groups that should be queried will be determined by the timeFilter, then for
+   * each group a series reader will be created, and finally all such readers will be merged into
+   * one.
+   *
+   * @param timeFilter nullable, when null, all data groups will be queried
+   * @param valueFilter nullable
+   */
+  public List<IMultPointReader> getMultSeriesReader(
+      List<PartialPath> paths,
+      Map<String, Set<String>> deviceMeasurements,
+      List<TSDataType> dataTypes,
+      Filter timeFilter,
+      Filter valueFilter,
+      QueryContext context,
+      boolean ascending)
+      throws StorageEngineException, EmptyIntervalException {
+    // find the groups that should be queried using the timeFilter
+
+    Map<PartitionGroup, List<PartialPath>> partitionGroupListMap = Maps.newHashMap();
+    for (PartialPath partialPath : paths) {
+      List<PartitionGroup> partitionGroups = metaGroupMember.routeFilter(timeFilter, partialPath);
+      partitionGroups.forEach(
+          partitionGroup -> {
+            partitionGroupListMap
+                .computeIfAbsent(partitionGroup, n -> new ArrayList<>())
+                .add(partialPath);
+          });
+    }
+
+    List<IMultPointReader> multPointReaders = Lists.newArrayList();
+
+    partitionGroupListMap
+        .keySet()
+        .forEach(
+            partitionGroup -> {
+              List<PartialPath> partialPaths = partitionGroupListMap.get(partitionGroup);

Review comment:
      Maybe you can iterate over the `entrySet()` instead; iterating `keySet()` and then calling `get()` performs a second map lookup for every key.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultBatchReader.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class MultBatchReader implements IMultBatchReader {
+
+  private Map<String, IBatchReader> pathBatchReaders;
+
+  public MultBatchReader(Map<String, IBatchReader> pathBatchReaders) {
+    this.pathBatchReaders = pathBatchReaders;
+  }
+
+  /**
+   * reader has next batch data
+   *
+   * @return true if only one reader has next batch data, otherwise false
+   * @throws IOException
+   */
+  @Override
+  public boolean hasNextBatch() throws IOException {
+    for (IBatchReader reader : pathBatchReaders.values()) {
+      if (reader.hasNextBatch()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean hasNextBatch(String fullPath) throws IOException {
+    return pathBatchReaders.get(fullPath).hasNextBatch();
+  }
+
+  @Override
+  public BatchData nextBatch(String fullPath) throws IOException {
+    return pathBatchReaders.get(fullPath).nextBatch();
+  }
+
+  @Override
+  public BatchData nextBatch() throws IOException {
+    return null;

Review comment:
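      Returning `null` here could be dangerous, because callers of `IBatchReader.nextBatch()` will not expect a null batch. I suggest throwing an exception (e.g. `UnsupportedOperationException`) instead.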

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.db.utils.SerializeUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * RemoteSimpleSeriesReader is a reader without value filter that reads points from a remote side.
+ */
+public class RemoteMultSeriesReader implements IMultPointReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(RemoteMultSeriesReader.class);
+  private static final int FETCH_BATCH_DATA_SIZE = 10;
+
+  private MultDataSourceInfo sourceInfo;
+
+  private Map<String, Queue<BatchData>> cachedBatchs;
+
+  private AtomicReference<Map<String, ByteBuffer>> fetchResult = new AtomicReference<>();
+  private GenericHandler<Map<String, ByteBuffer>> handler;
+
+  private BatchStrategy batchStrategy;
+
+  private Map<String, BatchData> currentBatchDatas;
+
+  public RemoteMultSeriesReader(MultDataSourceInfo sourceInfo) {
+    this.sourceInfo = sourceInfo;
+    this.handler = new GenericHandler<>(sourceInfo.getCurrentNode(), fetchResult);
+    this.cachedBatchs = Maps.newHashMap();
+    this.sourceInfo
+        .getPartialPaths()
+        .forEach(
+            partialPath -> {
+              this.cachedBatchs.put(partialPath.getFullPath(), new ConcurrentLinkedQueue<>());
+            });
+    this.cachedBatchs = Maps.newHashMap();
+    this.batchStrategy = new DefaultBatchStrategy();
+  }
+
+  @Override
+  public boolean hasNextTimeValuePair(String fullPath) throws IOException {
+    BatchData batchData = currentBatchDatas.get(fullPath);
+    if (batchData != null && batchData.hasCurrent()) {
+      return true;
+    }
+    fetchBatch();
+    return checkPathBatchData(fullPath);
+  }
+
+  private boolean checkPathBatchData(String fullPath) {
+    BatchData batchData = cachedBatchs.get(fullPath).peek();
+    if (!batchData.isEmpty()) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public TimeValuePair nextTimeValuePair(String fullPath) throws IOException {
+    BatchData batchData = currentBatchDatas.get(fullPath);
+    if ((batchData == null || !batchData.hasCurrent()) && checkPathBatchData(fullPath)) {
+      batchData = cachedBatchs.get(fullPath).poll();
+      currentBatchDatas.put(fullPath, batchData);
+    }
+
+    if (!hasNextTimeValuePair(fullPath)) {
+      throw new NoSuchElementException();
+    }
+
+    TSDataType dataType = null;
+    for (int i = 0; i < sourceInfo.getPartialPaths().size(); i++) {
+      if (fullPath.equals(sourceInfo.getPartialPaths().get(i).getFullPath())) {
+        dataType = sourceInfo.getDataTypes().get(i);
+      }
+    }

Review comment:
      This lookup should be aided by an index, e.g. a map from full path to data type built once. Comparing the path string against every path on each call could be slow.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
##########
@@ -201,6 +212,139 @@ private IReaderByTimestamp getRemoteReaderByTimestamp(
         new RequestTimeOutException("Query by timestamp: " + path + " in " + partitionGroup));
   }
 
+  /**
+   * Create a ManagedSeriesReader that can read the data of "path" with filters in the whole
+   * cluster. The data groups that should be queried will be determined by the timeFilter, then for
+   * each group a series reader will be created, and finally all such readers will be merged into
+   * one.
+   *
+   * @param timeFilter nullable, when null, all data groups will be queried
+   * @param valueFilter nullable
+   */
+  public List<IMultPointReader> getMultSeriesReader(
+      List<PartialPath> paths,
+      Map<String, Set<String>> deviceMeasurements,
+      List<TSDataType> dataTypes,
+      Filter timeFilter,
+      Filter valueFilter,
+      QueryContext context,
+      boolean ascending)
+      throws StorageEngineException, EmptyIntervalException {
+    // find the groups that should be queried using the timeFilter
+
+    Map<PartitionGroup, List<PartialPath>> partitionGroupListMap = Maps.newHashMap();
+    for (PartialPath partialPath : paths) {
+      List<PartitionGroup> partitionGroups = metaGroupMember.routeFilter(timeFilter, partialPath);
+      partitionGroups.forEach(
+          partitionGroup -> {
+            partitionGroupListMap
+                .computeIfAbsent(partitionGroup, n -> new ArrayList<>())
+                .add(partialPath);
+          });
+    }
+
+    List<IMultPointReader> multPointReaders = Lists.newArrayList();
+
+    partitionGroupListMap
+        .keySet()
+        .forEach(
+            partitionGroup -> {
+              List<PartialPath> partialPaths = partitionGroupListMap.get(partitionGroup);
+              Map<String, Set<String>> partitionGroupDeviceMeasurements = Maps.newHashMap();
+              List<TSDataType> partitionGroupTSDataType = Lists.newArrayList();
+              partialPaths.forEach(
+                  partialPath -> {
+                    Set<String> measurements =
+                        deviceMeasurements.getOrDefault(
+                            partialPath.getDevice(), Collections.emptySet());
+                    partitionGroupDeviceMeasurements.put(partialPath.getFullPath(), measurements);
+                    partitionGroupTSDataType.add(dataTypes.get(paths.lastIndexOf(partialPath)));
+                  });
+
+              try {
+                IMultPointReader iMultPointReader =
+                    (IMultPointReader)
+                        getMultSeriesReader(
+                            partitionGroup,
+                            partialPaths,
+                            partitionGroupTSDataType,
+                            partitionGroupDeviceMeasurements,
+                            timeFilter,
+                            valueFilter,
+                            context,
+                            ascending);
+                multPointReaders.add(iMultPointReader);
+              } catch (Exception e) {
+

Review comment:
      The exception is silently swallowed here. Add a log, or explain in a comment why it can be ignored.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/IMultBatchReader.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+
+import java.io.IOException;
+
+public interface IMultBatchReader extends IBatchReader {
+
+  boolean hasNextBatch(String fullPath) throws IOException;
+
+  BatchData nextBatch(String fullPath) throws IOException;

Review comment:
      What are the semantics when the methods inherited from `IBatchReader` are called on an `IMultBatchReader`?
   For example, what will an `IMultBatchReader` return if `nextBatch` is called without a full path?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultPriorityMergeReader.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+
+import java.io.IOException;
+import java.util.Set;
+
+/** This class implements {@link IPointReader} for data sources with different priorities. */
+public class MultPriorityMergeReader extends PriorityMergeReader implements IMultPointReader {
+
+  private String fullPath;
+
+  public MultPriorityMergeReader(String fullPath) {
+    super();
+    this.fullPath = fullPath;
+  }
+
+  public void addReader(IMultPointReader reader, long priority) throws IOException {
+    if (reader.hasNextTimeValuePair(fullPath)) {
+      heap.add(
+          new MultElement(
+              reader, reader.nextTimeValuePair(fullPath), new MergeReaderPriority(priority, 0)));
+    } else {
+      reader.close();
+    }
+  }
+
+  @Override
+  public boolean hasNextTimeValuePair(String fullPath) throws IOException {
+    return false;
+  }
+
+  @Override
+  public TimeValuePair nextTimeValuePair(String fullPath) throws IOException {
+    return null;
+  }

Review comment:
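      Seeing these stubs (`hasNextTimeValuePair` always returns `false` and `nextTimeValuePair` always returns `null`), I do not think this class should implement `IMultPointReader`.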

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultManagedMergeReader.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+public class MultManagedMergeReader extends MultPriorityMergeReader implements ManagedSeriesReader {
+

Review comment:
      Can we find a better name? This reader only concerns one time series, so `Mult` in the name is misleading.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.db.utils.SerializeUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * RemoteSimpleSeriesReader is a reader without value filter that reads points from a remote side.
+ */
+public class RemoteMultSeriesReader implements IMultPointReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(RemoteMultSeriesReader.class);
+  private static final int FETCH_BATCH_DATA_SIZE = 10;
+
+  private MultDataSourceInfo sourceInfo;
+
+  private Map<String, Queue<BatchData>> cachedBatchs;
+
+  private AtomicReference<Map<String, ByteBuffer>> fetchResult = new AtomicReference<>();
+  private GenericHandler<Map<String, ByteBuffer>> handler;
+
+  private BatchStrategy batchStrategy;
+
+  private Map<String, BatchData> currentBatchDatas;
+
+  public RemoteMultSeriesReader(MultDataSourceInfo sourceInfo) {
+    this.sourceInfo = sourceInfo;
+    this.handler = new GenericHandler<>(sourceInfo.getCurrentNode(), fetchResult);
+    this.cachedBatchs = Maps.newHashMap();
+    this.sourceInfo
+        .getPartialPaths()
+        .forEach(
+            partialPath -> {
+              this.cachedBatchs.put(partialPath.getFullPath(), new ConcurrentLinkedQueue<>());
+            });
+    this.cachedBatchs = Maps.newHashMap();

Review comment:
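      The second `this.cachedBatchs = Maps.newHashMap();` should be `this.currentBatchDatas = Maps.newHashMap();`. As written, it discards the per-path queues created just above and leaves `currentBatchDatas` null, although `hasNextTimeValuePair` dereferences it.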




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org