You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:13:08 UTC
[37/50] incubator-apex-core git commit: APEX-56 SPOI-4380 #resolve
Remove terminated operators from plan after window is committed.
APEX-56 SPOI-4380 #resolve Remove terminated operators from plan after window is committed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/76faf869
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/76faf869
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/76faf869
Branch: refs/heads/master
Commit: 76faf869506d004fc2f7d470f5bb89d681b470df
Parents: 3c5b88c
Author: thomas <th...@datatorrent.com>
Authored: Thu Aug 20 11:06:33 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Thu Aug 20 11:06:33 2015 -0700
----------------------------------------------------------------------
.../stram/StreamingContainerManager.java | 46 +++++++----
.../stram/plan/physical/PhysicalPlan.java | 32 +++++---
.../com/datatorrent/stram/MockContainer.java | 2 +-
.../com/datatorrent/stram/StreamCodecTest.java | 35 +--------
.../stram/StreamingContainerManagerTest.java | 81 +++++++++++++++++++-
5 files changed, 134 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 6e0f3f5..eed2948 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -154,6 +154,7 @@ public class StreamingContainerManager implements PlanContext
private long lastResourceRequest = 0;
private final Map<String, StreamingContainerAgent> containers = new ConcurrentHashMap<String, StreamingContainerAgent>();
private final List<Pair<PTOperator, Long>> purgeCheckpoints = new ArrayList<Pair<PTOperator, Long>>();
+ private final Map<Long, Set<PTOperator>> shutdownOperators = new HashMap<>();
private CriticalPathInfo criticalPathInfo;
private final ConcurrentMap<PTOperator, PTOperator> reportStats = Maps.newConcurrentMap();
private final AtomicBoolean deployChangeInProgress = new AtomicBoolean();
@@ -1003,6 +1004,26 @@ public class StreamingContainerManager implements PlanContext
}
reportStats.remove(o);
}
+
+ if (!this.shutdownOperators.isEmpty()) {
+ synchronized (this.shutdownOperators) {
+ Iterator<Map.Entry<Long, Set<PTOperator>>> it = shutdownOperators.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Long, Set<PTOperator>> windowAndOpers = it.next();
+ if (windowAndOpers.getKey().longValue() > this.committedWindowId) {
+ // wait until window is committed
+ continue;
+ } else {
+ LOG.info("Removing inactive operators at window {} {}", Codec.getStringWindowId(windowAndOpers.getKey()), windowAndOpers.getValue());
+ for (PTOperator oper : windowAndOpers.getValue()) {
+ plan.removeTerminatedPartition(oper);
+ }
+ it.remove();
+ }
+ }
+ }
+ }
+
if (!eventQueue.isEmpty()) {
for (PTOperator oper : plan.getAllOperators().values()) {
if (oper.getState() != PTOperator.State.ACTIVE) {
@@ -1274,20 +1295,21 @@ public class StreamingContainerManager implements PlanContext
else {
switch (ds) {
case SHUTDOWN:
- // remove the operator from the plan
- Runnable r = new Runnable()
- {
- @Override
- public void run()
- {
- if (oper.getInputs().isEmpty()) {
- LOG.info("Removing IDLE operator from plan {}", oper);
- plan.removeIdlePartition(oper);
- }
+ // schedule operator deactivation against the windowId
+ // will be processed once window is committed and all dependent operators completed processing
+ long windowId = oper.stats.currentWindowId.get();
+ if (ohb.windowStats != null && !ohb.windowStats.isEmpty()) {
+ windowId = ohb.windowStats.get(ohb.windowStats.size()-1).windowId;
+ }
+ LOG.debug("Operator {} deactivated at window {}", oper, windowId);
+ synchronized (this.shutdownOperators) {
+ Set<PTOperator> deactivatedOpers = this.shutdownOperators.get(windowId);
+ if (deactivatedOpers == null) {
+ this.shutdownOperators.put(windowId, deactivatedOpers = Sets.<PTOperator>newHashSet());
}
-
- };
- dispatch(r);
+ // add unconditionally: several operators may terminate at the same window
+ deactivatedOpers.add(oper);
+ }
sca.undeployOpers.add(oper.getId());
// record operator stop event
recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId()));
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 5b90c04..a57a248 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -780,7 +780,8 @@ public class PhysicalPlan implements Serializable
partitioner.partitioned(mainPC.operatorIdToPartition);
}
- private void updateStreamMappings(PMapping m) {
+ private void updateStreamMappings(PMapping m)
+ {
for (Map.Entry<OutputPortMeta, StreamMeta> opm : m.logicalOperator.getOutputStreams().entrySet()) {
StreamMapping ug = m.outputStreams.get(opm.getKey());
if (ug == null) {
@@ -789,7 +790,6 @@ public class PhysicalPlan implements Serializable
}
LOG.debug("update stream mapping for {} {}", opm.getKey().getOperatorMeta(), opm.getKey().getPortName());
ug.setSources(m.partitions);
- //ug.redoMapping();
}
for (Map.Entry<InputPortMeta, StreamMeta> ipm : m.logicalOperator.getInputStreams().entrySet()) {
@@ -847,7 +847,6 @@ public class PhysicalPlan implements Serializable
}
LOG.debug("update upstream stream mapping for {} {}", sourceMapping.logicalOperator, ipm.getValue().getSource().getPortName());
ug.setSources(sourceMapping.partitions);
- //ug.redoMapping();
}
}
@@ -990,18 +989,30 @@ public class PhysicalPlan implements Serializable
}
/**
- * Remove a partition that was reported as idle by the execution layer.
- * Since the end stream tuple is propagated to the downstream operators,
- * there is no need to undeploy/redeploy them as part of this operation.
+ * Remove a partition that was reported as terminated by the execution layer.
+ * Recursively removes all downstream operators left with no inputs; every invocation (including each recursive one) ends with deployChanges().
+ * @param p the terminated physical operator to remove from the partitions of its logical operator's mapping
*/
- public void removeIdlePartition(PTOperator p)
+ public void removeTerminatedPartition(PTOperator p)
{
+ // keep track of downstream operators (sink targets of p's outputs) for cascading remove, collected before p is removed
+ Set<PTOperator> downstreamOpers = new HashSet<>(p.outputs.size());
+ for (PTOutput out : p.outputs) {
+ for (PTInput sinkIn : out.sinks) {
+ downstreamOpers.add(sinkIn.target);
+ }
+ }
PMapping currentMapping = this.logicalToPTOperator.get(p.operatorMeta);
List<PTOperator> copyPartitions = Lists.newArrayList(currentMapping.partitions);
copyPartitions.remove(p);
removePartition(p, currentMapping);
currentMapping.partitions = copyPartitions;
+ // remove orphaned downstream operators; NOTE(review): confirm logicalToPTOperator has a mapping for every sink target, otherwise currentMapping is null in the recursive call
+ for (PTOperator dop : downstreamOpers) {
+ if (dop.inputs.isEmpty()) {
+ removeTerminatedPartition(dop);
+ }
+ }
deployChanges();
}
@@ -1012,8 +1023,8 @@ public class PhysicalPlan implements Serializable
* @param oper
* @return
*/
- private void removePartition(PTOperator oper, PMapping operatorMapping) {
-
+ private void removePartition(PTOperator oper, PMapping operatorMapping)
+ {
// remove any parallel partition
for (PTOutput out : oper.outputs) {
// copy list as it is modified by recursive remove
@@ -1137,7 +1148,8 @@ public class PhysicalPlan implements Serializable
return inputPortList;
}
- void removePTOperator(PTOperator oper) {
+ void removePTOperator(PTOperator oper)
+ {
LOG.debug("Removing operator " + oper);
// per partition merge operators
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/test/java/com/datatorrent/stram/MockContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/MockContainer.java b/engine/src/test/java/com/datatorrent/stram/MockContainer.java
index 7a6ba64..c0b704f 100644
--- a/engine/src/test/java/com/datatorrent/stram/MockContainer.java
+++ b/engine/src/test/java/com/datatorrent/stram/MockContainer.java
@@ -91,7 +91,7 @@ public class MockContainer
for (Map.Entry<Integer, MockOperatorStats> oe : this.stats.entrySet()) {
OperatorHeartbeat ohb = new OperatorHeartbeat();
ohb.setNodeId(oe.getKey());
- ohb.setState(OperatorHeartbeat.DeployState.ACTIVE);
+ ohb.setState(oe.getValue().deployState);
OperatorStats lstats = new OperatorStats();
lstats.checkpoint = new Checkpoint(oe.getValue().checkpointWindowId, 0, 0);
lstats.windowId = oe.getValue().currentWindowId;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 9726e65..d7a7fff 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1178,28 +1178,6 @@ public class StreamCodecTest
return unifiers;
}
- private void checkNotSetStreamCodecInfo(Map<Integer, StreamCodec<?>> streamCodecs, String id,
- Integer streamCodecIdentifier)
- {
- StreamCodec<?> streamCodecInfo = streamCodecs.get(streamCodecIdentifier);
- Assert.assertNotNull("stream codec null " + id, streamCodecInfo);
- Assert.assertNull("stream codec object not null " + id, streamCodecInfo);
- }
-
- private void checkStreamCodecInfo(Map<Integer, StreamCodec<?>> streamCodecs, String id,
- Integer streamCodecIdentifier, StreamCodec<?> streamCodec)
- {
- checkStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodec, null);
- }
-
- private void checkStreamCodecInfo(Map<Integer, StreamCodec<?>> streamCodecs, String id,
- Integer streamCodecIdentifier, StreamCodec<?> streamCodec, String className)
- {
- StreamCodec<?> streamCodecInfo = streamCodecs.get(streamCodecIdentifier);
- Assert.assertNotNull("stream codec info null " + id, streamCodecInfo);
- Assert.assertEquals("stream codec object " + id, streamCodec, streamCodecInfo);
- }
-
private void checkPresentStreamCodec(LogicalPlan.OperatorMeta operatorMeta, Operator.InputPort<?> inputPort,
Map<Integer, StreamCodec<?>> streamCodecs,
String id, PhysicalPlan plan )
@@ -1277,17 +1255,6 @@ public class StreamCodecTest
return otdi;
}
- private LogicalPlan.InputPortMeta getInputPortMeta(LogicalPlan.StreamMeta streamMeta, LogicalPlan.OperatorMeta operatorMeta)
- {
- LogicalPlan.InputPortMeta portMeta = null;
- for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> entry : operatorMeta.getInputStreams().entrySet()) {
- if (entry.getValue() == streamMeta) {
- portMeta = entry.getKey();
- }
- }
- return portMeta;
- }
-
// For tests so that it doesn't trigger assignment of a new id
public boolean isStrCodecPresent(StreamCodec<?> streamCodecInfo, PhysicalPlan plan)
{
@@ -1316,7 +1283,7 @@ public class StreamCodecTest
public static class DefaultTestStreamCodec extends DefaultStatefulStreamCodec<Object> implements Serializable
{
-
+ private static final long serialVersionUID = 1L; // explicit version UID because this test codec implements Serializable
}
public static class DefaultCodecOperator extends GenericTestOperator
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index a238e3e..89f2878 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -31,7 +31,6 @@ import org.junit.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
@@ -40,10 +39,10 @@ import com.datatorrent.api.Stats.OperatorStats;
import com.datatorrent.api.Stats.OperatorStats.PortStats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.annotation.Stateless;
-
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.stram.MockContainer.MockOperatorStats;
import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
import com.datatorrent.stram.api.AppDataSource;
@@ -56,6 +55,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHe
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState;
import com.datatorrent.stram.appdata.AppDataPushAgent;
import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
import com.datatorrent.stram.engine.*;
@@ -72,12 +72,14 @@ import com.datatorrent.stram.support.StramTestSupport.EmbeddedWebSocketServer;
import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
import com.datatorrent.stram.support.StramTestSupport.TestMeta;
import com.datatorrent.stram.tuple.Tuple;
+
import org.apache.commons.lang.StringUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.eclipse.jetty.websocket.WebSocket;
-public class StreamingContainerManagerTest {
+public class StreamingContainerManagerTest
+{
@Rule public TestMeta testMeta = new TestMeta();
@Test
@@ -703,6 +705,74 @@ public class StreamingContainerManagerTest {
Assert.assertEquals("type " + o1DeployInfo, OperatorDeployInfo.OperatorType.INPUT, o1DeployInfo.type);
}
+ @Test
+ public void testOperatorShutdown()
+ {
+ LogicalPlan dag = new LogicalPlan(); // o1 -> o2; o1 reports SHUTDOWN at window 2 and both must leave the plan once window 2 is committed
+ dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+
+ GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+ GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+ dag.addStream("stream1", o1.outport1, o2.inport1);
+
+ StreamingContainerManager scm = new StreamingContainerManager(dag);
+
+ PhysicalPlan physicalPlan = scm.getPhysicalPlan();
+ Map<PTContainer, MockContainer> mockContainers = new HashMap<>();
+ for (PTContainer c : physicalPlan.getContainers()) {
+ MockContainer mc = new MockContainer(scm, c);
+ mockContainers.put(c, mc);
+ }
+ // deploy all containers
+ for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) {
+ ce.getValue().deploy();
+ // skip buffer server purge in monitorHeartbeat
+ ce.getKey().bufferServerAddress = null;
+ }
+
+ PTOperator o1p1 = physicalPlan.getOperators(dag.getMeta(o1)).get(0);
+ MockContainer mc1 = mockContainers.get(o1p1.getContainer());
+ MockOperatorStats o1p1mos = mc1.stats(o1p1.getId());
+ o1p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+ mc1.sendHeartbeat();
+
+ PTOperator o2p1 = physicalPlan.getOperators(dag.getMeta(o2)).get(0);
+ MockContainer mc2 = mockContainers.get(o2p1.getContainer());
+ MockOperatorStats o2p1mos = mc2.stats(o2p1.getId());
+ o2p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+ mc2.sendHeartbeat();
+
+ o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN); // o1 terminates at window 2
+ mc1.sendHeartbeat();
+ scm.monitorHeartbeat();
+ Assert.assertEquals("committedWindowId", -1, scm.getCommittedWindowId());
+ scm.monitorHeartbeat(); // committedWindowId updated in next cycle
+ Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
+ scm.processEvents();
+ Assert.assertEquals("containers at committedWindowId=1", 2, physicalPlan.getContainers().size()); // shutdown window 2 not committed yet, nothing removed
+
+ // checkpoint window 2
+ o1p1mos.checkpointWindowId(2);
+ mc1.sendHeartbeat();
+ scm.monitorHeartbeat();
+
+ o2p1mos.currentWindowId(2).checkpointWindowId(2);
+ mc2.sendHeartbeat();
+ scm.monitorHeartbeat();
+ Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
+ scm.monitorHeartbeat(); // committedWindowId updated in next cycle
+ Assert.assertEquals("committedWindowId", 2, scm.getCommittedWindowId());
+ Assert.assertEquals(1, o1p1.getContainer().getOperators().size()); // still present: removal only happens in the next processEvents() call
+ Assert.assertEquals(1, o2p1.getContainer().getOperators().size());
+ Assert.assertEquals(2, physicalPlan.getContainers().size());
+
+ // call again: now that committedWindowId has reached the shutdown window (2), o1 and the orphaned o2 are removed
+ scm.processEvents();
+ Assert.assertEquals(0, o1p1.getContainer().getOperators().size());
+ Assert.assertEquals(0, o2p1.getContainer().getOperators().size());
+ Assert.assertEquals(0, physicalPlan.getContainers().size());
+ }
private void testDownStreamPartition(Locality locality) throws Exception
{
@@ -738,7 +808,8 @@ public class StreamingContainerManagerTest {
}
@Test
- public void testPhysicalPropertyUpdate() throws Exception{
+ public void testPhysicalPropertyUpdate() throws Exception
+ {
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
@@ -755,6 +826,7 @@ public class StreamingContainerManagerTest {
Future<?> future = dnmgr.getPhysicalOperatorProperty(lc.getPlanOperators(dag.getMeta(o1)).get(0).getId(), "maxTuples", 10000);
Object object = future.get(10000, TimeUnit.MILLISECONDS);
Assert.assertNotNull(object);
+ @SuppressWarnings("unchecked")
Map<String, Object> propertyValue = (Map<String, Object>)object;
Assert.assertEquals(2,propertyValue.get("maxTuples"));
lc.shutdown();
@@ -873,6 +945,7 @@ public class StreamingContainerManagerTest {
pushAgent.pushData();
Thread.sleep(1000);
Assert.assertTrue(messages.size() > 0);
+ pushAgent.close();
JSONObject message = messages.get(0);
System.out.println("Got this message: " + message.toString(2));
Assert.assertEquals(topic, message.getString("topic"));