You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/01/21 02:32:48 UTC

apex-core git commit: APEXCORE-596 Setting the thread for all oio nodes in the oio group, refactoring tests

Repository: apex-core
Updated Branches:
  refs/heads/master 7ea7f6073 -> a469dfb22


APEXCORE-596 Setting the thread for all oio nodes in the oio group, refactoring tests


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a469dfb2
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a469dfb2
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a469dfb2

Branch: refs/heads/master
Commit: a469dfb229c0d42c07048d95d070703331e2c429
Parents: 7ea7f60
Author: francisf <fr...@gmail.com>
Authored: Thu Jan 5 13:02:05 2017 +0530
Committer: francisf <fr...@gmail.com>
Committed: Fri Jan 20 22:51:21 2017 +0530

----------------------------------------------------------------------
 .../stram/engine/StreamingContainer.java        |  7 +++
 .../stram/engine/StreamingContainerTest.java    | 66 +++++++++++++++-----
 2 files changed, 58 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/a469dfb2/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 78f3421..86c0402 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -1460,6 +1460,13 @@ public class StreamingContainer extends YarnContainerMain
         }
       };
       node.context.setThread(thread);
+      List<Integer> oioNodeIdList = oioGroups.get(ndi.id);
+      if (oioNodeIdList != null) {
+        for (Integer oioNodeId : oioNodeIdList) {
+          Node<?> oioNode = nodes.get(oioNodeId);
+          oioNode.context.setThread(thread);
+        }
+      }
       thread.start();
     }
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/a469dfb2/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
index 18aee4c..451972e 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
@@ -20,8 +20,9 @@ package com.datatorrent.stram.engine;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -30,8 +31,13 @@ import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.stram.StramLocalCluster;
@@ -42,6 +48,9 @@ import com.datatorrent.stram.plan.logical.LogicalPlan;
  */
 public class StreamingContainerTest
 {
+  private static final Logger logger =  LoggerFactory.getLogger(StreamingContainerTest.class);
+  private static Set<String> committedWindowIds = Collections.synchronizedSet(new HashSet<String>());
+  private static Set<String> checkpointedWindowIds = Collections.synchronizedSet(new HashSet<String>());
 
   @Test
   public void testCommitted() throws IOException, ClassNotFoundException
@@ -50,40 +59,68 @@ public class StreamingContainerTest
     String workingDir = new File("target/testCommitted").getAbsolutePath();
     lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
     lp.setAttribute(DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
-    CommitAwareOperator operator = lp.addOperator("CommitAwareOperator", new CommitAwareOperator());
+    String opName = "CommitAwareOperatorTestCommit";
+    lp.addOperator(opName, new CommitAwareOperator());
 
-    List<Long> myCommittedWindowIds = CommitAwareOperator.getCommittedWindowIdsContainer();
+    StramLocalCluster lc = new StramLocalCluster(lp);
+    lc.run(5000);
+
+    /* this is not foolproof but some insurance is better than nothing */
+    Assert.assertTrue("No Committed Windows", committedWindowIds.contains(opName));
+  }
+
+  @Test
+  public void testOiOCommitted() throws IOException, ClassNotFoundException
+  {
+    LogicalPlan lp = new LogicalPlan();
+    String workingDir = new File("target/testCommitted").getAbsolutePath();
+    lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
+    lp.setAttribute(DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
+    String op1Name = "CommitAwareOperatorTestOioCommit1";
+    String op2Name = "CommitAwareOperatorTestOioCommit2";
+    CommitAwareOperator operator1 = lp.addOperator(op1Name, new CommitAwareOperator());
+    CommitAwareOperator operator2 = lp.addOperator(op2Name, new CommitAwareOperator());
+    lp.addStream("local", operator1.output, operator2.input).setLocality(Locality.THREAD_LOCAL);
 
     StramLocalCluster lc = new StramLocalCluster(lp);
     lc.run(5000);
 
     /* this is not foolproof but some insurance is better than nothing */
-    Assert.assertSame("Concurrent Use detected", myCommittedWindowIds, CommitAwareOperator.committedWindowIds);
-    Assert.assertFalse("No Committed Windows", myCommittedWindowIds.isEmpty());
+    Assert.assertTrue("No Committed Windows", committedWindowIds.contains(op1Name));
+    Assert.assertTrue("No Committed Windows", committedWindowIds.contains(op2Name));
   }
 
   private static class CommitAwareOperator extends BaseOperator implements CheckpointListener, InputOperator
   {
-    public static ArrayList<Long> committedWindowIds;
-    public static ArrayList<Long> checkpointedWindowIds = new ArrayList<Long>();
+    private transient String name;
+    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
 
-    public static final synchronized List<Long> getCommittedWindowIdsContainer()
+    @InputPortFieldAnnotation(optional = true)
+    public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+    {
+      @Override
+      public void process(String tuple)
+      {
+      }
+    };
+    @Override
+    public void setup(OperatorContext context)
     {
-      return committedWindowIds = new ArrayList<Long>();
+      this.name = context.getName();
     }
 
     @Override
     public void checkpointed(long windowId)
     {
-      checkpointedWindowIds.add(windowId);
-      logger.debug("checkpointed {}", windowId);
+      checkpointedWindowIds.add(name);
+      logger.debug("checkpointed {} {}", name, windowId);
     }
 
     @Override
     public void committed(long windowId)
     {
-      committedWindowIds.add(windowId);
-      logger.debug("committed {}", windowId);
+      committedWindowIds.add(name);
+      logger.debug("committed {} {}", name, windowId);
     }
 
     @Override
@@ -91,7 +128,6 @@ public class StreamingContainerTest
     {
     }
 
-    private static final Logger logger = LoggerFactory.getLogger(CommitAwareOperator.class);
   }
 
 }