You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/03/29 03:10:55 UTC

[GitHub] [iotdb] cornmonster opened a new pull request #5367: [IOTDB-2727] data block manager impl

cornmonster opened a new pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367


   Design: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199538158


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] cornmonster commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
cornmonster commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840213064



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class SinkHandle implements ISinkHandle {
+
+  private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
+
+  public static final int MAX_ATTEMPT_TIMES = 3;
+
+  private final String remoteHostname;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final String remoteOperatorId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final LocalMemoryManager localMemoryManager;
+  private final ExecutorService executorService;
+  private final DataBlockService.Client client;
+  private final TsBlockSerde serde;
+  private final SinkHandleListener sinkHandleListener;
+
+  // TODO: a better data structure to hold tsblocks
+  private final List<TsBlock> bufferedTsBlocks = new LinkedList<>();
+
+  private volatile ListenableFuture<Void> blocked = immediateFuture(null);
+  private NewDataBlockEvent savedNewDataBlockEvent;
+  private int numOfBufferedTsBlocks = 0;
+  private long bufferRetainedSizeInBytes;
+  private boolean closed;
+  private boolean noMoreTsBlocks;
+  private Throwable throwable;
+
+  public SinkHandle(
+      String remoteHostname,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      String remoteOperatorId,
+      TFragmentInstanceId localFragmentInstanceId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      DataBlockService.Client client,
+      TsBlockSerde serde,
+      SinkHandleListener sinkHandleListener) {
+    this.remoteHostname = Validate.notNull(remoteHostname);
+    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+    this.remoteOperatorId = Validate.notNull(remoteOperatorId);
+    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.localMemoryManager = Validate.notNull(localMemoryManager);
+    this.executorService = Validate.notNull(executorService);
+    this.client = Validate.notNull(client);
+    this.serde = Validate.notNull(serde);
+    this.sinkHandleListener = Validate.notNull(sinkHandleListener);
+  }
+
+  @Override
+  public ListenableFuture<Void> isFull() {
+    if (closed) {
+      throw new IllegalStateException("Sink handle is closed.");
+    }
+    return nonCancellationPropagating(blocked);
+  }
+
+  private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
+    executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
+  }
+
+  @Override
+  public void send(List<TsBlock> tsBlocks) throws IOException {
+    Validate.notNull(tsBlocks, "tsBlocks is null");
+    if (throwable != null) {
+      throw new IOException(throwable);
+    }
+    if (closed) {
+      throw new IllegalStateException("Sink handle is closed.");
+    }
+    if (!blocked.isDone()) {
+      throw new IllegalStateException("Sink handle is blocked.");
+    }
+    if (noMoreTsBlocks) {
+      return;
+    }
+
+    long retainedSizeInBytes = 0L;
+    for (TsBlock tsBlock : tsBlocks) {
+      retainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+    }
+    int currentEndSequenceId;
+    List<Long> tsBlockSizes = new ArrayList<>();
+    synchronized (this) {
+      currentEndSequenceId = bufferedTsBlocks.size();
+      blocked =
+          localMemoryManager
+              .getQueryPool()
+              .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes);
+      bufferRetainedSizeInBytes += retainedSizeInBytes;
+      bufferedTsBlocks.addAll(tsBlocks);
+      numOfBufferedTsBlocks += tsBlocks.size();
+      for (int i = currentEndSequenceId; i < currentEndSequenceId + tsBlocks.size(); i++) {
+        tsBlockSizes.add(bufferedTsBlocks.get(i).getRetainedSizeInBytes());
+      }
+    }
+
+    submitSendNewDataBlockEventTask(currentEndSequenceId, tsBlockSizes);

Review comment:
       It is a trade-off between network efficiency and query latency. The current implementation sacrifices network efficiency for better latency.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] cornmonster commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
