You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/03/21 02:33:30 UTC
[iotdb] 04/07: [To_new_mpp] add IDataBlockManager (#5240)
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 93722e87856c9972ccdf7eacc47166ebbc9f8654
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Tue Mar 15 16:21:37 2022 +0800
[To_new_mpp] add IDataBlockManager (#5240)
---
.../mpp/execution/IFragmentInstanceManager.java | 6 +-
.../iotdb/mpp/shuffle/IDataBlockManager.java | 80 ++++++++++++++++++++++
2 files changed, 84 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
index e0eecfa..3bda549 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
@@ -26,7 +26,8 @@ public interface IFragmentInstanceManager {
void submitFragmentInstance();
/**
- * the notifying interface for {@link DataBlockManager} when upstream data comes.
+ * the notifying interface for {@link org.apache.iotdb.mpp.shuffle.IDataBlockManager} when
+ * upstream data comes.
*
* @param instanceID the fragment instance to be notified.
* @param upstreamInstanceId the upstream instance id.
@@ -34,7 +35,8 @@ public interface IFragmentInstanceManager {
void inputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId);
/**
- * the notifying interface for {@link DataBlockManager} when downstream data has been consumed.
+ * the notifying interface for {@link org.apache.iotdb.mpp.shuffle.IDataBlockManager} when
+ * downstream data has been consumed.
*
* @param instanceID the fragment instance to be notified.
* @param downstreamInstanceId the downstream instance id.
diff --git a/server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java
new file mode 100644
index 0000000..a05d04a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.shuffle;
+
+import org.apache.iotdb.mpp.common.ITSBlock;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+public interface IDataBlockManager {
+
+ /**
+ * Register a new fragment instance. The block manager will start looking for upstream data blocks
+ * and flushing data blocks generated to downstream fragment instances.
+ */
+ void registerFragmentInstance(FragmentInstanceTask task);
+
+ /**
+ * Deregister a fragment instance. The block manager will stop looking for upstream data blocks
+ * and release the input data blocks, but will keep flushing data blocks to downstream fragment
+ * instances until all the data blocks are sent. Once all the data blocks are sent, the output
+ * data blocks will be release.
+ *
+ * <p>This method should be called when a fragment instance finished in a normal state.
+ */
+ void deregisterFragmentInstance(FragmentInstanceTask task);
+
+ /**
+ * Deregister a fragment instance. The block manager will release all the related resources.
+ * Including data blocks that are not yet sent to downstream fragment instances.
+ *
+ * <p>This method should be called when a fragment instance finished in an abnormal state.
+ */
+ void forceDeregisterFragmentInstance(FragmentInstanceTask task);
+
+ /**
+ * Put a data block to the output buffer for downstream fragment instances. Will throw an {@link
+ * IllegalStateException} if the output buffer is full.
+ *
+ * <p>Once the block be put into the output buffer, the data block manager will notify downstream
+ * fragment instances that a new data block is available.
+ *
+ * @param instanceID ID of fragment instance that generates the block.
+ * @return If there are enough memory for the next block.
+ */
+ boolean putDataBlock(FragmentInstanceID instanceID, ITSBlock block);
+
+ /**
+ * Check if there are data blocks from the specified upstream fragment instance.
+ *
+ * @param instanceID ID of the upstream fragment instance.
+ * @return If there are available data blocks.
+ */
+ boolean hasDataBlock(FragmentInstanceID instanceID);
+
+ /**
+ * Get a data block from the input buffer of specified upstream fragment instance. Will throw an
+ * {@link IllegalStateException} if the input buffer is empty.
+ *
+ * @param instanceID ID of the upstream fragment instance.
+ * @return A data block.
+ */
+ ITSBlock getDataBlock(FragmentInstanceID instanceID);
+}