You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/01/21 02:32:48 UTC
apex-core git commit: APEXCORE-596 Setting the thread for all oio nodes in the oio group, refactoring tests
Repository: apex-core
Updated Branches:
refs/heads/master 7ea7f6073 -> a469dfb22
APEXCORE-596 Setting the thread for all oio nodes in the oio group, refactoring tests
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a469dfb2
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a469dfb2
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a469dfb2
Branch: refs/heads/master
Commit: a469dfb229c0d42c07048d95d070703331e2c429
Parents: 7ea7f60
Author: francisf <fr...@gmail.com>
Authored: Thu Jan 5 13:02:05 2017 +0530
Committer: francisf <fr...@gmail.com>
Committed: Fri Jan 20 22:51:21 2017 +0530
----------------------------------------------------------------------
.../stram/engine/StreamingContainer.java | 7 +++
.../stram/engine/StreamingContainerTest.java | 66 +++++++++++++++-----
2 files changed, 58 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a469dfb2/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 78f3421..86c0402 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -1460,6 +1460,13 @@ public class StreamingContainer extends YarnContainerMain
}
};
node.context.setThread(thread);
+ List<Integer> oioNodeIdList = oioGroups.get(ndi.id);
+ if (oioNodeIdList != null) {
+ for (Integer oioNodeId : oioNodeIdList) {
+ Node<?> oioNode = nodes.get(oioNodeId);
+ oioNode.context.setThread(thread);
+ }
+ }
thread.start();
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a469dfb2/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
index 18aee4c..451972e 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
@@ -20,8 +20,9 @@ package com.datatorrent.stram.engine;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
@@ -30,8 +31,13 @@ import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
@@ -42,6 +48,9 @@ import com.datatorrent.stram.plan.logical.LogicalPlan;
*/
public class StreamingContainerTest
{
+ private static final Logger logger = LoggerFactory.getLogger(StreamingContainerTest.class);
+ private static Set<String> committedWindowIds = Collections.synchronizedSet(new HashSet<String>());
+ private static Set<String> checkpointedWindowIds = Collections.synchronizedSet(new HashSet<String>());
@Test
public void testCommitted() throws IOException, ClassNotFoundException
@@ -50,40 +59,68 @@ public class StreamingContainerTest
String workingDir = new File("target/testCommitted").getAbsolutePath();
lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
lp.setAttribute(DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
- CommitAwareOperator operator = lp.addOperator("CommitAwareOperator", new CommitAwareOperator());
+ String opName = "CommitAwareOperatorTestCommit";
+ lp.addOperator(opName, new CommitAwareOperator());
- List<Long> myCommittedWindowIds = CommitAwareOperator.getCommittedWindowIdsContainer();
+ StramLocalCluster lc = new StramLocalCluster(lp);
+ lc.run(5000);
+
+ /* this is not foolproof but some insurance is better than nothing */
+ Assert.assertTrue("No Committed Windows", committedWindowIds.contains(opName));
+ }
+
+ @Test
+ public void testOiOCommitted() throws IOException, ClassNotFoundException
+ {
+ LogicalPlan lp = new LogicalPlan();
+ String workingDir = new File("target/testCommitted").getAbsolutePath();
+ lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
+ lp.setAttribute(DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
+ String op1Name = "CommitAwareOperatorTestOioCommit1";
+ String op2Name = "CommitAwareOperatorTestOioCommit2";
+ CommitAwareOperator operator1 = lp.addOperator(op1Name, new CommitAwareOperator());
+ CommitAwareOperator operator2 = lp.addOperator(op2Name, new CommitAwareOperator());
+ lp.addStream("local", operator1.output, operator2.input).setLocality(Locality.THREAD_LOCAL);
StramLocalCluster lc = new StramLocalCluster(lp);
lc.run(5000);
/* this is not foolproof but some insurance is better than nothing */
- Assert.assertSame("Concurrent Use detected", myCommittedWindowIds, CommitAwareOperator.committedWindowIds);
- Assert.assertFalse("No Committed Windows", myCommittedWindowIds.isEmpty());
+ Assert.assertTrue("No Committed Windows", committedWindowIds.contains(op1Name));
+ Assert.assertTrue("No Committed Windows", committedWindowIds.contains(op2Name));
}
private static class CommitAwareOperator extends BaseOperator implements CheckpointListener, InputOperator
{
- public static ArrayList<Long> committedWindowIds;
- public static ArrayList<Long> checkpointedWindowIds = new ArrayList<Long>();
+ private transient String name;
+ public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
- public static final synchronized List<Long> getCommittedWindowIdsContainer()
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+ {
+ @Override
+ public void process(String tuple)
+ {
+ }
+ };
+ @Override
+ public void setup(OperatorContext context)
{
- return committedWindowIds = new ArrayList<Long>();
+ this.name = context.getName();
}
@Override
public void checkpointed(long windowId)
{
- checkpointedWindowIds.add(windowId);
- logger.debug("checkpointed {}", windowId);
+ checkpointedWindowIds.add(name);
+ logger.debug("checkpointed {} {}", name, windowId);
}
@Override
public void committed(long windowId)
{
- committedWindowIds.add(windowId);
- logger.debug("committed {}", windowId);
+ committedWindowIds.add(name);
+ logger.debug("committed {} {}", name, windowId);
}
@Override
@@ -91,7 +128,6 @@ public class StreamingContainerTest
{
}
- private static final Logger logger = LoggerFactory.getLogger(CommitAwareOperator.class);
}
}