cornmonster commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840202525



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
##########
@@ -19,98 +19,296 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockRequest;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockResponse;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
-public class DataBlockManager {
-
-  public static class FragmentInstanceInfo {
-    private String hostname;
-    private String queryId;
-    private String fragmentId;
-    private String instanceId;
-
-    public FragmentInstanceInfo(
-        String hostname, String queryId, String fragmentId, String instanceId) {
-      this.hostname = Validate.notNull(hostname);
-      this.queryId = Validate.notNull(queryId);
-      this.fragmentId = Validate.notNull(fragmentId);
-      this.instanceId = Validate.notNull(instanceId);
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+public class DataBlockManager implements IDataBlockManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(DataBlockManager.class);
+
+  public interface SourceHandleListener {
+    void onFinished(SourceHandle sourceHandle);
+
+    void onClosed(SourceHandle sourceHandle);
+  }
+
+  public interface SinkHandleListener {
+    void onFinish(SinkHandle sinkHandle);
+
+    void onClosed(SinkHandle sinkHandle);
+
+    void onAborted(SinkHandle sinkHandle);
+  }
+
+  /** Handle thrift communications. */
+  class DataBlockServiceImpl implements DataBlockService.Iface {
+
+    public GetDataBlockResponse getDataBlock(GetDataBlockRequest req) throws TException {
+      logger.debug(
+          "Get data block request received. Asking for data block [{}, {}) from {}.",
+          req.getStartSequenceId(),
+          req.getEndSequenceId(),
+          req.getSourceFragnemtInstanceId());
+      if (!sinkHandles.containsKey(req.getSourceFragnemtInstanceId())) {
+        throw new TException(
+            "Source fragment instance not found. Fragment instance ID: "
+                + req.getSourceFragnemtInstanceId()
+                + ".");
+      }
+      GetDataBlockResponse resp = new GetDataBlockResponse();
+      for (int i = req.getStartSequenceId(); i < req.getEndSequenceId(); i++) {
+        ByteBuffer serializedTsBlock =
+            sinkHandles
+                .get(req.getSourceFragnemtInstanceId())
+                .getSerializedTsBlock(req.getStartSequenceId());
+        resp.addToTsBlocks(serializedTsBlock);
+      }
+      return resp;
     }
 
-    public String getHostname() {
-      return hostname;
+    @Override
+    public void onNewDataBlockEvent(NewDataBlockEvent e) throws TException {
+      logger.debug(
+          "New data block event received, for operator {} of {} from {}.",
+          e.getTargetOperatorId(),
+          e.getTargetFragmentInstanceId(),
+          e.getSourceFragnemtInstanceId());
+      if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
+          || !sourceHandles
+              .get(e.getTargetFragmentInstanceId())
+              .containsKey(e.getTargetOperatorId())
+          || sourceHandles
+              .get(e.getTargetFragmentInstanceId())
+              .get(e.getTargetOperatorId())
+              .isClosed()) {
+        logger.info("{} is ignored because the source handle does not exist or has been closed", e);
+        return;

Review comment:
       Good catch. I forgot to change this after I changed the transmission protocol.
   
   > If the NewDataBlockEvent is omitted because the corresponding SourceHandle is not created yet, the SourceHandle will keep the blocked status after being creating in the future until next event comes.
   
   
   True. In practice, it is unlikely to happen: the series scan operator takes some time to produce data for downstream instances, so I believe the instances should be available by then.
   
   > And if the SinkHandle cannot send the next event due to memory limit or there is no more TsBlocks to be sent, the SourceHandle may be blocked permanently...
   
   
   The SinkHandle will attempt to send the events at most 3 times. If all attempts fail, the SinkHandle throws an
   I/O exception on the next invocation of any method, telling the caller that something is wrong.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] cornmonster commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
cornmonster commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840202525



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
##########
@@ -19,98 +19,296 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockRequest;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockResponse;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
-public class DataBlockManager {
-
-  public static class FragmentInstanceInfo {
-    private String hostname;
-    private String queryId;
-    private String fragmentId;
-    private String instanceId;
-
-    public FragmentInstanceInfo(
-        String hostname, String queryId, String fragmentId, String instanceId) {
-      this.hostname = Validate.notNull(hostname);
-      this.queryId = Validate.notNull(queryId);
-      this.fragmentId = Validate.notNull(fragmentId);
-      this.instanceId = Validate.notNull(instanceId);
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+public class DataBlockManager implements IDataBlockManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(DataBlockManager.class);
+
+  public interface SourceHandleListener {
+    void onFinished(SourceHandle sourceHandle);
+
+    void onClosed(SourceHandle sourceHandle);
+  }
+
+  public interface SinkHandleListener {
+    void onFinish(SinkHandle sinkHandle);
+
+    void onClosed(SinkHandle sinkHandle);
+
+    void onAborted(SinkHandle sinkHandle);
+  }
+
+  /** Handle thrift communications. */
+  class DataBlockServiceImpl implements DataBlockService.Iface {
+
+    public GetDataBlockResponse getDataBlock(GetDataBlockRequest req) throws TException {
+      logger.debug(
+          "Get data block request received. Asking for data block [{}, {}) from {}.",
+          req.getStartSequenceId(),
+          req.getEndSequenceId(),
+          req.getSourceFragnemtInstanceId());
+      if (!sinkHandles.containsKey(req.getSourceFragnemtInstanceId())) {
+        throw new TException(
+            "Source fragment instance not found. Fragment instance ID: "
+                + req.getSourceFragnemtInstanceId()
+                + ".");
+      }
+      GetDataBlockResponse resp = new GetDataBlockResponse();
+      for (int i = req.getStartSequenceId(); i < req.getEndSequenceId(); i++) {
+        ByteBuffer serializedTsBlock =
+            sinkHandles
+                .get(req.getSourceFragnemtInstanceId())
+                .getSerializedTsBlock(req.getStartSequenceId());
+        resp.addToTsBlocks(serializedTsBlock);
+      }
+      return resp;
     }
 
-    public String getHostname() {
-      return hostname;
+    @Override
+    public void onNewDataBlockEvent(NewDataBlockEvent e) throws TException {
+      logger.debug(
+          "New data block event received, for operator {} of {} from {}.",
+          e.getTargetOperatorId(),
+          e.getTargetFragmentInstanceId(),
+          e.getSourceFragnemtInstanceId());
+      if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
+          || !sourceHandles
+              .get(e.getTargetFragmentInstanceId())
+              .containsKey(e.getTargetOperatorId())
+          || sourceHandles
+              .get(e.getTargetFragmentInstanceId())
+              .get(e.getTargetOperatorId())
+              .isClosed()) {
+        logger.info("{} is ignored because the source handle does not exist or has been closed", e);
+        return;

Review comment:
       Good catch. I forgot to change this after I changed the transmission protocol.
   
   > If the NewDataBlockEvent is omitted because the corresponding SourceHandle is not created yet, the SourceHandle will keep the blocked status after being creating in the future until next event comes.
   True. In practice, it is unlikely to happen: the series scan operator takes some time to produce data for downstream instances, so I believe the instances should be available by then.
   
   > And if the SinkHandle cannot send the next event due to memory limit or there is no more TsBlocks to be sent, the SourceHandle may be blocked permanently...
   The SinkHandle will attempt to send the events at most 3 times. If all attempts fail, the SinkHandle throws an
   I/O exception on the next invocation of any method, telling the caller that something is wrong.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] cornmonster commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
cornmonster commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840196827



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
##########
@@ -79,11 +141,35 @@ public synchronized void free(String queryId, long bytes) {
     } else {
       queryMemoryReservations.put(queryId, queryReservedBytes);
     }
-
     reservedBytes -= bytes;
+
+    if (memoryReservationFutures.isEmpty()) {
+      return;
+    }
+    Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
+    while (iterator.hasNext()) {
+      MemoryReservationFuture<Void> future = iterator.next();
+
+      if (future.isCancelled()) {
+        iterator.remove();
+        continue;
+      }
+
+      long bytesToReserve = future.getBytes();
+      if (maxBytes - reservedBytes < bytesToReserve
+          || maxBytesPerQuery - queryMemoryReservations.getOrDefault(queryId, 0L)

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] cornmonster commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
cornmonster commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840260008



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class SinkHandle implements ISinkHandle {
+
+  private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
+
+  public static final int MAX_ATTEMPT_TIMES = 3;
+
+  private final String remoteHostname;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final String remoteOperatorId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final LocalMemoryManager localMemoryManager;
+  private final ExecutorService executorService;
+  private final DataBlockService.Client client;
+  private final TsBlockSerde serde;
+  private final SinkHandleListener sinkHandleListener;
+
+  // TODO: a better data structure to hold tsblocks
+  private final List<TsBlock> bufferedTsBlocks = new LinkedList<>();

Review comment:
       Agreed. The LinkedList was only for the proof of concept (POC). I will replace it with a more suitable data structure.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] xingtanzjr commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
xingtanzjr commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r839583583



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class SinkHandle implements ISinkHandle {
+
+  private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
+
+  public static final int MAX_ATTEMPT_TIMES = 3;
+
+  private final String remoteHostname;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final String remoteOperatorId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final LocalMemoryManager localMemoryManager;
+  private final ExecutorService executorService;
+  private final DataBlockService.Client client;
+  private final TsBlockSerde serde;
+  private final SinkHandleListener sinkHandleListener;
+
+  // TODO: a better data structure to hold tsblocks
+  private final List<TsBlock> bufferedTsBlocks = new LinkedList<>();
+
+  private volatile ListenableFuture<Void> blocked = immediateFuture(null);
+  private NewDataBlockEvent savedNewDataBlockEvent;
+  private int numOfBufferedTsBlocks = 0;
+  private long bufferRetainedSizeInBytes;
+  private boolean closed;
+  private boolean noMoreTsBlocks;
+  private Throwable throwable;
+
+  public SinkHandle(
+      String remoteHostname,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      String remoteOperatorId,
+      TFragmentInstanceId localFragmentInstanceId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      DataBlockService.Client client,
+      TsBlockSerde serde,
+      SinkHandleListener sinkHandleListener) {
+    this.remoteHostname = Validate.notNull(remoteHostname);
+    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+    this.remoteOperatorId = Validate.notNull(remoteOperatorId);
+    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.localMemoryManager = Validate.notNull(localMemoryManager);
+    this.executorService = Validate.notNull(executorService);
+    this.client = Validate.notNull(client);
+    this.serde = Validate.notNull(serde);
+    this.sinkHandleListener = Validate.notNull(sinkHandleListener);
+  }
+
+  @Override
+  public ListenableFuture<Void> isFull() {
+    if (closed) {
+      throw new IllegalStateException("Sink handle is closed.");
+    }
+    return nonCancellationPropagating(blocked);
+  }
+
+  private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
+    executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
+  }
+
+  @Override
+  public void send(List<TsBlock> tsBlocks) throws IOException {
+    Validate.notNull(tsBlocks, "tsBlocks is null");
+    if (throwable != null) {
+      throw new IOException(throwable);
+    }
+    if (closed) {
+      throw new IllegalStateException("Sink handle is closed.");
+    }
+    if (!blocked.isDone()) {
+      throw new IllegalStateException("Sink handle is blocked.");
+    }
+    if (noMoreTsBlocks) {
+      return;
+    }
+
+    long retainedSizeInBytes = 0L;
+    for (TsBlock tsBlock : tsBlocks) {
+      retainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+    }
+    int currentEndSequenceId;
+    List<Long> tsBlockSizes = new ArrayList<>();
+    synchronized (this) {
+      currentEndSequenceId = bufferedTsBlocks.size();
+      blocked =
+          localMemoryManager
+              .getQueryPool()
+              .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes);

Review comment:
       Although this send may set `blocked`, the current send will still be executed. Is that by design?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] JackieTien97 commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840256322



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
##########
@@ -38,8 +50,25 @@ public ThriftService getImplementation() {
   @Override
   public void initTProcessor()
       throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-    impl = new DataBlockServiceImpl();
-    processor = new Processor<>(impl);
+    executorService =
+        IoTDBThreadPoolFactory.newCachedThreadPoolWithDaemon("data-block-manager-task-executors");

Review comment:
       You can set both `corePoolSize` and `maximumPoolSize` to 4. To make the value configurable, you can define it in `iotdb-engine.properties`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] xingtanzjr commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
xingtanzjr commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r839576245



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
##########
@@ -79,11 +141,35 @@ public synchronized void free(String queryId, long bytes) {
     } else {
       queryMemoryReservations.put(queryId, queryReservedBytes);
     }
-
     reservedBytes -= bytes;
+
+    if (memoryReservationFutures.isEmpty()) {
+      return;
+    }
+    Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
+    while (iterator.hasNext()) {
+      MemoryReservationFuture<Void> future = iterator.next();
+
+      if (future.isCancelled()) {
+        iterator.remove();
+        continue;
+      }
+
+      long bytesToReserve = future.getBytes();
+      if (maxBytes - reservedBytes < bytesToReserve
+          || maxBytesPerQuery - queryMemoryReservations.getOrDefault(queryId, 0L)

Review comment:
       It seems that `MemoryReservationFuture` doesn't know which query it belongs to. In other words, the `future` and the `queryId` may belong to different queries, so we cannot ensure that the `queryMemoryReservations` of the `future`'s query does not exceed `maxBytesPerQuery`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] cornmonster commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
cornmonster commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840175903



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
##########
@@ -79,11 +141,35 @@ public synchronized void free(String queryId, long bytes) {
     } else {
       queryMemoryReservations.put(queryId, queryReservedBytes);
     }
-
     reservedBytes -= bytes;
+
+    if (memoryReservationFutures.isEmpty()) {
+      return;
+    }
+    Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
+    while (iterator.hasNext()) {
+      MemoryReservationFuture<Void> future = iterator.next();
+
+      if (future.isCancelled()) {
+        iterator.remove();
+        continue;
+      }
+
+      long bytesToReserve = future.getBytes();
+      if (maxBytes - reservedBytes < bytesToReserve
+          || maxBytesPerQuery - queryMemoryReservations.getOrDefault(queryId, 0L)

Review comment:
       Makes sense. Will fix it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] cornmonster commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
cornmonster commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840208602



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
##########
@@ -19,72 +19,286 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockRequest;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockResponse;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 
 public class SourceHandle implements ISourceHandle {
 
-  private final long bufferCapacityInBytes;
+  private static final Logger logger = LoggerFactory.getLogger(SourceHandle.class);
+
+  private final String remoteHostname;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final String localOperatorId;
+  private final LocalMemoryManager localMemoryManager;
+  private final ExecutorService executorService;
+  private final DataBlockService.Client client;
+  private final TsBlockSerde serde;
+  private final SourceHandleListener sourceHandleListener;
 
   private final Queue<TsBlock> bufferedTsBlocks = new ArrayDeque<>();
+  private final Map<Integer, Long> sequenceIdToDataBlockSize = new HashMap<>();
+
   private volatile SettableFuture<Void> blocked = SettableFuture.create();
-  private volatile long bufferRetainedSizeInBytes;
-  private boolean finished;
+  private volatile Future<?> getDataBlocksTaskFuture = null;
+  private long bufferRetainedSizeInBytes;
+  private int nextSequenceId;
+  private int lastSequenceId = Integer.MAX_VALUE;
+  private boolean noMoreTsBlocks;
   private boolean closed;
   private Throwable throwable;
 
-  public SourceHandle(long bufferCapacityInBytes) {
-    Validate.isTrue(bufferCapacityInBytes > 0L, "capacity cannot be less or equal to zero.");
-    this.bufferCapacityInBytes = bufferCapacityInBytes;
+  public SourceHandle(
+      String remoteHostname,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      TFragmentInstanceId localFragmentInstanceId,
+      String localOperatorId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      DataBlockService.Client client,
+      TsBlockSerde serde,
+      SourceHandleListener sourceHandleListener) {
+    this.remoteHostname = Validate.notNull(remoteHostname);
+    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.localOperatorId = Validate.notNull(localOperatorId);
+    this.localMemoryManager = Validate.notNull(localMemoryManager);
+    this.executorService = Validate.notNull(executorService);
+    this.client = Validate.notNull(client);
+    this.serde = Validate.notNull(serde);
+    this.sourceHandleListener = Validate.notNull(sourceHandleListener);
+    bufferRetainedSizeInBytes = 0L;
   }
 
   @Override
-  public TsBlock receive() {
+  public synchronized TsBlock receive() throws IOException {
     if (throwable != null) {
-      throw new RuntimeException(throwable);
+      throw new IOException(throwable);
     }
     if (closed) {
-      throw new IllegalStateException("Source handle has been closed.");
+      throw new IllegalStateException("Source handle is closed.");
+    }
+    if (!blocked.isDone()) {
+      throw new IllegalStateException("Source handle is blocked.");
     }
     TsBlock tsBlock = bufferedTsBlocks.poll();
     if (tsBlock != null) {
-      bufferRetainedSizeInBytes -= getRetainedSizeInBytes(tsBlock);
+      bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
+      localMemoryManager
+          .getQueryPool()
+          .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
     }
-    if (bufferedTsBlocks.isEmpty() && !finished && blocked.isDone()) {
+    if (bufferedTsBlocks.isEmpty() && !isFinished() && blocked.isDone()) {
       blocked = SettableFuture.create();
     }
+    if (isFinished()) {
+      sourceHandleListener.onFinished(this);
+    }
+    trySubmitGetDataBlocksTask();
     return tsBlock;
   }
 
-  private long getRetainedSizeInBytes(TsBlock tsBlock) {
-    throw new UnsupportedOperationException();
+  private void trySubmitGetDataBlocksTask() {
+    if (getDataBlocksTaskFuture != null && !getDataBlocksTaskFuture.isDone()) {
+      return;
+    }
+    int startSequenceId = nextSequenceId;
+    int currSequenceId = nextSequenceId;
+    long reservedBytes = 0L;
+    while (sequenceIdToDataBlockSize.containsKey(currSequenceId)) {
+      Long bytesToReserve = sequenceIdToDataBlockSize.get(currSequenceId);
+      if (bytesToReserve == null) {
+        throw new IllegalStateException("Data block size is null.");
+      }
+      boolean reserved =
+          localMemoryManager
+              .getQueryPool()
+              .tryReserve(localFragmentInstanceId.getQueryId(), bytesToReserve);
+      if (reserved) {
+        currSequenceId += 1;
+        reservedBytes += bytesToReserve;
+        bufferRetainedSizeInBytes += bytesToReserve;
+      } else {
+        break;
+      }
+    }
+
+    if (currSequenceId > startSequenceId) {

Review comment:
       trySubmitGetDataBlocksTask will be called in 2 cases:
   
   1. new data blocks generated by upstream instance -> see SourceHandle#updatePendingDataBlockInfo
   2. memory released -> see SourceHandle#receive




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] xingtanzjr commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
xingtanzjr commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840192120



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
##########
@@ -19,72 +19,286 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockRequest;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockResponse;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 
 public class SourceHandle implements ISourceHandle {
 
-  private final long bufferCapacityInBytes;
+  private static final Logger logger = LoggerFactory.getLogger(SourceHandle.class);
+
+  private final String remoteHostname;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final String localOperatorId;
+  private final LocalMemoryManager localMemoryManager;
+  private final ExecutorService executorService;
+  private final DataBlockService.Client client;
+  private final TsBlockSerde serde;
+  private final SourceHandleListener sourceHandleListener;
 
   private final Queue<TsBlock> bufferedTsBlocks = new ArrayDeque<>();
+  private final Map<Integer, Long> sequenceIdToDataBlockSize = new HashMap<>();
+
   private volatile SettableFuture<Void> blocked = SettableFuture.create();
-  private volatile long bufferRetainedSizeInBytes;
-  private boolean finished;
+  private volatile Future<?> getDataBlocksTaskFuture = null;
+  private long bufferRetainedSizeInBytes;
+  private int nextSequenceId;
+  private int lastSequenceId = Integer.MAX_VALUE;
+  private boolean noMoreTsBlocks;
   private boolean closed;
   private Throwable throwable;
 
-  public SourceHandle(long bufferCapacityInBytes) {
-    Validate.isTrue(bufferCapacityInBytes > 0L, "capacity cannot be less or equal to zero.");
-    this.bufferCapacityInBytes = bufferCapacityInBytes;
+  public SourceHandle(
+      String remoteHostname,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      TFragmentInstanceId localFragmentInstanceId,
+      String localOperatorId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      DataBlockService.Client client,
+      TsBlockSerde serde,
+      SourceHandleListener sourceHandleListener) {
+    this.remoteHostname = Validate.notNull(remoteHostname);
+    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.localOperatorId = Validate.notNull(localOperatorId);
+    this.localMemoryManager = Validate.notNull(localMemoryManager);
+    this.executorService = Validate.notNull(executorService);
+    this.client = Validate.notNull(client);
+    this.serde = Validate.notNull(serde);
+    this.sourceHandleListener = Validate.notNull(sourceHandleListener);
+    bufferRetainedSizeInBytes = 0L;
   }
 
   @Override
-  public TsBlock receive() {
+  public synchronized TsBlock receive() throws IOException {
     if (throwable != null) {
-      throw new RuntimeException(throwable);
+      throw new IOException(throwable);
     }
     if (closed) {
-      throw new IllegalStateException("Source handle has been closed.");
+      throw new IllegalStateException("Source handle is closed.");
+    }
+    if (!blocked.isDone()) {
+      throw new IllegalStateException("Source handle is blocked.");
     }
     TsBlock tsBlock = bufferedTsBlocks.poll();
     if (tsBlock != null) {
-      bufferRetainedSizeInBytes -= getRetainedSizeInBytes(tsBlock);
+      bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
+      localMemoryManager
+          .getQueryPool()
+          .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
     }
-    if (bufferedTsBlocks.isEmpty() && !finished && blocked.isDone()) {
+    if (bufferedTsBlocks.isEmpty() && !isFinished() && blocked.isDone()) {

Review comment:
       It seems that `blocked.isDone()` is always `true` here, because `receive()` has already thrown an `IllegalStateException` above if `blocked` is not done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] xingtanzjr commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
xingtanzjr commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840199520



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class SinkHandle implements ISinkHandle {
+
+  private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
+
+  public static final int MAX_ATTEMPT_TIMES = 3;
+
+  private final String remoteHostname;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final String remoteOperatorId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final LocalMemoryManager localMemoryManager;
+  private final ExecutorService executorService;
+  private final DataBlockService.Client client;
+  private final TsBlockSerde serde;
+  private final SinkHandleListener sinkHandleListener;
+
+  // TODO: a better data structure to hold tsblocks
+  private final List<TsBlock> bufferedTsBlocks = new LinkedList<>();

Review comment:
       Furthermore, the size of `bufferedTsBlocks` will keep growing as the sequence ID increases...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] cornmonster commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
cornmonster commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840204718



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
##########
@@ -38,8 +50,25 @@ public ThriftService getImplementation() {
   @Override
   public void initTProcessor()
       throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-    impl = new DataBlockServiceImpl();
-    processor = new Processor<>(impl);
+    executorService =
+        IoTDBThreadPoolFactory.newCachedThreadPoolWithDaemon("data-block-manager-task-executors");
+    clientFactory = new DataBlockServiceClientFactory();
+    this.dataBlockManager =
+        new DataBlockManager(
+            localMemoryManager, tsBlockSerdeFactory, executorService, clientFactory);
+    processor = new Processor<>(dataBlockManager.getOrCreateDataBlockServiceImpl());

Review comment:
       getOrCreateDataBlockServiceImpl indicates the DataBlockServiceImpl will be reused internally, while createDataBlockServiceImpl indicates there will be a new DataBlockServiceImpl created on each invocation.
   
   I believe getOrCreateDataBlockServiceImpl describes the behavior of DataBlockManager, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] xingtanzjr commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
xingtanzjr commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840196186



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
##########
@@ -19,98 +19,296 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockRequest;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockResponse;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
-public class DataBlockManager {
-
-  public static class FragmentInstanceInfo {
-    private String hostname;
-    private String queryId;
-    private String fragmentId;
-    private String instanceId;
-
-    public FragmentInstanceInfo(
-        String hostname, String queryId, String fragmentId, String instanceId) {
-      this.hostname = Validate.notNull(hostname);
-      this.queryId = Validate.notNull(queryId);
-      this.fragmentId = Validate.notNull(fragmentId);
-      this.instanceId = Validate.notNull(instanceId);
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+public class DataBlockManager implements IDataBlockManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(DataBlockManager.class);
+
+  public interface SourceHandleListener {
+    void onFinished(SourceHandle sourceHandle);
+
+    void onClosed(SourceHandle sourceHandle);
+  }
+
+  public interface SinkHandleListener {
+    void onFinish(SinkHandle sinkHandle);
+
+    void onClosed(SinkHandle sinkHandle);
+
+    void onAborted(SinkHandle sinkHandle);
+  }
+
+  /** Handle thrift communications. */
+  class DataBlockServiceImpl implements DataBlockService.Iface {
+
+    public GetDataBlockResponse getDataBlock(GetDataBlockRequest req) throws TException {
+      logger.debug(
+          "Get data block request received. Asking for data block [{}, {}) from {}.",
+          req.getStartSequenceId(),
+          req.getEndSequenceId(),
+          req.getSourceFragnemtInstanceId());
+      if (!sinkHandles.containsKey(req.getSourceFragnemtInstanceId())) {
+        throw new TException(
+            "Source fragment instance not found. Fragment instance ID: "
+                + req.getSourceFragnemtInstanceId()
+                + ".");
+      }
+      GetDataBlockResponse resp = new GetDataBlockResponse();
+      for (int i = req.getStartSequenceId(); i < req.getEndSequenceId(); i++) {
+        ByteBuffer serializedTsBlock =
+            sinkHandles
+                .get(req.getSourceFragnemtInstanceId())
+                .getSerializedTsBlock(req.getStartSequenceId());
+        resp.addToTsBlocks(serializedTsBlock);
+      }
+      return resp;
     }
 
-    public String getHostname() {
-      return hostname;
+    @Override
+    public void onNewDataBlockEvent(NewDataBlockEvent e) throws TException {
+      logger.debug(
+          "New data block event received, for operator {} of {} from {}.",
+          e.getTargetOperatorId(),
+          e.getTargetFragmentInstanceId(),
+          e.getSourceFragnemtInstanceId());
+      if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
+          || !sourceHandles
+              .get(e.getTargetFragmentInstanceId())
+              .containsKey(e.getTargetOperatorId())
+          || sourceHandles
+              .get(e.getTargetFragmentInstanceId())
+              .get(e.getTargetOperatorId())
+              .isClosed()) {
+        logger.info("{} is ignored because the source handle does not exist or has been closed", e);
+        return;

Review comment:
       If the NewDataBlockEvent is ignored because the corresponding SourceHandle has not been created yet, the SourceHandle will stay in the `blocked` state after it is created, until the next event arrives.
   
   And if the SinkHandle cannot send the next event due to the memory limit, or there are no more TsBlocks to be sent, the SourceHandle may be blocked permanently...
   
   Maybe we can create the SourceHandle and put it in the map if it hasn't been created yet when the NewDataBlockEvent is received.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] cornmonster commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
cornmonster commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840261076



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class SinkHandle implements ISinkHandle {
+
+  private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
+
+  public static final int MAX_ATTEMPT_TIMES = 3;
+
+  private final String remoteHostname;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final String remoteOperatorId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final LocalMemoryManager localMemoryManager;
+  private final ExecutorService executorService;
+  private final DataBlockService.Client client;
+  private final TsBlockSerde serde;
+  private final SinkHandleListener sinkHandleListener;
+
+  // TODO: a better data structure to hold tsblocks
+  private final List<TsBlock> bufferedTsBlocks = new LinkedList<>();
+
+  private volatile ListenableFuture<Void> blocked = immediateFuture(null);
+  private NewDataBlockEvent savedNewDataBlockEvent;
+  private int numOfBufferedTsBlocks = 0;
+  private long bufferRetainedSizeInBytes;
+  private boolean closed;
+  private boolean noMoreTsBlocks;
+  private Throwable throwable;
+
+  public SinkHandle(
+      String remoteHostname,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      String remoteOperatorId,
+      TFragmentInstanceId localFragmentInstanceId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      DataBlockService.Client client,
+      TsBlockSerde serde,
+      SinkHandleListener sinkHandleListener) {
+    this.remoteHostname = Validate.notNull(remoteHostname);
+    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+    this.remoteOperatorId = Validate.notNull(remoteOperatorId);
+    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.localMemoryManager = Validate.notNull(localMemoryManager);
+    this.executorService = Validate.notNull(executorService);
+    this.client = Validate.notNull(client);
+    this.serde = Validate.notNull(serde);
+    this.sinkHandleListener = Validate.notNull(sinkHandleListener);
+  }
+
+  @Override
+  public ListenableFuture<Void> isFull() {
+    if (closed) {
+      throw new IllegalStateException("Sink handle is closed.");
+    }
+    return nonCancellationPropagating(blocked);
+  }
+
+  private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
+    executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
+  }
+
+  @Override
+  public void send(List<TsBlock> tsBlocks) throws IOException {
+    Validate.notNull(tsBlocks, "tsBlocks is null");
+    if (throwable != null) {
+      throw new IOException(throwable);
+    }
+    if (closed) {
+      throw new IllegalStateException("Sink handle is closed.");
+    }
+    if (!blocked.isDone()) {
+      throw new IllegalStateException("Sink handle is blocked.");
+    }
+    if (noMoreTsBlocks) {
+      return;
+    }
+
+    long retainedSizeInBytes = 0L;
+    for (TsBlock tsBlock : tsBlocks) {
+      retainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+    }
+    int currentEndSequenceId;
+    List<Long> tsBlockSizes = new ArrayList<>();
+    synchronized (this) {
+      currentEndSequenceId = bufferedTsBlocks.size();
+      blocked =
+          localMemoryManager
+              .getQueryPool()
+              .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes);
+      bufferRetainedSizeInBytes += retainedSizeInBytes;
+      bufferedTsBlocks.addAll(tsBlocks);
+      numOfBufferedTsBlocks += tsBlocks.size();
+      for (int i = currentEndSequenceId; i < currentEndSequenceId + tsBlocks.size(); i++) {
+        tsBlockSizes.add(bufferedTsBlocks.get(i).getRetainedSizeInBytes());
+      }
+    }
+
+    submitSendNewDataBlockEventTask(currentEndSequenceId, tsBlockSizes);

Review comment:
       Also, the size of a TsBlock should be limited by an upper bound, such as 1 MB. But I believe the sink operator may produce more than that each time. What do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] JackieTien97 commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840160562



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
##########
@@ -19,98 +19,296 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockRequest;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockResponse;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
-public class DataBlockManager {
-
-  public static class FragmentInstanceInfo {
-    private String hostname;
-    private String queryId;
-    private String fragmentId;
-    private String instanceId;
-
-    public FragmentInstanceInfo(
-        String hostname, String queryId, String fragmentId, String instanceId) {
-      this.hostname = Validate.notNull(hostname);
-      this.queryId = Validate.notNull(queryId);
-      this.fragmentId = Validate.notNull(fragmentId);
-      this.instanceId = Validate.notNull(instanceId);
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+public class DataBlockManager implements IDataBlockManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(DataBlockManager.class);
+
+  public interface SourceHandleListener {
+    void onFinished(SourceHandle sourceHandle);
+
+    void onClosed(SourceHandle sourceHandle);
+  }
+
+  public interface SinkHandleListener {
+    void onFinish(SinkHandle sinkHandle);
+
+    void onClosed(SinkHandle sinkHandle);
+
+    void onAborted(SinkHandle sinkHandle);
+  }
+
+  /** Handle thrift communications. */
+  class DataBlockServiceImpl implements DataBlockService.Iface {
+
+    public GetDataBlockResponse getDataBlock(GetDataBlockRequest req) throws TException {
+      logger.debug(
+          "Get data block request received. Asking for data block [{}, {}) from {}.",
+          req.getStartSequenceId(),
+          req.getEndSequenceId(),
+          req.getSourceFragnemtInstanceId());
+      if (!sinkHandles.containsKey(req.getSourceFragnemtInstanceId())) {
+        throw new TException(
+            "Source fragment instance not found. Fragment instance ID: "
+                + req.getSourceFragnemtInstanceId()
+                + ".");
+      }
+      GetDataBlockResponse resp = new GetDataBlockResponse();
+      for (int i = req.getStartSequenceId(); i < req.getEndSequenceId(); i++) {
+        ByteBuffer serializedTsBlock =
+            sinkHandles
+                .get(req.getSourceFragnemtInstanceId())
+                .getSerializedTsBlock(req.getStartSequenceId());

Review comment:
       ```suggestion
                   .getSerializedTsBlock(i);
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class SinkHandle implements ISinkHandle {
+
+  private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
+
+  public static final int MAX_ATTEMPT_TIMES = 3;
+
+  private final String remoteHostname;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final String remoteOperatorId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final LocalMemoryManager localMemoryManager;
+  private final ExecutorService executorService;
+  private final DataBlockService.Client client;
+  private final TsBlockSerde serde;
+  private final SinkHandleListener sinkHandleListener;
+
+  // TODO: a better data structure to hold tsblocks
+  private final List<TsBlock> bufferedTsBlocks = new LinkedList<>();

Review comment:
       I think it would be better to use a `TsBlockWrapper` that contains a `TsBlock` and a `long` indicating its sequence ID. Then, each time the sourceHandle calls `getDataBlock`, we can iterate from the beginning of this linked list and remove every `TsBlockWrapper` whose `sequenceId` is before the `startSequenceId` of the request.

##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
##########
@@ -19,98 +19,296 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockRequest;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockResponse;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
-public class DataBlockManager {
-
-  public static class FragmentInstanceInfo {
-    private String hostname;
-    private String queryId;
-    private String fragmentId;
-    private String instanceId;
-
-    public FragmentInstanceInfo(
-        String hostname, String queryId, String fragmentId, String instanceId) {
-      this.hostname = Validate.notNull(hostname);
-      this.queryId = Validate.notNull(queryId);
-      this.fragmentId = Validate.notNull(fragmentId);
-      this.instanceId = Validate.notNull(instanceId);
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+public class DataBlockManager implements IDataBlockManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(DataBlockManager.class);
+
+  public interface SourceHandleListener {
+    void onFinished(SourceHandle sourceHandle);
+
+    void onClosed(SourceHandle sourceHandle);
+  }
+
+  public interface SinkHandleListener {
+    void onFinish(SinkHandle sinkHandle);
+
+    void onClosed(SinkHandle sinkHandle);
+
+    void onAborted(SinkHandle sinkHandle);
+  }
+
+  /** Handle thrift communications. */
+  class DataBlockServiceImpl implements DataBlockService.Iface {
+
+    public GetDataBlockResponse getDataBlock(GetDataBlockRequest req) throws TException {

Review comment:
       ```suggestion
       @Override
       public GetDataBlockResponse getDataBlock(GetDataBlockRequest req) throws TException {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
##########
@@ -38,8 +50,25 @@ public ThriftService getImplementation() {
   @Override
   public void initTProcessor()
       throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-    impl = new DataBlockServiceImpl();
-    processor = new Processor<>(impl);
+    executorService =
+        IoTDBThreadPoolFactory.newCachedThreadPoolWithDaemon("data-block-manager-task-executors");
+    clientFactory = new DataBlockServiceClientFactory();
+    this.dataBlockManager =
+        new DataBlockManager(
+            localMemoryManager, tsBlockSerdeFactory, executorService, clientFactory);
+    processor = new Processor<>(dataBlockManager.getOrCreateDataBlockServiceImpl());

Review comment:
       Why is the method named `getOrCreateDataBlockServiceImpl` here? I think `DataBlockServiceImpl` will only be created once here, so naming it `createDataBlockServiceImpl` would be better.

##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/memory/LocalMemoryManager.java
##########
@@ -32,8 +32,8 @@ public LocalMemoryManager() {
     long maxMemory = Runtime.getRuntime().maxMemory();
     // Save 20% memory for untracked allocations.
     maxBytes = (long) (maxMemory * 0.8);
-    // Allocate 50% memory for query execution.
-    queryPool = new MemoryPool("query", (long) (maxBytes * 0.5));
+    // Allocate 50% memory for query execution. Each query can allocate up to 30% memory.
+    queryPool = new MemoryPool("query", (long) (maxBytes * 0.5), (long) (maxMemory * 0.3));

Review comment:
       You can get the query memory limit from `IoTDBConfig.allocateMemoryForRead`

##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
##########
@@ -19,98 +19,296 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockRequest;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockResponse;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
-public class DataBlockManager {
-
-  public static class FragmentInstanceInfo {
-    private String hostname;
-    private String queryId;
-    private String fragmentId;
-    private String instanceId;
-
-    public FragmentInstanceInfo(
-        String hostname, String queryId, String fragmentId, String instanceId) {
-      this.hostname = Validate.notNull(hostname);
-      this.queryId = Validate.notNull(queryId);
-      this.fragmentId = Validate.notNull(fragmentId);
-      this.instanceId = Validate.notNull(instanceId);
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+public class DataBlockManager implements IDataBlockManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(DataBlockManager.class);
+
+  public interface SourceHandleListener {
+    void onFinished(SourceHandle sourceHandle);
+
+    void onClosed(SourceHandle sourceHandle);
+  }
+
+  public interface SinkHandleListener {
+    void onFinish(SinkHandle sinkHandle);
+
+    void onClosed(SinkHandle sinkHandle);
+
+    void onAborted(SinkHandle sinkHandle);
+  }
+
+  /** Handle thrift communications. */
+  class DataBlockServiceImpl implements DataBlockService.Iface {
+
+    public GetDataBlockResponse getDataBlock(GetDataBlockRequest req) throws TException {
+      logger.debug(
+          "Get data block request received. Asking for data block [{}, {}) from {}.",
+          req.getStartSequenceId(),
+          req.getEndSequenceId(),
+          req.getSourceFragnemtInstanceId());
+      if (!sinkHandles.containsKey(req.getSourceFragnemtInstanceId())) {
+        throw new TException(
+            "Source fragment instance not found. Fragment instance ID: "
+                + req.getSourceFragnemtInstanceId()
+                + ".");
+      }
+      GetDataBlockResponse resp = new GetDataBlockResponse();
+      for (int i = req.getStartSequenceId(); i < req.getEndSequenceId(); i++) {
+        ByteBuffer serializedTsBlock =
+            sinkHandles
+                .get(req.getSourceFragnemtInstanceId())

Review comment:
       ```suggestion
           SinkHandle sinkHandle = sinkHandles
               .get(req.getSourceFragnemtInstanceId());
   ```
   Move this line out of the for loop; there is no need to get it from the `map` repeatedly.

##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class SinkHandle implements ISinkHandle {
+
+  private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
+
+  public static final int MAX_ATTEMPT_TIMES = 3;
+
+  private final String remoteHostname;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final String remoteOperatorId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final LocalMemoryManager localMemoryManager;
+  private final ExecutorService executorService;
+  private final DataBlockService.Client client;
+  private final TsBlockSerde serde;
+  private final SinkHandleListener sinkHandleListener;
+
+  // TODO: a better data structure to hold tsblocks
+  private final List<TsBlock> bufferedTsBlocks = new LinkedList<>();

Review comment:
       This way, you can avoid random index access on a `LinkedList`, which is not very efficient. It also avoids merely setting consumed TsBlocks to null.

##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
##########
@@ -38,8 +50,25 @@ public ThriftService getImplementation() {
   @Override
   public void initTProcessor()
       throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-    impl = new DataBlockServiceImpl();
-    processor = new Processor<>(impl);
+    executorService =
+        IoTDBThreadPoolFactory.newCachedThreadPoolWithDaemon("data-block-manager-task-executors");

Review comment:
       `newCachedThreadPoolWithDaemon`'s `corePoolSize` is `0` and `maximumPoolSize` is `Integer.MAX_VALUE`. It's better to use FixedCountThreadPool or set the `corePoolSize` and `maximumPoolSize` in `CachedThreadPool` to a reasonable value.

##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class SinkHandle implements ISinkHandle {
+
+  private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
+
+  public static final int MAX_ATTEMPT_TIMES = 3;
+
+  private final String remoteHostname;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final String remoteOperatorId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final LocalMemoryManager localMemoryManager;
+  private final ExecutorService executorService;
+  private final DataBlockService.Client client;
+  private final TsBlockSerde serde;
+  private final SinkHandleListener sinkHandleListener;
+
+  // TODO: a better data structure to hold tsblocks
+  private final List<TsBlock> bufferedTsBlocks = new LinkedList<>();
+
+  private volatile ListenableFuture<Void> blocked = immediateFuture(null);
+  private NewDataBlockEvent savedNewDataBlockEvent;
+  private int numOfBufferedTsBlocks = 0;
+  private long bufferRetainedSizeInBytes;
+  private boolean closed;
+  private boolean noMoreTsBlocks;
+  private Throwable throwable;
+
+  public SinkHandle(
+      String remoteHostname,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      String remoteOperatorId,
+      TFragmentInstanceId localFragmentInstanceId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      DataBlockService.Client client,
+      TsBlockSerde serde,
+      SinkHandleListener sinkHandleListener) {
+    this.remoteHostname = Validate.notNull(remoteHostname);
+    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+    this.remoteOperatorId = Validate.notNull(remoteOperatorId);
+    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.localMemoryManager = Validate.notNull(localMemoryManager);
+    this.executorService = Validate.notNull(executorService);
+    this.client = Validate.notNull(client);
+    this.serde = Validate.notNull(serde);
+    this.sinkHandleListener = Validate.notNull(sinkHandleListener);
+  }
+
+  @Override
+  public ListenableFuture<Void> isFull() {
+    if (closed) {
+      throw new IllegalStateException("Sink handle is closed.");
+    }
+    return nonCancellationPropagating(blocked);
+  }
+
+  private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
+    executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
+  }
+
+  @Override
+  public void send(List<TsBlock> tsBlocks) throws IOException {
+    Validate.notNull(tsBlocks, "tsBlocks is null");
+    if (throwable != null) {
+      throw new IOException(throwable);
+    }
+    if (closed) {
+      throw new IllegalStateException("Sink handle is closed.");
+    }
+    if (!blocked.isDone()) {
+      throw new IllegalStateException("Sink handle is blocked.");
+    }
+    if (noMoreTsBlocks) {
+      return;
+    }
+
+    long retainedSizeInBytes = 0L;
+    for (TsBlock tsBlock : tsBlocks) {
+      retainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+    }
+    int currentEndSequenceId;
+    List<Long> tsBlockSizes = new ArrayList<>();
+    synchronized (this) {
+      currentEndSequenceId = bufferedTsBlocks.size();
+      blocked =
+          localMemoryManager
+              .getQueryPool()
+              .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes);
+      bufferRetainedSizeInBytes += retainedSizeInBytes;
+      bufferedTsBlocks.addAll(tsBlocks);
+      numOfBufferedTsBlocks += tsBlocks.size();
+      for (int i = currentEndSequenceId; i < currentEndSequenceId + tsBlocks.size(); i++) {
+        tsBlockSizes.add(bufferedTsBlocks.get(i).getRetainedSizeInBytes());
+      }
+    }
+
+    submitSendNewDataBlockEventTask(currentEndSequenceId, tsBlockSizes);
+  }
+
+  @Override
+  public void send(int partition, List<TsBlock> tsBlocks) {
+    throw new UnsupportedOperationException();
+  }
+
+  private void sendEndOfDataBlockEvent() throws TException {
+    logger.debug(
+        "Send end of data block event to operator {} of {}.",
+        remoteOperatorId,
+        remoteFragmentInstanceId);
+    int attempt = 0;
+    EndOfDataBlockEvent endOfDataBlockEvent =
+        new EndOfDataBlockEvent(
+            remoteFragmentInstanceId,
+            remoteOperatorId,
+            localFragmentInstanceId,
+            bufferedTsBlocks.size() - 1);
+    while (attempt < MAX_ATTEMPT_TIMES) {
+      attempt += 1;
+      try {
+        client.onEndOfDataBlockEvent(endOfDataBlockEvent);
+        break;
+      } catch (TException e) {
+        logger.error(
+            "Failed to send end of data block event to operator {} of {} due to {}, attempt times: {}",
+            remoteOperatorId,
+            remoteFragmentInstanceId,
+            e.getMessage(),
+            attempt);
+        if (attempt == MAX_ATTEMPT_TIMES) {
+          throw e;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    logger.info("Sink handle {} is being closed.", this);
+    if (throwable != null) {
+      throw new IOException(throwable);
+    }
+    if (closed) {
+      return;
+    }
+    synchronized (this) {
+      closed = true;
+      noMoreTsBlocks = true;
+    }
+    sinkHandleListener.onClosed(this);
+    try {
+      sendEndOfDataBlockEvent();
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    logger.info("Sink handle {} is closed.", this);
+  }
+
+  @Override
+  public void abort() {
+    logger.info("Sink handle {} is being aborted.", this);
+    synchronized (this) {
+      bufferedTsBlocks.clear();
+      numOfBufferedTsBlocks = 0;
+      closed = true;
+      localMemoryManager
+          .getQueryPool()
+          .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+      bufferRetainedSizeInBytes = 0;
+    }
+    sinkHandleListener.onAborted(this);
+    logger.info("Sink handle {} is aborted", this);
+  }
+
+  @Override
+  public synchronized void setNoMoreTsBlocks() {
+    noMoreTsBlocks = true;
+  }
+
+  @Override
+  public boolean isClosed() {
+    return closed;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return throwable == null && noMoreTsBlocks && numOfBufferedTsBlocks == 0;
+  }
+
+  @Override
+  public long getBufferRetainedSizeInBytes() {
+    return bufferRetainedSizeInBytes;
+  }
+
+  @Override
+  public int getNumOfBufferedTsBlocks() {
+    return numOfBufferedTsBlocks;
+  }
+
+  ByteBuffer getSerializedTsBlock(int partition, int sequenceId) {
+    throw new UnsupportedOperationException();
+  }
+
+  ByteBuffer getSerializedTsBlock(int sequenceId) {
+    TsBlock tsBlock;
+    synchronized (this) {
+      tsBlock = bufferedTsBlocks.get(sequenceId);
+      if (tsBlock == null) {
+        throw new IllegalStateException("The data block doesn't exist. Sequence ID: " + sequenceId);
+      }
+      bufferedTsBlocks.set(sequenceId, null);

Review comment:
       Setting null here is not safe: what if this response fails? You can try the approach suggested above, which avoids this problem.

##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class SinkHandle implements ISinkHandle {
+
+  private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
+
+  public static final int MAX_ATTEMPT_TIMES = 3;
+
+  private final String remoteHostname;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final String remoteOperatorId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final LocalMemoryManager localMemoryManager;
+  private final ExecutorService executorService;
+  private final DataBlockService.Client client;
+  private final TsBlockSerde serde;
+  private final SinkHandleListener sinkHandleListener;
+
+  // TODO: a better data structure to hold tsblocks
+  private final List<TsBlock> bufferedTsBlocks = new LinkedList<>();
+
+  private volatile ListenableFuture<Void> blocked = immediateFuture(null);
+  private NewDataBlockEvent savedNewDataBlockEvent;
+  private int numOfBufferedTsBlocks = 0;
+  private long bufferRetainedSizeInBytes;
+  private boolean closed;
+  private boolean noMoreTsBlocks;
+  private Throwable throwable;
+
+  public SinkHandle(
+      String remoteHostname,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      String remoteOperatorId,
+      TFragmentInstanceId localFragmentInstanceId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      DataBlockService.Client client,
+      TsBlockSerde serde,
+      SinkHandleListener sinkHandleListener) {
+    this.remoteHostname = Validate.notNull(remoteHostname);
+    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+    this.remoteOperatorId = Validate.notNull(remoteOperatorId);
+    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.localMemoryManager = Validate.notNull(localMemoryManager);
+    this.executorService = Validate.notNull(executorService);
+    this.client = Validate.notNull(client);
+    this.serde = Validate.notNull(serde);
+    this.sinkHandleListener = Validate.notNull(sinkHandleListener);
+  }
+
+  @Override
+  public ListenableFuture<Void> isFull() {
+    if (closed) {
+      throw new IllegalStateException("Sink handle is closed.");
+    }
+    return nonCancellationPropagating(blocked);
+  }
+
+  private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
+    executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
+  }
+
+  @Override
+  public void send(List<TsBlock> tsBlocks) throws IOException {
+    Validate.notNull(tsBlocks, "tsBlocks is null");
+    if (throwable != null) {
+      throw new IOException(throwable);
+    }
+    if (closed) {
+      throw new IllegalStateException("Sink handle is closed.");
+    }
+    if (!blocked.isDone()) {
+      throw new IllegalStateException("Sink handle is blocked.");
+    }
+    if (noMoreTsBlocks) {
+      return;
+    }
+
+    long retainedSizeInBytes = 0L;
+    for (TsBlock tsBlock : tsBlocks) {
+      retainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+    }
+    int currentEndSequenceId;
+    List<Long> tsBlockSizes = new ArrayList<>();
+    synchronized (this) {
+      currentEndSequenceId = bufferedTsBlocks.size();
+      blocked =
+          localMemoryManager
+              .getQueryPool()
+              .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes);
+      bufferRetainedSizeInBytes += retainedSizeInBytes;
+      bufferedTsBlocks.addAll(tsBlocks);
+      numOfBufferedTsBlocks += tsBlocks.size();
+      for (int i = currentEndSequenceId; i < currentEndSequenceId + tsBlocks.size(); i++) {
+        tsBlockSizes.add(bufferedTsBlocks.get(i).getRetainedSizeInBytes());
+      }
+    }
+
+    submitSendNewDataBlockEventTask(currentEndSequenceId, tsBlockSizes);

Review comment:
       Currently, we only send one TsBlock at a time. With this approach, each call to `send` submits one `SendNewDataBlockEvent`, causing a network communication, which is not very efficient.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] cornmonster commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
cornmonster commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840173865



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.EndOfDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.NewDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+
+public class SinkHandle implements ISinkHandle {
+
+  private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
+
+  public static final int MAX_ATTEMPT_TIMES = 3;
+
+  private final String remoteHostname;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final String remoteOperatorId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final LocalMemoryManager localMemoryManager;
+  private final ExecutorService executorService;
+  private final DataBlockService.Client client;
+  private final TsBlockSerde serde;
+  private final SinkHandleListener sinkHandleListener;
+
+  // TODO: a better data structure to hold tsblocks
+  private final List<TsBlock> bufferedTsBlocks = new LinkedList<>();
+
+  private volatile ListenableFuture<Void> blocked = immediateFuture(null);
+  private NewDataBlockEvent savedNewDataBlockEvent;
+  private int numOfBufferedTsBlocks = 0;
+  private long bufferRetainedSizeInBytes;
+  private boolean closed;
+  private boolean noMoreTsBlocks;
+  private Throwable throwable;
+
+  public SinkHandle(
+      String remoteHostname,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      String remoteOperatorId,
+      TFragmentInstanceId localFragmentInstanceId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      DataBlockService.Client client,
+      TsBlockSerde serde,
+      SinkHandleListener sinkHandleListener) {
+    this.remoteHostname = Validate.notNull(remoteHostname);
+    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+    this.remoteOperatorId = Validate.notNull(remoteOperatorId);
+    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.localMemoryManager = Validate.notNull(localMemoryManager);
+    this.executorService = Validate.notNull(executorService);
+    this.client = Validate.notNull(client);
+    this.serde = Validate.notNull(serde);
+    this.sinkHandleListener = Validate.notNull(sinkHandleListener);
+  }
+
+  @Override
+  public ListenableFuture<Void> isFull() {
+    if (closed) {
+      throw new IllegalStateException("Sink handle is closed.");
+    }
+    return nonCancellationPropagating(blocked);
+  }
+
+  private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
+    executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
+  }
+
+  @Override
+  public void send(List<TsBlock> tsBlocks) throws IOException {
+    Validate.notNull(tsBlocks, "tsBlocks is null");
+    if (throwable != null) {
+      throw new IOException(throwable);
+    }
+    if (closed) {
+      throw new IllegalStateException("Sink handle is closed.");
+    }
+    if (!blocked.isDone()) {
+      throw new IllegalStateException("Sink handle is blocked.");
+    }
+    if (noMoreTsBlocks) {
+      return;
+    }
+
+    long retainedSizeInBytes = 0L;
+    for (TsBlock tsBlock : tsBlocks) {
+      retainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+    }
+    int currentEndSequenceId;
+    List<Long> tsBlockSizes = new ArrayList<>();
+    synchronized (this) {
+      currentEndSequenceId = bufferedTsBlocks.size();
+      blocked =
+          localMemoryManager
+              .getQueryPool()
+              .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes);

Review comment:
       Yes. Since SinkHandle provides `isFull` for callers to decide if they are allowed to call `send`, it is safe to assume that the listenable future `blocked` is completed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] cornmonster commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
cornmonster commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840212374



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
##########
@@ -38,8 +50,25 @@ public ThriftService getImplementation() {
   @Override
   public void initTProcessor()
       throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-    impl = new DataBlockServiceImpl();
-    processor = new Processor<>(impl);
+    executorService =
+        IoTDBThreadPoolFactory.newCachedThreadPoolWithDaemon("data-block-manager-task-executors");

Review comment:
       Agree. But the `reasonable` value needs some further testing. What about setting the `corePoolSize` to 0 and `maximumPoolSize` to 10 for now?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] JackieTien97 commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840255247



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
##########
@@ -38,8 +50,25 @@ public ThriftService getImplementation() {
   @Override
   public void initTProcessor()
       throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-    impl = new DataBlockServiceImpl();
-    processor = new Processor<>(impl);
+    executorService =
+        IoTDBThreadPoolFactory.newCachedThreadPoolWithDaemon("data-block-manager-task-executors");
+    clientFactory = new DataBlockServiceClientFactory();
+    this.dataBlockManager =
+        new DataBlockManager(
+            localMemoryManager, tsBlockSerdeFactory, executorService, clientFactory);
+    processor = new Processor<>(dataBlockManager.getOrCreateDataBlockServiceImpl());

Review comment:
       Fine




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] xingtanzjr commented on a change in pull request #5367: [IOTDB-2727] data block manager impl

Posted by GitBox <gi...@apache.org>.
xingtanzjr commented on a change in pull request #5367:
URL: https://github.com/apache/iotdb/pull/5367#discussion_r840198835



##########
File path: server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
##########
@@ -19,72 +19,286 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockRequest;
+import org.apache.iotdb.mpp.rpc.thrift.GetDataBlockResponse;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.lang3.Validate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 
 public class SourceHandle implements ISourceHandle {
 
-  private final long bufferCapacityInBytes;
+  private static final Logger logger = LoggerFactory.getLogger(SourceHandle.class);
+
+  private final String remoteHostname;
+  private final TFragmentInstanceId remoteFragmentInstanceId;
+  private final TFragmentInstanceId localFragmentInstanceId;
+  private final String localOperatorId;
+  private final LocalMemoryManager localMemoryManager;
+  private final ExecutorService executorService;
+  private final DataBlockService.Client client;
+  private final TsBlockSerde serde;
+  private final SourceHandleListener sourceHandleListener;
 
   private final Queue<TsBlock> bufferedTsBlocks = new ArrayDeque<>();
+  private final Map<Integer, Long> sequenceIdToDataBlockSize = new HashMap<>();
+
   private volatile SettableFuture<Void> blocked = SettableFuture.create();
-  private volatile long bufferRetainedSizeInBytes;
-  private boolean finished;
+  private volatile Future<?> getDataBlocksTaskFuture = null;
+  private long bufferRetainedSizeInBytes;
+  private int nextSequenceId;
+  private int lastSequenceId = Integer.MAX_VALUE;
+  private boolean noMoreTsBlocks;
   private boolean closed;
   private Throwable throwable;
 
-  public SourceHandle(long bufferCapacityInBytes) {
-    Validate.isTrue(bufferCapacityInBytes > 0L, "capacity cannot be less or equal to zero.");
-    this.bufferCapacityInBytes = bufferCapacityInBytes;
+  public SourceHandle(
+      String remoteHostname,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      TFragmentInstanceId localFragmentInstanceId,
+      String localOperatorId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      DataBlockService.Client client,
+      TsBlockSerde serde,
+      SourceHandleListener sourceHandleListener) {
+    this.remoteHostname = Validate.notNull(remoteHostname);
+    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.localOperatorId = Validate.notNull(localOperatorId);
+    this.localMemoryManager = Validate.notNull(localMemoryManager);
+    this.executorService = Validate.notNull(executorService);
+    this.client = Validate.notNull(client);
+    this.serde = Validate.notNull(serde);
+    this.sourceHandleListener = Validate.notNull(sourceHandleListener);
+    bufferRetainedSizeInBytes = 0L;
   }
 
   @Override
-  public TsBlock receive() {
+  public synchronized TsBlock receive() throws IOException {
     if (throwable != null) {
-      throw new RuntimeException(throwable);
+      throw new IOException(throwable);
     }
     if (closed) {
-      throw new IllegalStateException("Source handle has been closed.");
+      throw new IllegalStateException("Source handle is closed.");
+    }
+    if (!blocked.isDone()) {
+      throw new IllegalStateException("Source handle is blocked.");
     }
     TsBlock tsBlock = bufferedTsBlocks.poll();
     if (tsBlock != null) {
-      bufferRetainedSizeInBytes -= getRetainedSizeInBytes(tsBlock);
+      bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
+      localMemoryManager
+          .getQueryPool()
+          .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
     }
-    if (bufferedTsBlocks.isEmpty() && !finished && blocked.isDone()) {
+    if (bufferedTsBlocks.isEmpty() && !isFinished() && blocked.isDone()) {
       blocked = SettableFuture.create();
     }
+    if (isFinished()) {
+      sourceHandleListener.onFinished(this);
+    }
+    trySubmitGetDataBlocksTask();
     return tsBlock;
   }
 
-  private long getRetainedSizeInBytes(TsBlock tsBlock) {
-    throw new UnsupportedOperationException();
+  private void trySubmitGetDataBlocksTask() {
+    if (getDataBlocksTaskFuture != null && !getDataBlocksTaskFuture.isDone()) {
+      return;
+    }
+    int startSequenceId = nextSequenceId;
+    int currSequenceId = nextSequenceId;
+    long reservedBytes = 0L;
+    while (sequenceIdToDataBlockSize.containsKey(currSequenceId)) {
+      Long bytesToReserve = sequenceIdToDataBlockSize.get(currSequenceId);
+      if (bytesToReserve == null) {
+        throw new IllegalStateException("Data block size is null.");
+      }
+      boolean reserved =
+          localMemoryManager
+              .getQueryPool()
+              .tryReserve(localFragmentInstanceId.getQueryId(), bytesToReserve);
+      if (reserved) {
+        currSequenceId += 1;
+        reservedBytes += bytesToReserve;
+        bufferRetainedSizeInBytes += bytesToReserve;
+      } else {
+        break;
+      }
+    }
+
+    if (currSequenceId > startSequenceId) {

Review comment:
       If we cannot reserve enough memory in one try, it seems that no further event will notify the SourceHandle to try again once memory is released.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org