You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@nemo.apache.org by GitBox <gi...@apache.org> on 2020/11/08 20:00:35 UTC

[GitHub] [incubator-nemo] jaehwan0214 opened a new pull request #304: [NEMO-394] Exchange data via shared memory

jaehwan0214 opened a new pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304


   JIRA: [NEMO-394: Exchange data via shared memory](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-394)
   
   **Major changes:**
   - Implemented LocalInputContext and LocalOutputContext to enable a pipe output writer and an input reader to exchange data via shared memory when both a parent task and a child task are in the same executor. In such case, the two tasks exchange data through shared memory and therefore doesn't serialize/deserialize data.
   
   **Minor changes to note:**
   - Renamed the "bytetransfer" package to "transfer", which now also includes LocalTransfer contexts.
    
   **Tests for the changes:**
   - Added LocalTransferContext test
   - Windowed WordCount IT Case
    
   Closes #304
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-723807928


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r519586713



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
##########
@@ -127,14 +129,19 @@ private void fetchDataLazily() {
                   watermarkWithIndex.getIndex(), watermarkWithIndex.getWatermark());
               }
             } else {
+              if (element instanceof Finishmark) {
+                isFinishMarkProduced = true;
+              }
               // data element
               elementQueue.offer(element);
             }
           }
-
           // This iterator is finished.
           countBytesSynchronized(iterator);
-          elementQueue.offer(Finishmark.getInstance());
+          // If the current iterator hasn't produced finish mark, put it into the global queue.
+          if (!isFinishMarkProduced) {

Review comment:
       In the case of using ByteTransfer Context, its iterator doesn't produce a finish mark as an element at the end. But in the case of LocalTransfer context, its iterator produces a finish mark as an element at the end. So isFinishmarkProduced is to prevent producing two finish marks when the LocalTransfer context's iterator is finished.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r519586713



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
##########
@@ -127,14 +129,19 @@ private void fetchDataLazily() {
                   watermarkWithIndex.getIndex(), watermarkWithIndex.getWatermark());
               }
             } else {
+              if (element instanceof Finishmark) {
+                isFinishMarkProduced = true;
+              }
               // data element
               elementQueue.offer(element);
             }
           }
-
           // This iterator is finished.
           countBytesSynchronized(iterator);
-          elementQueue.offer(Finishmark.getInstance());
+          // If the current iterator hasn't produced finish mark, put it into the global queue.
+          if (!isFinishMarkProduced) {

Review comment:
       We shouldn't throw an exception. In the case of using ByteTransfer Context, its iterator doesn't produce a finish mark as an element at the end. But in the case of LocalTransfer context, its iterator produces a finish mark as an element at the end. So isFinishmarkProduced is to prevent producing two finish marks when the LocalTransfer context's iterator is finished.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r526057845



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+  public final class LocalInputContext extends LocalTransferContext {
+    private final LinkedBlockingQueue queue;
+    private boolean isOutputContextClosed = false;
+
+    /**
+     * Creates a new local input context and connect it to {@param localOutputContext}.
+     * @param localOutputContext the local output context to which this local input context is connected
+     */
+    public LocalInputContext(final LocalOutputContext localOutputContext) {
+    super(localOutputContext.getExecutorId(),
+          localOutputContext.getEdgeId(),
+          localOutputContext.getSrcTaskIndex(),
+          localOutputContext.getDstTaskIndex());
+    this.queue = localOutputContext.getQueue();
+  }
+
+  /**
+   * Checks if the connected output context has already been closed.
+   * @return true if the connected output context has already been closed.
+   */
+  public boolean isOutputContextClosed() {

Review comment:
       Yes, it is for testing purpose. I added the comment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-725075599


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r519581199



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
##########
@@ -127,14 +129,19 @@ private void fetchDataLazily() {
                   watermarkWithIndex.getIndex(), watermarkWithIndex.getWatermark());
               }
             } else {
+              if (element instanceof Finishmark) {
+                isFinishMarkProduced = true;
+              }
               // data element
               elementQueue.offer(element);
             }
           }
-
           // This iterator is finished.
           countBytesSynchronized(iterator);
