Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/11/10 03:08:11 UTC

[GitHub] [iotdb] MarcosZyk opened a new pull request, #7957: Mpp object

MarcosZyk opened a new pull request, #7957:
URL: https://github.com/apache/iotdb/pull/7957

   ## Description
   
   
   ### Content1 ...
   
   ### Content2 ...
   
   ### Content3 ...
   
   <!--
   In each section, please describe design decisions made, including:
    - Choice of algorithms
    - Behavioral aspects. What configuration values are acceptable? How are corner cases and error 
       conditions handled, such as when there are insufficient resources?
    - Class organization and design (how the logic is split between classes, inheritance, composition, 
       design patterns)
    - Method organization and design (how the logic is split between methods, parameters and return types)
    - Naming (class, method, API, configuration, HTTP endpoint, names of emitted metrics)
   -->
   
   
   <!-- It's good to describe an alternative design (or mention an alternative name) for every design 
   (or naming) decision point and compare the alternatives with the designs that you've implemented 
   (or the names you've chosen) to highlight the advantages of the chosen designs and names. -->
   
   <!-- If there was a discussion of the design of the feature implemented in this PR elsewhere
   (e.g. a "Proposal" issue, any other issue, or a thread in the development mailing list),
   link to that discussion from this PR description and explain what has changed in your final design
   compared to your original proposal or the consensus version at the end of the discussion.
   If something hasn't changed since the original discussion, you can omit a detailed discussion of
   those aspects of the design here, perhaps apart from a brief mention for the sake of readability
   of this PR description. -->
   
   <!-- Some of the aspects mentioned above may be omitted for simple and small changes. -->
   
   <hr>
   
   This PR has:
   - [ ] been self-reviewed.
       - [ ] concurrent read
       - [ ] concurrent write
       - [ ] concurrent read and write 
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. 
   - [ ] added or updated version, __license__, or notice information
   - [ ] added comments explaining the "why" and the intent of the code wherever it would not be obvious 
     for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, meeting the code coverage
     threshold.
   - [ ] added integration tests.
   - [ ] been tested in a test IoTDB cluster.
   
   <!-- Check the items by putting "x" in the brackets for completed items. Not all of these items
   apply to every PR. Remove the items that are not done or not relevant to the PR. None of the items
   from the checklist above are strictly necessary, but it would be very helpful if you at least
   self-review the PR. -->
   
   <hr>
   
   ##### Key changed/added classes (or packages if there are too many classes) in this PR
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] xingtanzjr commented on a diff in pull request #7957: Mpp object

Posted by GitBox <gi...@apache.org>.
xingtanzjr commented on code in PR #7957:
URL: https://github.com/apache/iotdb/pull/7957#discussion_r1019053013


##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/object/ObjectDeserializeOperator.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.object;
+
+import org.apache.iotdb.db.mpp.execution.object.MPPObjectPool;
+import org.apache.iotdb.db.mpp.execution.object.ObjectEntry;
+import org.apache.iotdb.db.mpp.execution.object.ObjectEntryFactory;
+import org.apache.iotdb.db.mpp.execution.object.ObjectType;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.apache.iotdb.db.mpp.execution.operator.object.ObjectQueryConstant.BATCH_END_SYMBOL;
+import static org.apache.iotdb.db.mpp.execution.operator.object.ObjectQueryConstant.OBJECT_START_SYMBOL;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+public class ObjectDeserializeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+
+  private final String queryId;
+  private final MPPObjectPool objectPool = MPPObjectPool.getInstance();
+
+  private final List<TSDataType> outputDataTypes = Collections.singletonList(TSDataType.INT32);
+
+  private final Operator child;
+
+  private final List<ByteBuffer> bufferList = new ArrayList<>();
+
+  public ObjectDeserializeOperator(
+      OperatorContext operatorContext, String queryId, Operator child) {
+    this.operatorContext = operatorContext;
+    this.queryId = queryId;
+    this.child = child;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  @Override
+  public TsBlock next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+
+    TsBlock tsBlock = child.next();
+    if (tsBlock == null || tsBlock.isEmpty()) {
+      return null;
+    }
+    ByteBuffer buffer;
+    for (int i = 0; i < tsBlock.getPositionCount() - 1; i++) {
+      buffer = ByteBuffer.wrap(tsBlock.getColumn(0).getBinary(i).getValues());
+      bufferList.add(buffer);

Review Comment:
   It may not be safe to share the intermediate result through a class-level variable.
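One way to address this concern, sketched under assumptions: keep the per-batch state in a small helper owned by a single operator instance, and have it hand off the finished batch and start a fresh list once the end marker arrives, so no buffers carry over into the next batch. `BatchAccumulator` and `offer` are hypothetical names, not part of the PR, and rows are modeled as `byte[]` rather than `TsBlock` columns to keep the example self-contained. Unlike the quoted loop, which always skips the last row, this sketch keeps the final row when it is not the end marker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical helper (not in the PR): collects serialized rows until an end marker arrives,
// then hands off the completed batch and resets its own state.
final class BatchAccumulator {
  private final byte[] endSymbol;
  // Rows of the batch currently being assembled; owned by exactly one operator instance.
  private List<byte[]> pending = new ArrayList<>();

  BatchAccumulator(byte[] endSymbol) {
    this.endSymbol = endSymbol.clone();
  }

  /** Adds one block's rows; returns the completed batch if the last row is the end marker. */
  Optional<List<byte[]>> offer(List<byte[]> rows) {
    if (rows.isEmpty()) {
      return Optional.empty();
    }
    boolean batchEnds = Arrays.equals(rows.get(rows.size() - 1), endSymbol);
    // Keep every data row; only the end marker itself is dropped.
    int dataRows = batchEnds ? rows.size() - 1 : rows.size();
    pending.addAll(rows.subList(0, dataRows));
    if (!batchEnds) {
      return Optional.empty();
    }
    // Hand off the finished batch and start a new list so nothing leaks into the next batch.
    List<byte[]> completed = Collections.unmodifiableList(pending);
    pending = new ArrayList<>();
    return Optional.of(completed);
  }
}
```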



[GitHub] [iotdb] xingtanzjr commented on a diff in pull request #7957: Mpp object

Posted by GitBox <gi...@apache.org>.
xingtanzjr commented on code in PR #7957:
URL: https://github.com/apache/iotdb/pull/7957#discussion_r1019058017


##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/object/MPPObjectPool.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.object;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MPPObjectPool {
+
+  private final Map<String, List<ObjectEntry>> objectPool = new ConcurrentHashMap<>();
+
+  private MPPObjectPool() {}
+
+  private static class MPPObjectPoolHolder {
+
+    private static final MPPObjectPool INSTANCE = new MPPObjectPool();
+
+    private MPPObjectPoolHolder() {}
+  }
+
+  public static MPPObjectPool getInstance() {
+    return MPPObjectPoolHolder.INSTANCE;
+  }
+
+  public synchronized <T extends ObjectEntry> T put(String queryId, T objectEntry) {
+    List<ObjectEntry> queryObjectList =
+        objectPool.computeIfAbsent(queryId, k -> Collections.synchronizedList(new ArrayList<>()));
+    queryObjectList.add(objectEntry);
+    objectEntry.setId(queryObjectList.size() - 1);
+    return objectEntry;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends ObjectEntry> T get(String queryId, int objectId) {
+    List<ObjectEntry> queryObjectList = objectPool.get(queryId);
+    if (queryObjectList == null) {
+      return null;
+    }
+    return (T) queryObjectList.get(objectId);
+  }
+
+  public void clear(String queryId) {

Review Comment:
   If objects are cleared only when the query ends, the pool may hold a lot of memory.
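A minimal sketch of finer-grained release, assuming consumers can tell the pool when they are done with an object. `ReleasableObjectPool`, `release`, and `liveCount` are hypothetical names. Because removing entries from the `ArrayList` used in the quoted `put` would shift the list indices that serve as object ids, the sketch keys objects by id in a map instead, and keeps `clear` as a fallback at query end.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical pool: objects can be released one at a time instead of only at query end.
final class ReleasableObjectPool<T> {
  // Per-query state: an id generator plus the live objects keyed by id.
  private static final class QueryObjects<V> {
    final AtomicInteger nextId = new AtomicInteger();
    final Map<Integer, V> objects = new ConcurrentHashMap<>();
  }

  private final Map<String, QueryObjects<T>> pool = new ConcurrentHashMap<>();

  int put(String queryId, T object) {
    QueryObjects<T> q = pool.computeIfAbsent(queryId, k -> new QueryObjects<T>());
    int id = q.nextId.getAndIncrement(); // ids stay stable even after other objects are removed
    q.objects.put(id, object);
    return id;
  }

  T get(String queryId, int id) {
    QueryObjects<T> q = pool.get(queryId);
    return q == null ? null : q.objects.get(id);
  }

  // Frees one object as soon as its consumer no longer needs it.
  T release(String queryId, int id) {
    QueryObjects<T> q = pool.get(queryId);
    return q == null ? null : q.objects.remove(id);
  }

  // Fallback: drop whatever is still held when the query finishes or fails.
  void clear(String queryId) {
    pool.remove(queryId);
  }

  int liveCount(String queryId) {
    QueryObjects<T> q = pool.get(queryId);
    return q == null ? 0 : q.objects.size();
  }
}
```

A `put` that races with `clear` for the same query would recreate the query's entry, so callers would still need to ensure nothing writes to a query after it has been cleared.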



[GitHub] [iotdb] xingtanzjr commented on a diff in pull request #7957: Mpp object

Posted by GitBox <gi...@apache.org>.
xingtanzjr commented on code in PR #7957:
URL: https://github.com/apache/iotdb/pull/7957#discussion_r1019046019


##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/object/MPPObjectPool.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.object;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MPPObjectPool {
+
+  private final Map<String, List<ObjectEntry>> objectPool = new ConcurrentHashMap<>();
+
+  private MPPObjectPool() {}
+
+  private static class MPPObjectPoolHolder {
+
+    private static final MPPObjectPool INSTANCE = new MPPObjectPool();
+
+    private MPPObjectPoolHolder() {}
+  }
+
+  public static MPPObjectPool getInstance() {
+    return MPPObjectPoolHolder.INSTANCE;
+  }
+
+  public synchronized <T extends ObjectEntry> T put(String queryId, T objectEntry) {
+    List<ObjectEntry> queryObjectList =
+        objectPool.computeIfAbsent(queryId, k -> Collections.synchronizedList(new ArrayList<>()));

Review Comment:
   Some behaviors of `synchronizedList` may differ from what we expect.
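The comment does not say which behaviors are meant. Two documented caveats of `Collections.synchronizedList` are that compound actions (such as `add` followed by `size() - 1`) are not atomic, and that iteration must be guarded by synchronizing on the list itself. The quoted `put` holds the pool's monitor, which is a different lock from the list's, so any other code path touching the list would need to lock the list as sketched here. `SynchronizedListCaveats` and its methods are hypothetical helpers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helpers illustrating how to use a Collections.synchronizedList safely.
final class SynchronizedListCaveats {

  // add() and size() are each synchronized, but another thread may add in between,
  // so "size() - 1" is not guaranteed to be our element's index. Hold the list's lock for both.
  static <E> int addAndGetIndex(List<E> syncList, E element) {
    synchronized (syncList) {
      syncList.add(element);
      return syncList.size() - 1;
    }
  }

  // Per the Collections.synchronizedList Javadoc, iteration must be guarded manually;
  // otherwise a concurrent add can cause ConcurrentModificationException.
  static <E> List<E> snapshot(List<E> syncList) {
    List<E> copy = new ArrayList<>();
    synchronized (syncList) {
      for (E e : syncList) {
        copy.add(e);
      }
    }
    return copy;
  }
}
```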



[GitHub] [iotdb] xingtanzjr commented on a diff in pull request #7957: Mpp object

Posted by GitBox <gi...@apache.org>.
xingtanzjr commented on code in PR #7957:
URL: https://github.com/apache/iotdb/pull/7957#discussion_r1019055989


##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/object/ObjectDeserializeOperator.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.object;
+
+import org.apache.iotdb.db.mpp.execution.object.MPPObjectPool;
+import org.apache.iotdb.db.mpp.execution.object.ObjectEntry;
+import org.apache.iotdb.db.mpp.execution.object.ObjectEntryFactory;
+import org.apache.iotdb.db.mpp.execution.object.ObjectType;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.apache.iotdb.db.mpp.execution.operator.object.ObjectQueryConstant.BATCH_END_SYMBOL;
+import static org.apache.iotdb.db.mpp.execution.operator.object.ObjectQueryConstant.OBJECT_START_SYMBOL;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+public class ObjectDeserializeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+
+  private final String queryId;
+  private final MPPObjectPool objectPool = MPPObjectPool.getInstance();
+
+  private final List<TSDataType> outputDataTypes = Collections.singletonList(TSDataType.INT32);
+
+  private final Operator child;
+
+  private final List<ByteBuffer> bufferList = new ArrayList<>();
+
+  public ObjectDeserializeOperator(
+      OperatorContext operatorContext, String queryId, Operator child) {
+    this.operatorContext = operatorContext;
+    this.queryId = queryId;
+    this.child = child;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  @Override
+  public TsBlock next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+
+    TsBlock tsBlock = child.next();
+    if (tsBlock == null || tsBlock.isEmpty()) {
+      return null;
+    }
+    ByteBuffer buffer;
+    for (int i = 0; i < tsBlock.getPositionCount() - 1; i++) {
+      buffer = ByteBuffer.wrap(tsBlock.getColumn(0).getBinary(i).getValues());
+      bufferList.add(buffer);
+    }
+    if (Arrays.equals(
+        tsBlock.getColumn(0).getBinary(tsBlock.getPositionCount() - 1).getValues(),
+        BATCH_END_SYMBOL)) {
+      return generateObject();
+    } else {
+      return null;
+    }
+  }
+
+  private TsBlock generateObject() {
+    SegmentedByteInputStream segmentedByteInputStream = new SegmentedByteInputStream(bufferList);
+    DataInputStream dataInputStream = new DataInputStream(segmentedByteInputStream);
+    TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+    try {
+      byte objectRecordSymbol = dataInputStream.readByte();
+      while (objectRecordSymbol == OBJECT_START_SYMBOL) {
+        ObjectEntry objectEntry =
+            ObjectEntryFactory.getObjectEntry(ObjectType.deserialize(segmentedByteInputStream));
+        objectEntry.deserializeObject(dataInputStream);
+        builder.getTimeColumnBuilder().writeLong(0L);
+        builder.getColumnBuilder(0).writeInt(objectPool.put(queryId, objectEntry).getId());

Review Comment:
   When do we remove the object from the pool to release the memory?
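One possible answer, sketched under assumptions: when an object is added, record how many downstream consumers will read it, and let the last consumer remove it from the pool. This assumes the planner knows the number of readers in advance; `RefCountedPool`, `consume`, and `expectedReaders` are hypothetical names, and the sketch covers a single query for brevity.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: each pooled object tracks how many consumers still need it,
// and the last consumer removes it, freeing memory before the query ends.
final class RefCountedPool<T> {
  private static final class PooledEntry<V> {
    final V value;
    final AtomicInteger remainingReaders;

    PooledEntry(V value, int readers) {
      this.value = value;
      this.remainingReaders = new AtomicInteger(readers);
    }
  }

  private final Map<Integer, PooledEntry<T>> entries = new ConcurrentHashMap<>();
  private final AtomicInteger nextId = new AtomicInteger();

  int put(T value, int expectedReaders) {
    if (expectedReaders <= 0) {
      throw new IllegalArgumentException("expectedReaders must be positive");
    }
    int id = nextId.getAndIncrement();
    entries.put(id, new PooledEntry<>(value, expectedReaders));
    return id;
  }

  // Returns the object and records that this consumer is done with it.
  T consume(int id) {
    PooledEntry<T> entry = entries.get(id);
    if (entry == null) {
      throw new IllegalStateException("object " + id + " already released or never added");
    }
    if (entry.remainingReaders.decrementAndGet() == 0) {
      entries.remove(id); // last reader: drop the reference so the object can be collected
    }
    return entry.value;
  }

  boolean contains(int id) {
    return entries.containsKey(id);
  }
}
```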


