You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/08/16 19:43:21 UTC
[1/2] apex-core git commit: APEXCORE-448 made operator name available
in the operator context
Repository: apex-core
Updated Branches:
refs/heads/master a42c67bc2 -> 3119aba89
APEXCORE-448 made operator name available in the operator context
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/9bcde2e3
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/9bcde2e3
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/9bcde2e3
Branch: refs/heads/master
Commit: 9bcde2e306ce7f6c3199112ad5bb737370f6624e
Parents: 130ce6b
Author: Chandni Singh <cs...@apache.org>
Authored: Fri Aug 5 21:19:22 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Fri Aug 5 21:19:22 2016 -0700
----------------------------------------------------------------------
.../main/java/com/datatorrent/api/Context.java | 5 ++
.../stram/api/OperatorDeployInfo.java | 6 ++
.../stram/engine/OperatorContext.java | 15 +++-
.../stram/engine/StreamingContainer.java | 5 +-
.../stram/engine/AtMostOnceTest.java | 3 +-
.../stram/engine/GenericNodeTest.java | 14 ++--
.../datatorrent/stram/engine/InputNodeTest.java | 2 +-
.../com/datatorrent/stram/engine/NodeTest.java | 13 ++--
.../stram/engine/OperatorContextTest.java | 79 ++++++++++++++++++++
.../stram/engine/ProcessingModeTests.java | 2 +-
.../stram/stream/InlineStreamTest.java | 6 +-
11 files changed, 130 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index a0f3ad3..187bf08 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -323,6 +323,11 @@ public interface Context
int getId();
/**
+ * @return the logical operator name which was used to add the operator in the DAG.
+ */
+ String getName();
+
+ /**
* Return the number of windows before the next checkpoint including the current window.
* @return Number of windows from checkpoint, 1 if the checkpoint will be after the current window
*/
http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
index bccbedf..219017b 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
@@ -52,6 +52,12 @@ public class OperatorDeployInfo implements Serializable, OperatorContext
}
@Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
public AttributeMap getAttributes()
{
return contextAttributes;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
index 424ffcc..7113280 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
@@ -21,9 +21,13 @@ package com.datatorrent.stram.engine;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
+import javax.validation.constraints.NotNull;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Context;
import com.datatorrent.api.StatsListener.OperatorRequest;
@@ -44,6 +48,7 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont
private Thread thread;
private long lastProcessedWindowId;
private final int id;
+ private final String name;
// the size of the circular queue should be configurable. hardcoded to 1024 for now.
private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<ContainerStats.OperatorStats>(1024);
private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<OperatorRequest>(1024);
@@ -81,14 +86,16 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont
/**
*
* @param id the value of id
+ * @param name name of the operator
* @param attributes the value of attributes
* @param parentContext
*/
- public OperatorContext(int id, AttributeMap attributes, Context parentContext)
+ public OperatorContext(int id, @NotNull String name, AttributeMap attributes, Context parentContext)
{
super(attributes, parentContext);
this.lastProcessedWindowId = Stateless.WINDOW_ID;
this.id = id;
+ this.name = Preconditions.checkNotNull(name, "operator name");
this.stateless = super.getValue(OperatorContext.STATELESS);
}
@@ -99,6 +106,12 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont
}
@Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
public int getWindowsFromCheckpoint()
{
return windowsFromCheckpoint;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 54b8a6e..27688e3 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -902,14 +902,15 @@ public class StreamingContainer extends YarnContainerMain
Context parentContext;
if (ndi instanceof UnifierDeployInfo) {
- OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
+ OperatorContext unifiedOperatorContext = new OperatorContext(0, ndi.name,
+ ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
parentContext = new PortContext(ndi.inputs.get(0).contextAttributes, unifiedOperatorContext);
massageUnifierDeployInfo(ndi);
} else {
parentContext = containerContext;
}
- OperatorContext ctx = new OperatorContext(ndi.id, ndi.contextAttributes, parentContext);
+ OperatorContext ctx = new OperatorContext(ndi.id, ndi.name, ndi.contextAttributes, parentContext);
ctx.attributes.put(OperatorContext.ACTIVATION_WINDOW_ID, ndi.checkpoint.windowId);
logger.debug("Restoring operator {} to checkpoint {} stateless={}.", ndi.id, Codec.getStringWindowId(ndi.checkpoint.windowId), ctx.stateless);
Node<?> node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? Stateless.WINDOW_ID : ndi.checkpoint.windowId), ctx, ndi.type);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
index b1296c8..cc777f7 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
@@ -83,7 +83,8 @@ public class AtMostOnceTest extends ProcessingModeTests
map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0);
map.put(OperatorContext.PROCESSING_MODE, processingMode);
- final GenericNode node = new GenericNode(new MultiInputOperator(), new com.datatorrent.stram.engine.OperatorContext(1, map, null));
+ final GenericNode node = new GenericNode(new MultiInputOperator(),
+ new com.datatorrent.stram.engine.OperatorContext(1, "operator", map, null));
AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input1", 1024);
AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("input1", 1024);
node.connectInputPort("input1", reservoir1);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index b7f6362..5dfa5f3 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -145,9 +145,9 @@ public class GenericNodeTest
public volatile List<Checkpoint> checkpoints = Lists.newArrayList();
- public TestStatsOperatorContext(int id, AttributeMap attributes, Context parentContext)
+ public TestStatsOperatorContext(int id, String name, AttributeMap attributes, Context parentContext)
{
- super(id, attributes, parentContext);
+ super(id, name, attributes, parentContext);
}
@Override
@@ -276,7 +276,8 @@ public class GenericNodeTest
long sleeptime = 25L;
final ArrayList<Object> list = new ArrayList<Object>();
GenericOperator go = new GenericOperator();
- final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, new DefaultAttributeMap(), null));
+ final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+ new DefaultAttributeMap(), null));
gn.setId(1);
AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
@@ -397,7 +398,8 @@ public class GenericNodeTest
long maxSleep = 5000;
long sleeptime = 25L;
GenericOperator go = new GenericOperator();
- final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, new DefaultAttributeMap(), null));
+ final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+ new DefaultAttributeMap(), null));
gn.setId(1);
AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
@@ -537,7 +539,7 @@ public class GenericNodeTest
dam.put(OperatorContext.STORAGE_AGENT, storageAgent);
- TestStatsOperatorContext operatorContext = new TestStatsOperatorContext(0, dam, null);
+ TestStatsOperatorContext operatorContext = new TestStatsOperatorContext(0, "operator", dam, null);
final GenericNode gn = new GenericNode(go, operatorContext);
gn.setId(1);
@@ -635,7 +637,7 @@ public class GenericNodeTest
DefaultAttributeMap attrMap = new DefaultAttributeMap();
attrMap.put(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, dagCheckPoint);
attrMap.put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, opCheckPoint);
- final OperatorContext context = new com.datatorrent.stram.engine.OperatorContext(0, attrMap, null);
+ final OperatorContext context = new com.datatorrent.stram.engine.OperatorContext(0, "operator", attrMap, null);
final GenericNode gn = new GenericNode(go, context);
gn.setId(1);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
index 1c51abb..e182b75 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
@@ -69,7 +69,7 @@ public class InputNodeTest
dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 10);
dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 10);
- final InputNode in = new InputNode(tio, new com.datatorrent.stram.engine.OperatorContext(0, dam, null));
+ final InputNode in = new InputNode(tio, new com.datatorrent.stram.engine.OperatorContext(0, "operator", dam, null));
in.setId(1);
TestSink testSink = new TestSink();
http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
index 48a819f..26bd7a0 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
@@ -206,7 +206,7 @@ public class NodeTest
attributeMap.put(OperatorContext.STORAGE_AGENT, new StorageAgentImpl());
attributeMap.put(OperatorContext.STATELESS, true);
Node<StatelessOperator> node = new Node<StatelessOperator>(new StatelessOperator(),
- new com.datatorrent.stram.engine.OperatorContext(0, attributeMap, null))
+ new com.datatorrent.stram.engine.OperatorContext(0,"operator", attributeMap, null))
{
@Override
public void connectInputPort(String port, SweepableReservoir reservoir)
@@ -239,18 +239,18 @@ public class NodeTest
DefaultAttributeMap attributeMap = new DefaultAttributeMap();
attributeMap.put(OperatorContext.STORAGE_AGENT, new StorageAgentImpl());
Node<TestGenericOperator> node = new Node<TestGenericOperator>(new TestGenericOperator(),
- new com.datatorrent.stram.engine.OperatorContext(0, attributeMap, null))
+ new com.datatorrent.stram.engine.OperatorContext(0, "operator", attributeMap, null))
{
@Override
public void connectInputPort(String port, SweepableReservoir reservoir)
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void run()
{
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
}
};
@@ -292,9 +292,10 @@ public class NodeTest
final Node in;
if (trueGenericFalseInput) {
- in = new GenericNode(gco, new com.datatorrent.stram.engine.OperatorContext(0, dam, null));
+ in = new GenericNode(gco, new com.datatorrent.stram.engine.OperatorContext(0, "operator", dam, null));
} else {
- in = new InputNode((InputCheckpointOperator)gco, new com.datatorrent.stram.engine.OperatorContext(0, dam, null));
+ in = new InputNode((InputCheckpointOperator)gco, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+ dam, null));
}
in.setId(1);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java b/engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java
new file mode 100644
index 0000000..aa48bbe
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.engine;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Tests for {@link OperatorContext}
+ */
+public class OperatorContextTest
+{
+ private static CountDownLatch latch = new CountDownLatch(1);
+ private static volatile String operatorName;
+
+ private static class MockInputOperator extends BaseOperator implements InputOperator
+ {
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
+ latch.countDown();
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ }
+ }
+
+ @Test
+ public void testInjectionOfOperatorName() throws Exception
+ {
+ StreamingApplication application = new StreamingApplication()
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ dag.addOperator("input", new MockInputOperator());
+ }
+ };
+ LocalMode lma = LocalMode.newInstance();
+ lma.prepareDAG(application, new Configuration());
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+ latch.await();
+ Assert.assertEquals("operator name", "input", operatorName);
+ lc.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index 55d8d5f..3e208c8 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -261,7 +261,7 @@ public class ProcessingModeTests
map.put(OperatorContext.PROCESSING_MODE, processingMode);
final GenericNode node = new GenericNode(new MultiInputOperator(),
- new com.datatorrent.stram.engine.OperatorContext(1, map, null));
+ new com.datatorrent.stram.engine.OperatorContext(1, "operator", map, null));
AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input1", 1024);
AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("input1", 1024);
node.connectInputPort("input1", reservoir1);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
index ef63a62..ed145c7 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
@@ -56,12 +56,14 @@ public class InlineStreamTest
final int totalTupleCount = 5000;
final PassThroughNode<Object> operator1 = new PassThroughNode<Object>();
- final GenericNode node1 = new GenericNode(operator1, new OperatorContext(1, new DefaultAttributeMap(), null));
+ final GenericNode node1 = new GenericNode(operator1, new OperatorContext(1, "operator1", new DefaultAttributeMap(),
+ null));
node1.setId(1);
operator1.setup(node1.context);
final PassThroughNode<Object> operator2 = new PassThroughNode<Object>();
- final GenericNode node2 = new GenericNode(operator2, new OperatorContext(2, new DefaultAttributeMap(), null));
+ final GenericNode node2 = new GenericNode(operator2, new OperatorContext(2, "operator2", new DefaultAttributeMap(),
+ null));
node2.setId(2);
operator2.setup(node2.context);
[2/2] apex-core git commit: Merge branch 'APEXCORE-448' of
https://github.com/chandnisingh/incubator-apex-core into APEXCORE-448
Posted by vr...@apache.org.
Merge branch 'APEXCORE-448' of https://github.com/chandnisingh/incubator-apex-core into APEXCORE-448
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/3119aba8
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/3119aba8
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/3119aba8
Branch: refs/heads/master
Commit: 3119aba891676d936c76cd36acf5fb6833547a13
Parents: a42c67b 9bcde2e
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Tue Aug 16 12:42:02 2016 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Aug 16 12:42:02 2016 -0700
----------------------------------------------------------------------
.../main/java/com/datatorrent/api/Context.java | 5 ++
.../stram/api/OperatorDeployInfo.java | 6 ++
.../stram/engine/OperatorContext.java | 15 +++-
.../stram/engine/StreamingContainer.java | 5 +-
.../stram/engine/AtMostOnceTest.java | 3 +-
.../stram/engine/GenericNodeTest.java | 14 ++--
.../datatorrent/stram/engine/InputNodeTest.java | 2 +-
.../com/datatorrent/stram/engine/NodeTest.java | 13 ++--
.../stram/engine/OperatorContextTest.java | 79 ++++++++++++++++++++
.../stram/engine/ProcessingModeTests.java | 2 +-
.../stram/stream/InlineStreamTest.java | 6 +-
11 files changed, 130 insertions(+), 20 deletions(-)
----------------------------------------------------------------------