-          elementQueue.offer(Finishmark.getInstance());
+          // If the current iterator hasn't produced finish mark, put it into the global queue.
+          if (!isFinishMarkProduced) {

Review comment:
       Then, receiving multiple Finishmark from an iterator is an exceptional case, so we should thrown an exception here? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r520977624



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+  public final class LocalInputContext extends LocalTransferContext {
+    private final LinkedBlockingQueue queue;
+    private boolean isClosed = false;
+
+    /**
+     * Creates a new local input context and connect it to {@param localOutputContext}.
+     * @param localOutputContext the local output context to which this local input context is connected
+     */
+    public LocalInputContext(final LocalOutputContext localOutputContext) {
+    super(localOutputContext.getExecutorId(),
+          localOutputContext.getEdgeId(),
+          localOutputContext.getSrcTaskIndex(),
+          localOutputContext.getDstTaskIndex());
+    this.queue = localOutputContext.getQueue();
+  }
+
+  /**
+   * Closes this local input context.
+   * @throws RuntimeException if the connected output context hasn't been closed yet, or if there are still data
+   * left to be processed.
+   */
+  @Override
+  public void close() throws RuntimeException {
+    isClosed = true;
+  }
+
+  /**
+   * Checks if this context has already been closed.
+   * @return true if this context has already been closed.
+   */
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  /**
+   * Creates a new iterator which iterates the receive elements from the sender.
+   * @return iterator that iterates the received elements.
+   */
+  public LocalInputIterator getIterator() {
+    return new LocalInputIterator();
+  }
+
+  /**
+   * Local input iterator that iterates the received elements from the sender.
+   */
+  private class LocalInputIterator implements Iterator {
+    private Object next;
+    private boolean hasNext = false;
+
+    @Override
+    public final boolean hasNext() {
+      if (hasNext) {
+        return true;
+      }
+      if (isClosed) {

Review comment:
       Yes, I can add this additional check. But it doesn't seem necessary for me. The only possible way to make isClosed variable true is receiving a finish mark from the LocalOutputContext. The LocalOutputContext sends a finish mark only when it gets closed. After it gets closed, the LocalOutputContext doesn't let others write to its queue by returning an error. So the queue must be empty if we observes a finish mark. Let me know if there's any other cases that I missed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-723831186


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-723670583


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-725062632


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r520949605



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
##########
@@ -114,6 +114,7 @@ private void fetchDataLazily() {
       // A thread for each iterator
       queueInsertionThreads.submit(() -> {
         if (exception == null) {
+          // Variable to avoid producing a redundant finish mark

Review comment:
       please rm this comment

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+  public final class LocalInputContext extends LocalTransferContext {
+    private final LinkedBlockingQueue queue;
+    private boolean isClosed = false;
+
+    /**
+     * Creates a new local input context and connect it to {@param localOutputContext}.
+     * @param localOutputContext the local output context to which this local input context is connected
+     */
+    public LocalInputContext(final LocalOutputContext localOutputContext) {
+    super(localOutputContext.getExecutorId(),
+          localOutputContext.getEdgeId(),
+          localOutputContext.getSrcTaskIndex(),
+          localOutputContext.getDstTaskIndex());
+    this.queue = localOutputContext.getQueue();
+  }
+
+  /**
+   * Closes this local input context.
+   * @throws RuntimeException if the connected output context hasn't been closed yet, or if there are still data
+   * left to be processed.
+   */
+  @Override
+  public void close() throws RuntimeException {
+    isClosed = true;
+  }
+
+  /**
+   * Checks if this context has already been closed.
+   * @return true if this context has already been closed.
+   */
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  /**
+   * Creates a new iterator which iterates the receive elements from the sender.
+   * @return iterator that iterates the received elements.
+   */
+  public LocalInputIterator getIterator() {
+    return new LocalInputIterator();
+  }
+
+  /**
+   * Local input iterator that iterates the received elements from the sender.
+   */
+  private class LocalInputIterator implements Iterator {
+    private Object next;
+    private boolean hasNext = false;
+
+    @Override
+    public final boolean hasNext() {
+      if (hasNext) {
+        return true;
+      }
+      if (isClosed) {
+        return false;
+      }
+      try {
+        // Blocking call
+        next = queue.take();
+        if (next instanceof Finishmark) {

Review comment:
       Do we need `isClosed` variable? if we check finishmark, we don't need `isClosed` variable. 

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+  public final class LocalInputContext extends LocalTransferContext {
+    private final LinkedBlockingQueue queue;
+    private boolean isClosed = false;
+
+    /**
+     * Creates a new local input context and connect it to {@param localOutputContext}.
+     * @param localOutputContext the local output context to which this local input context is connected
+     */
+    public LocalInputContext(final LocalOutputContext localOutputContext) {
+    super(localOutputContext.getExecutorId(),
+          localOutputContext.getEdgeId(),
+          localOutputContext.getSrcTaskIndex(),
+          localOutputContext.getDstTaskIndex());
+    this.queue = localOutputContext.getQueue();
+  }
+
+  /**
+   * Closes this local input context.
+   * @throws RuntimeException if the connected output context hasn't been closed yet, or if there are still data
+   * left to be processed.
+   */
+  @Override
+  public void close() throws RuntimeException {
+    isClosed = true;
+  }
+
+  /**
+   * Checks if this context has already been closed.
+   * @return true if this context has already been closed.
+   */
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  /**
+   * Creates a new iterator which iterates the receive elements from the sender.
+   * @return iterator that iterates the received elements.
+   */
+  public LocalInputIterator getIterator() {
+    return new LocalInputIterator();
+  }
+
+  /**
+   * Local input iterator that iterates the received elements from the sender.
+   */
+  private class LocalInputIterator implements Iterator {
+    private Object next;
+    private boolean hasNext = false;
+
+    @Override
+    public final boolean hasNext() {
+      if (hasNext) {
+        return true;
+      }
+      if (isClosed) {

Review comment:
       We should check whether the queue is empty or not, because there could be remaining events in the queue. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-723823008


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-723658142


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-729654322


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-723749837


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-723831186


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-725069345


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-725075599


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r520965567



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+  public final class LocalInputContext extends LocalTransferContext {
+    private final LinkedBlockingQueue queue;
+    private boolean isClosed = false;
+
+    /**
+     * Creates a new local input context and connect it to {@param localOutputContext}.
+     * @param localOutputContext the local output context to which this local input context is connected
+     */
+    public LocalInputContext(final LocalOutputContext localOutputContext) {
+    super(localOutputContext.getExecutorId(),
+          localOutputContext.getEdgeId(),
+          localOutputContext.getSrcTaskIndex(),
+          localOutputContext.getDstTaskIndex());
+    this.queue = localOutputContext.getQueue();
+  }
+
+  /**
+   * Closes this local input context.
+   * @throws RuntimeException if the connected output context hasn't been closed yet, or if there are still data
+   * left to be processed.
+   */
+  @Override
+  public void close() throws RuntimeException {
+    isClosed = true;
+  }
+
+  /**
+   * Checks if this context has already been closed.
+   * @return true if this context has already been closed.
+   */
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  /**
+   * Creates a new iterator which iterates the receive elements from the sender.
+   * @return iterator that iterates the received elements.
+   */
+  public LocalInputIterator getIterator() {
+    return new LocalInputIterator();
+  }
+
+  /**
+   * Local input iterator that iterates the received elements from the sender.
+   */
+  private class LocalInputIterator implements Iterator {
+    private Object next;
+    private boolean hasNext = false;
+
+    @Override
+    public final boolean hasNext() {
+      if (hasNext) {
+        return true;
+      }
+      if (isClosed) {
+        return false;
+      }
+      try {
+        // Blocking call
+        next = queue.take();
+        if (next instanceof Finishmark) {

Review comment:
       isClosed variable handles the case when the "hasNext" method gets called multiple times while the "next" method doesn't get invoked. I'm aware that this case doesn't occur since the data fetcher calls the "next" method every time the "hasNext" method returns true. But to make it more general, we should handle this common case also.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-725747251


   I'm going to make more rigorous tests for this PR. When I'm done, I'll update the description above. Until then, please do not merge. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r519581355



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+public final class LocalInputContext extends LocalTransferContext {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalInputContext.class.getName());
+  private ConcurrentLinkedQueue queue;

Review comment:
       I used to nullify the reference. Since this is not the case, I fixed it.

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+public final class LocalInputContext extends LocalTransferContext {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalInputContext.class.getName());
+  private ConcurrentLinkedQueue queue;
+  private LocalOutputContext localOutputContext;

Review comment:
       Fixed it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] wonook merged pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
wonook merged pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-723823008


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r519601697



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
##########
@@ -127,14 +129,19 @@ private void fetchDataLazily() {
                   watermarkWithIndex.getIndex(), watermarkWithIndex.getWatermark());
               }
             } else {
+              if (element instanceof Finishmark) {
+                isFinishMarkProduced = true;
+              }
               // data element
               elementQueue.offer(element);
             }
           }
-
           // This iterator is finished.
           countBytesSynchronized(iterator);
-          elementQueue.offer(Finishmark.getInstance());
+          // If the current iterator hasn't produced finish mark, put it into the global queue.
+          if (!isFinishMarkProduced) {

Review comment:
       I fixed the LocalInputContext iterator so that it doesn't produce a finish mark at the end. We no longer need to handle the case in which multiple finish marks are produced.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-729654322


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-723807928


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r520954667



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
##########
@@ -114,6 +114,7 @@ private void fetchDataLazily() {
       // A thread for each iterator
       queueInsertionThreads.submit(() -> {
         if (exception == null) {
+          // Variable to avoid producing a redundant finish mark

Review comment:
       Thank you for catching this. I removed it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r526048903



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+  public final class LocalInputContext extends LocalTransferContext {
+    private final LinkedBlockingQueue queue;
+    private boolean isOutputContextClosed = false;

Review comment:
       Fixed it. Thank you




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-723670583


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-723749837


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-729504671


   I added the test results on the description above.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-730146919


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] taegeonum commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
taegeonum commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-730028460


   LGTM. I will merge it. Thanks @jaehwan0214 ! 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] taegeonum commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
taegeonum commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-723751599


   Thanks for the PR. I will take a look


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-725062632


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r519583605



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalOutputContext.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class provides a data transfer interface to the sender side when both the sender and the receiver are
+ * in the same executor. Since data serialization is unnecessary, the sender sends data without serializing
+ * them. A single local output context represents a data transfer between two tasks.
+ */
+public final class LocalOutputContext extends LocalTransferContext implements OutputContext {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalOutputContext.class.getName());
+  private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
+  private boolean isClosed = false;
+
+  /**
+   * Creates a new local output context.
+   * @param executorId id of the executor to which this context belong
+   * @param edgeId id of the DAG edge
+   * @param srcTaskIndex source task index
+   * @param dstTaskIndex destination task index
+   */
+  public LocalOutputContext(final String executorId,
+                            final String edgeId,
+                            final int srcTaskIndex,
+                            final int dstTaskIndex) {
+    super(executorId, edgeId, srcTaskIndex, dstTaskIndex);
+  }
+
+  /**
+   * Closes this local output context.
+   */
+  @Override
+  public void close() {
+    if (isClosed) {
+      throw new RuntimeException("This context has already been closed");
+    }
+    queue.offer(Finishmark.getInstance());
+    isClosed = true;
+    // Nullify the reference to the queue for potential garbage collection
+    queue = null;

Review comment:
       This is unnecessary. I removed it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r519583194



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+public final class LocalInputContext extends LocalTransferContext {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalInputContext.class.getName());
+  private ConcurrentLinkedQueue queue;
+  private LocalOutputContext localOutputContext;
+  private boolean isClosed = false;
+
+  /**
+   * Creates a new local input context and connect it to {@param localOutputContext}.
+   * @param localOutputContext the local output context to which this local input context is connected
+   */
+  public LocalInputContext(final LocalOutputContext localOutputContext) {
+    super(localOutputContext.getExecutorId(),
+          localOutputContext.getEdgeId(),
+          localOutputContext.getSrcTaskIndex(),
+          localOutputContext.getDstTaskIndex());
+    this.localOutputContext = localOutputContext;
+    this.queue = localOutputContext.getQueue();
+  }
+
+  /**
+   * Closes this local input context.
+   * @throws RuntimeException if the connected output context hasn't been closed yet, or if there are still data
+   * left to be processed.
+   */
+  @Override
+  public void close() throws RuntimeException {
+    if (!localOutputContext.isClosed()) {
+      throw new RuntimeException("The parent task writer is still sending data");
+    }
+    if (!queue.isEmpty()) {
+      throw new RuntimeException("There are data left in this context to be processed");
+    }
+    // Nullify references for potential garbage collection

Review comment:
       Thank you for pointing it out. This check is actually unnecessary because the processing of all of the events in the output context queue is guaranteed. Therefore, I removed it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r519583398



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+public final class LocalInputContext extends LocalTransferContext {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalInputContext.class.getName());
+  private ConcurrentLinkedQueue queue;
+  private LocalOutputContext localOutputContext;
+  private boolean isClosed = false;
+
+  /**
+   * Creates a new local input context and connect it to {@param localOutputContext}.
+   * @param localOutputContext the local output context to which this local input context is connected
+   */
+  public LocalInputContext(final LocalOutputContext localOutputContext) {
+    super(localOutputContext.getExecutorId(),
+          localOutputContext.getEdgeId(),
+          localOutputContext.getSrcTaskIndex(),
+          localOutputContext.getDstTaskIndex());
+    this.localOutputContext = localOutputContext;
+    this.queue = localOutputContext.getQueue();
+  }
+
+  /**
+   * Closes this local input context.
+   * @throws RuntimeException if the connected output context hasn't been closed yet, or if there are still data
+   * left to be processed.
+   */
+  @Override
+  public void close() throws RuntimeException {
+    if (!localOutputContext.isClosed()) {
+      throw new RuntimeException("The parent task writer is still sending data");
+    }
+    if (!queue.isEmpty()) {
+      throw new RuntimeException("There are data left in this context to be processed");
+    }
+    // Nullify references for potential garbage collection
+    queue = null;
+    localOutputContext = null;
+    isClosed = true;
+  }
+
+  /**
+   * Checks if this context has already been closed.
+   * @return true if this context has already been closed.
+   */
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  /**
+   * Creates a new iterator which iterates the receive elements from the sender.
+   * @return iterator that iterates the received elements.
+   */
+  public LocalInputIterator getIterator() {
+    return new LocalInputIterator();
+  }
+
+  /**
+   * Local input iterator that iterates the received elements from the sender.
+   */
+  private class LocalInputIterator implements Iterator {
+    @Override
+    public final boolean hasNext() {
+      if (isClosed) {
+        return false;
+      }
+      while (queue.peek() == null) {
+        continue;

Review comment:
       I fixed it by using LinkedBlockingQueue instead of ConcurrentLInkedQueue

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+public final class LocalInputContext extends LocalTransferContext {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalInputContext.class.getName());
+  private ConcurrentLinkedQueue queue;
+  private LocalOutputContext localOutputContext;
+  private boolean isClosed = false;
+
+  /**
+   * Creates a new local input context and connect it to {@param localOutputContext}.
+   * @param localOutputContext the local output context to which this local input context is connected
+   */
+  public LocalInputContext(final LocalOutputContext localOutputContext) {
+    super(localOutputContext.getExecutorId(),
+          localOutputContext.getEdgeId(),
+          localOutputContext.getSrcTaskIndex(),
+          localOutputContext.getDstTaskIndex());
+    this.localOutputContext = localOutputContext;
+    this.queue = localOutputContext.getQueue();
+  }
+
+  /**
+   * Closes this local input context.
+   * @throws RuntimeException if the connected output context hasn't been closed yet, or if there are still data
+   * left to be processed.
+   */
+  @Override
+  public void close() throws RuntimeException {
+    if (!localOutputContext.isClosed()) {
+      throw new RuntimeException("The parent task writer is still sending data");
+    }
+    if (!queue.isEmpty()) {
+      throw new RuntimeException("There are data left in this context to be processed");
+    }
+    // Nullify references for potential garbage collection
+    queue = null;
+    localOutputContext = null;
+    isClosed = true;
+  }
+
+  /**
+   * Checks if this context has already been closed.
+   * @return true if this context has already been closed.
+   */
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  /**
+   * Creates a new iterator which iterates the receive elements from the sender.
+   * @return iterator that iterates the received elements.
+   */
+  public LocalInputIterator getIterator() {
+    return new LocalInputIterator();
+  }
+
+  /**
+   * Local input iterator that iterates the received elements from the sender.
+   */
+  private class LocalInputIterator implements Iterator {
+    @Override
+    public final boolean hasNext() {
+      if (isClosed) {
+        return false;
+      }
+      while (queue.peek() == null) {
+        continue;
+      }
+      return true;
+    }
+
+    @Override
+    public final Object next() throws RuntimeException {
+      if (isClosed) {
+        throw new RuntimeException("This context has already been closed");
+      } else {
+        Object element;
+        while ((element = queue.poll()) == null) {
+          continue;

Review comment:
       Fixed it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r519580659



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
##########
@@ -127,14 +129,19 @@ private void fetchDataLazily() {
                   watermarkWithIndex.getIndex(), watermarkWithIndex.getWatermark());
               }
             } else {
+              if (element instanceof Finishmark) {
+                isFinishMarkProduced = true;
+              }
               // data element
               elementQueue.offer(element);
             }
           }
-
           // This iterator is finished.
           countBytesSynchronized(iterator);
-          elementQueue.offer(Finishmark.getInstance());
+          // If the current iterator hasn't produced finish mark, put it into the global queue.
+          if (!isFinishMarkProduced) {

Review comment:
       When the local output context closes, it produces a finish mark. In this case, the data fetcher receives a finish mark from the iterator. Then, the data fetcher shouldn't produce another finish mark when the iterator is finished. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r525991584



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+  public final class LocalInputContext extends LocalTransferContext {
+    private final LinkedBlockingQueue queue;
+    private boolean isOutputContextClosed = false;

Review comment:
       `isOutputContextClosed` -> `outputContextClosed`

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+  public final class LocalInputContext extends LocalTransferContext {
+    private final LinkedBlockingQueue queue;
+    private boolean isOutputContextClosed = false;
+
+    /**
+     * Creates a new local input context and connect it to {@param localOutputContext}.
+     * @param localOutputContext the local output context to which this local input context is connected
+     */
+    public LocalInputContext(final LocalOutputContext localOutputContext) {
+    super(localOutputContext.getExecutorId(),
+          localOutputContext.getEdgeId(),
+          localOutputContext.getSrcTaskIndex(),
+          localOutputContext.getDstTaskIndex());
+    this.queue = localOutputContext.getQueue();
+  }
+
+  /**
+   * Checks if the connected output context has already been closed.
+   * @return true if the connected output context has already been closed.
+   */
+  public boolean isOutputContextClosed() {

Review comment:
       Is this function for test? then could you please add a comment? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] removed a comment on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-725069345


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] sonarcloud[bot] commented on pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#issuecomment-723658142


   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](htt
 ps://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL) [0 Code Smells](https://sonarcloud.io/project/issues?id=org.apache.nemo%3Anemo-project&pullRequest=304&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/NoDuplicationInfo.png' alt='No Duplication information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=org.apache.nemo%3Anemo-project&pullRequest=304&metric=duplicated_lines_density&view=list) No Duplication information
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] jaehwan0214 commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
jaehwan0214 commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r520965567



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+  public final class LocalInputContext extends LocalTransferContext {
+    private final LinkedBlockingQueue queue;
+    private boolean isClosed = false;
+
+    /**
+     * Creates a new local input context and connect it to {@param localOutputContext}.
+     * @param localOutputContext the local output context to which this local input context is connected
+     */
+    public LocalInputContext(final LocalOutputContext localOutputContext) {
+    super(localOutputContext.getExecutorId(),
+          localOutputContext.getEdgeId(),
+          localOutputContext.getSrcTaskIndex(),
+          localOutputContext.getDstTaskIndex());
+    this.queue = localOutputContext.getQueue();
+  }
+
+  /**
+   * Closes this local input context.
+   * @throws RuntimeException if the connected output context hasn't been closed yet, or if there are still data
+   * left to be processed.
+   */
+  @Override
+  public void close() throws RuntimeException {
+    isClosed = true;
+  }
+
+  /**
+   * Checks if this context has already been closed.
+   * @return true if this context has already been closed.
+   */
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  /**
+   * Creates a new iterator which iterates the receive elements from the sender.
+   * @return iterator that iterates the received elements.
+   */
+  public LocalInputIterator getIterator() {
+    return new LocalInputIterator();
+  }
+
+  /**
+   * Local input iterator that iterates the received elements from the sender.
+   */
+  private class LocalInputIterator implements Iterator {
+    private Object next;
+    private boolean hasNext = false;
+
+    @Override
+    public final boolean hasNext() {
+      if (hasNext) {
+        return true;
+      }
+      if (isClosed) {
+        return false;
+      }
+      try {
+        // Blocking call
+        next = queue.take();
+        if (next instanceof Finishmark) {

Review comment:
       isClosed variable handles the case when the "hasNext" method gets called multiple times while the "next" method doesn't get invoked. I'm aware that this case doesn't happen since the data fetcher calls the "next" method every time the "hasNext" method returns true. But to make it more general, we should handle this common case also.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-nemo] taegeonum commented on a change in pull request #304: [NEMO-394] Exchange data via shared memory

Posted by GitBox <gi...@apache.org>.
taegeonum commented on a change in pull request #304:
URL: https://github.com/apache/incubator-nemo/pull/304#discussion_r519560546



##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
##########
@@ -127,14 +129,19 @@ private void fetchDataLazily() {
                   watermarkWithIndex.getIndex(), watermarkWithIndex.getWatermark());
               }
             } else {
+              if (element instanceof Finishmark) {
+                isFinishMarkProduced = true;
+              }
               // data element
               elementQueue.offer(element);
             }
           }
-
           // This iterator is finished.
           countBytesSynchronized(iterator);
-          elementQueue.offer(Finishmark.getInstance());
+          // If the current iterator hasn't produced finish mark, put it into the global queue.
+          if (!isFinishMarkProduced) {

Review comment:
       When are multiple finshmarks received? 

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+public final class LocalInputContext extends LocalTransferContext {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalInputContext.class.getName());
+  private ConcurrentLinkedQueue queue;

Review comment:
       Why not final?

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalOutputContext.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class provides a data transfer interface to the sender side when both the sender and the receiver are
+ * in the same executor. Since data serialization is unnecessary, the sender sends data without serializing
+ * them. A single local output context represents a data transfer between two tasks.
+ */
+public final class LocalOutputContext extends LocalTransferContext implements OutputContext {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalOutputContext.class.getName());
+  private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
+  private boolean isClosed = false;
+
+  /**
+   * Creates a new local output context.
+   * @param executorId id of the executor to which this context belong
+   * @param edgeId id of the DAG edge
+   * @param srcTaskIndex source task index
+   * @param dstTaskIndex destination task index
+   */
+  public LocalOutputContext(final String executorId,
+                            final String edgeId,
+                            final int srcTaskIndex,
+                            final int dstTaskIndex) {
+    super(executorId, edgeId, srcTaskIndex, dstTaskIndex);
+  }
+
+  /**
+   * Closes this local output context.
+   */
+  @Override
+  public void close() {
+    if (isClosed) {
+      throw new RuntimeException("This context has already been closed");
+    }
+    queue.offer(Finishmark.getInstance());
+    isClosed = true;
+    // Nullify the reference to the queue for potential garbage collection
+    queue = null;

Review comment:
       why do we need this? 

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+public final class LocalInputContext extends LocalTransferContext {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalInputContext.class.getName());
+  private ConcurrentLinkedQueue queue;
+  private LocalOutputContext localOutputContext;

Review comment:
       Why not final?

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+public final class LocalInputContext extends LocalTransferContext {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalInputContext.class.getName());
+  private ConcurrentLinkedQueue queue;
+  private LocalOutputContext localOutputContext;
+  private boolean isClosed = false;
+
+  /**
+   * Creates a new local input context and connect it to {@param localOutputContext}.
+   * @param localOutputContext the local output context to which this local input context is connected
+   */
+  public LocalInputContext(final LocalOutputContext localOutputContext) {
+    super(localOutputContext.getExecutorId(),
+          localOutputContext.getEdgeId(),
+          localOutputContext.getSrcTaskIndex(),
+          localOutputContext.getDstTaskIndex());
+    this.localOutputContext = localOutputContext;
+    this.queue = localOutputContext.getQueue();
+  }
+
+  /**
+   * Closes this local input context.
+   * @throws RuntimeException if the connected output context hasn't been closed yet, or if there are still data
+   * left to be processed.
+   */
+  @Override
+  public void close() throws RuntimeException {
+    if (!localOutputContext.isClosed()) {
+      throw new RuntimeException("The parent task writer is still sending data");
+    }
+    if (!queue.isEmpty()) {
+      throw new RuntimeException("There are data left in this context to be processed");
+    }
+    // Nullify references for potential garbage collection

Review comment:
       Why  do you nullify? If the context is closed, should we check the output context queue? whether or not there are remaining events? Maybe we should guarantee processing all of the events in the output context queue. 

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+public final class LocalInputContext extends LocalTransferContext {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalInputContext.class.getName());
+  private ConcurrentLinkedQueue queue;
+  private LocalOutputContext localOutputContext;
+  private boolean isClosed = false;
+
+  /**
+   * Creates a new local input context and connect it to {@param localOutputContext}.
+   * @param localOutputContext the local output context to which this local input context is connected
+   */
+  public LocalInputContext(final LocalOutputContext localOutputContext) {
+    super(localOutputContext.getExecutorId(),
+          localOutputContext.getEdgeId(),
+          localOutputContext.getSrcTaskIndex(),
+          localOutputContext.getDstTaskIndex());
+    this.localOutputContext = localOutputContext;
+    this.queue = localOutputContext.getQueue();
+  }
+
+  /**
+   * Closes this local input context.
+   * @throws RuntimeException if the connected output context hasn't been closed yet, or if there are still data
+   * left to be processed.
+   */
+  @Override
+  public void close() throws RuntimeException {
+    if (!localOutputContext.isClosed()) {
+      throw new RuntimeException("The parent task writer is still sending data");
+    }
+    if (!queue.isEmpty()) {
+      throw new RuntimeException("There are data left in this context to be processed");
+    }
+    // Nullify references for potential garbage collection
+    queue = null;
+    localOutputContext = null;
+    isClosed = true;
+  }
+
+  /**
+   * Checks if this context has already been closed.
+   * @return true if this context has already been closed.
+   */
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  /**
+   * Creates a new iterator which iterates the receive elements from the sender.
+   * @return iterator that iterates the received elements.
+   */
+  public LocalInputIterator getIterator() {
+    return new LocalInputIterator();
+  }
+
+  /**
+   * Local input iterator that iterates the received elements from the sender.
+   */
+  private class LocalInputIterator implements Iterator {
+    @Override
+    public final boolean hasNext() {
+      if (isClosed) {
+        return false;
+      }
+      while (queue.peek() == null) {
+        continue;

Review comment:
       This will lead to spin-loop, which excessively uses CPU cycles. We should avoid it. 

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/LocalInputContext.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.transfer;
+
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class provides a data transfer interface to the receiver side when both the sender and the receiver are in the
+ * same executor. Since the sender doesn't serialize data, the receiver doesn't need to deserialize data when retrieving
+ * them.
+ */
+public final class LocalInputContext extends LocalTransferContext {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalInputContext.class.getName());
+  private ConcurrentLinkedQueue queue;
+  private LocalOutputContext localOutputContext;
+  private boolean isClosed = false;
+
+  /**
+   * Creates a new local input context and connect it to {@param localOutputContext}.
+   * @param localOutputContext the local output context to which this local input context is connected
+   */
+  public LocalInputContext(final LocalOutputContext localOutputContext) {
+    super(localOutputContext.getExecutorId(),
+          localOutputContext.getEdgeId(),
+          localOutputContext.getSrcTaskIndex(),
+          localOutputContext.getDstTaskIndex());
+    this.localOutputContext = localOutputContext;
+    this.queue = localOutputContext.getQueue();
+  }
+
+  /**
+   * Closes this local input context.
+   * @throws RuntimeException if the connected output context hasn't been closed yet, or if there are still data
+   * left to be processed.
+   */
+  @Override
+  public void close() throws RuntimeException {
+    if (!localOutputContext.isClosed()) {
+      throw new RuntimeException("The parent task writer is still sending data");
+    }
+    if (!queue.isEmpty()) {
+      throw new RuntimeException("There are data left in this context to be processed");
+    }
+    // Nullify references for potential garbage collection
+    queue = null;
+    localOutputContext = null;
+    isClosed = true;
+  }
+
+  /**
+   * Checks if this context has already been closed.
+   * @return true if this context has already been closed.
+   */
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  /**
+   * Creates a new iterator which iterates the receive elements from the sender.
+   * @return iterator that iterates the received elements.
+   */
+  public LocalInputIterator getIterator() {
+    return new LocalInputIterator();
+  }
+
+  /**
+   * Local input iterator that iterates the received elements from the sender.
+   */
+  private class LocalInputIterator implements Iterator {
+    @Override
+    public final boolean hasNext() {
+      if (isClosed) {
+        return false;
+      }
+      while (queue.peek() == null) {
+        continue;
+      }
+      return true;
+    }
+
+    @Override
+    public final Object next() throws RuntimeException {
+      if (isClosed) {
+        throw new RuntimeException("This context has already been closed");
+      } else {
+        Object element;
+        while ((element = queue.poll()) == null) {
+          continue;

Review comment:
       This will lead to spin-loop, which excessively uses CPU cycles. We should avoid it.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org