You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by chandnisingh <gi...@git.apache.org> on 2016/07/30 07:31:16 UTC

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

GitHub user chandnisingh opened a pull request:

    https://github.com/apache/apex-core/pull/364

    APEXCORE-448 Made operator name available in operator context

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chandnisingh/incubator-apex-core APEXCORE-448

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-core/pull/364.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #364
    
----
commit 548598a110b93bd3f467292d55a257ce049ccd72
Author: Chandni Singh <cs...@apache.org>
Date:   2016-07-30T07:26:31Z

    APEXCORE-448 Made operator name available in operator context

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73448524
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    This has been changed. Please look at the latest changes. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73066751
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -328,6 +328,8 @@
          */
         int getWindowsFromCheckpoint();
     
    +    String getOperatorName();
    --- End diff --
    
    In the Jira, @PramodSSImmaneni suggested to have ```getOperatorName()``` which is more specific. I agree with that. It makes it explicit. 
    However we can call it ```getName``` and explain in the javadoc which I realize is missing. 
    I don't have a strong opinion on this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73437878
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.operatorName = Preconditions.checkNotNull(operatorName, "operator name");
    --- End diff --
    
    Check for OperatorDeployInfo not null is optional as it will be necessary to dereference OperatorDeployInfo.id and OperatorDeployInfo.name in the OperatorContext constructor, so NullPointerException will be raised anyway. Annotating OperatorDeployInfo parameter as `@NotNull` is good, but we don't follow this practice in other places.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73439463
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    I agree with @vrozov that implementing aggregator is not required.
    I think the better way would to be define `latch` as static object of the OperatorContextTest and use that in setup.. so no need to implement Aggregator..
    
    ```java
    public class OperatorContextTest
    {
    
      private static final CountDownLatch latch = new CountDownLatch(1);
    
      @Test
      public void testInjectionOfOperatorName() throws Exception
      {
        final LocalMode lma = LocalMode.newInstance();
    
        StreamingApplication testApp = new StreamingApplication()
        {
          @Override
          public void populateDAG(DAG dag, Configuration conf)
          {
            dag.addOperator("input", new MockInputOperator());
          }
        };
    
        lma.prepareDAG(testApp, new Configuration());
        LocalMode.Controller lc = lma.getController();
        lc.runAsync();
        latch.await();
        lc.shutdown();
      }
    
      private static class MockInputOperator extends BaseOperator implements InputOperator
      {
    
        @Override
        public void setup(Context.OperatorContext context)
        {
          String operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
          Assert.assertEquals("operator name", "input", operatorName);
          latch.countDown();
        }
    
        @Override
        public void emitTuples()
        {
        }
      }
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73450445
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
    @@ -902,14 +902,14 @@ private void deployNodes(List<OperatorDeployInfo> nodeList) throws IOException
     
           Context parentContext;
           if (ndi instanceof UnifierDeployInfo) {
    -        OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
    +        OperatorContext unifiedOperatorContext = new OperatorContext(ndi, containerContext);
             parentContext = new PortContext(ndi.inputs.get(0).contextAttributes, unifiedOperatorContext);
             massageUnifierDeployInfo(ndi);
           } else {
             parentContext = containerContext;
           }
     
    -      OperatorContext ctx = new OperatorContext(ndi.id, ndi.contextAttributes, parentContext);
    +      OperatorContext ctx = new OperatorContext(ndi, parentContext);
    --- End diff --
    
    OperatorContext constructor will use `((OperatorDeployInfo.UnifierDeployInfo)operatorDeployInfo).operatorAttributes` when ndi is instanse of OperatorDeployInfo.UnifierDeployInfo. The old code will use `ndi.contextAttributes`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73281443
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
    --- End diff --
    
    A lot of tests create OperatorContext instance without OperatorDeployInfo. 
    OperatorDeployInfo does not expose methods to set the fields - id, name, attributes.
    So if we change this it will cause lot more changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh closed the pull request at:

    https://github.com/apache/apex-core/pull/364


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on the issue:

    https://github.com/apache/apex-core/pull/364
  
    @vrozov @PramodSSImmaneni 
    Please have another look


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73455000
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
    @@ -902,14 +902,14 @@ private void deployNodes(List<OperatorDeployInfo> nodeList) throws IOException
     
           Context parentContext;
           if (ndi instanceof UnifierDeployInfo) {
    -        OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
    +        OperatorContext unifiedOperatorContext = new OperatorContext(ndi, containerContext);
             parentContext = new PortContext(ndi.inputs.get(0).contextAttributes, unifiedOperatorContext);
             massageUnifierDeployInfo(ndi);
           } else {
             parentContext = containerContext;
           }
     
    -      OperatorContext ctx = new OperatorContext(ndi.id, ndi.contextAttributes, parentContext);
    +      OperatorContext ctx = new OperatorContext(ndi, parentContext);
    --- End diff --
    
    Old code behavior:
    ```
          if (ndi instanceof UnifierDeployInfo) {
            OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
          } else {
            parentContext = containerContext;
          }
    
          OperatorContext ctx = new OperatorContext(ndi.id, ndi.contextAttributes, parentContext);
    
    ``` 
    In the new code, the constructor handles which attributes to use.
    ``` 
      public OperatorContext(@NotNull OperatorDeployInfo operatorDeployInfo, Context parentContext)
      {
        super(operatorDeployInfo instanceof OperatorDeployInfo.UnifierDeployInfo ?
            ((OperatorDeployInfo.UnifierDeployInfo)operatorDeployInfo).operatorAttributes : operatorDeployInfo.contextAttributes,
            parentContext);
      }



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73065873
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -328,6 +328,8 @@
          */
         int getWindowsFromCheckpoint();
     
    +    String getOperatorName();
    --- End diff --
    
    To be consistent with `getId()` method, should it be `getName()`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on the issue:

    https://github.com/apache/apex-core/pull/364
  
    Oh my I thought this was a simple change :) 
    Changing the ```OperatorContext(...)``` signature. Will update once that is done



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73448985
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,112 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.stram.api.OperatorDeployInfo;
    +
    +public class OperatorContextTest
    +{
    +  private static boolean foundOperatorName;
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    --- End diff --
    
    Is this code needed to test the feature being test.. No.. So adding redundant code to test case is definitely is not a good idea...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73433304
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.operatorName = Preconditions.checkNotNull(operatorName, "operator name");
    --- End diff --
    
    I will remove the Precondition as @PramodSSImmaneni pointed out. 
    However IMO the should be a separate topic. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73454565
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    Assertion on the name is happening in the test. There is no need to assert twice since we are avoiding redundancy in the tests. The test has a timeout.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73450835
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,112 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.stram.api.OperatorDeployInfo;
    +
    +public class OperatorContextTest
    +{
    +  private static boolean foundOperatorName;
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    --- End diff --
    
    IMO nitpicking on test code to such an extent is an extremely bad idea.  However,  I will make the change because I need this feature soon 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/apex-core/pull/364
  
    As far as I can see not all changes are correct (please see my comments regarding UnifierDeployInfo and access from two different threads to the operatorName). It will be good to have them fixed and the PR merged. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73432473
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String name, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.name = Preconditions.checkNotNull(name, "operator name");
    --- End diff --
    
    yup it can't be null. will remove this 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73448436
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    Please double check. Exception thrown in setup() should terminate the container/JVM, so the test will fail. In case it does not fail, should not Preconditions.checkNotNull() be removed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73066955
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.operatorName = Preconditions.checkNotNull(operatorName, "operator name");
    --- End diff --
    
    Should the condition be extended to check for empty operator name as well?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r72914815
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,98 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await();
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getOperatorName(), "operator name");
    +    }
    +
    +    @Override
    +    public void emitTuples()
    +    {
    +    }
    +
    +    @Override
    +    public Map<String, Object> aggregate(long windowId, Collection<AutoMetric.PhysicalMetricsContext> physicalMetrics)
    +    {
    +      String name = (String)physicalMetrics.iterator().next().getMetrics().get("operatorName");
    +      Assert.assertEquals("operator name", "input", name);
    --- End diff --
    
    Right now the test will hang if assertion fails so I will fix it. However will use the latch because the test needs to wait for sometime until the value is set. With just a variable the number of lines to do that increases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73433050
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.operatorName = Preconditions.checkNotNull(operatorName, "operator name");
    --- End diff --
    
    @vrozov I am running this app with a ```null``` operator name and NullPointerException is thrown:
    ```
      @ApplicationAnnotation(name = "TestApp")
      class SplitterApp implements StreamingApplication
      {
        MockReceiver receiver;
    
        @Override
        public void populateDAG(DAG dag, Configuration configuration)
        {
          dag.setAttribute(DAG.APPLICATION_PATH, baseTestMeta.dataDirectory);
          MockFileInput fileInput = dag.addOperator("Input", new MockFileInput());
          fileInput.filePaths = baseTestMeta.filePaths;
    
          FileSplitterBase splitter = dag.addOperator(null, new FileSplitterBase());
          splitter.setFile(baseTestMeta.dataDirectory);
    
          receiver = dag.addOperator("Receiver", new MockReceiver());
    
          dag.addStream("files", fileInput.files, splitter.input);
          dag.addStream("file-metadata", splitter.filesMetadataOutput, receiver.fileMetadata);
        }
      }
    java.lang.NullPointerException
    	at com.datatorrent.stram.plan.logical.LogicalPlan$OperatorMeta.equals(LogicalPlan.java:1187)
    	at java.util.ArrayList.remove(ArrayList.java:528)
    	at com.datatorrent.stram.plan.logical.LogicalPlan$StreamMeta.addSink(LogicalPlan.java:531)
    	at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1391)
    	at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1442)
    	at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:119)
    	at com.datatorrent.lib.io.fs.FileSplitterBaseTest$SplitterApp.populateDAG(FileSplitterBaseTest.java:231)
    	at com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.prepareDAG(LogicalPlanConfiguration.java:2226)
    	at com.datatorrent.stram.LocalModeImpl.prepareDAG(LocalModeImpl.java:67)
    	at com.datatorrent.lib.io.fs.FileSplitterBaseTest.testSplitterInApp(FileSplitterBaseTest.java:205)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73452693
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,112 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.stram.api.OperatorDeployInfo;
    +
    +public class OperatorContextTest
    +{
    +  private static boolean foundOperatorName;
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    --- End diff --
    
    @vrozov How is adding another output operator here makes it difficult to understand the test logic?
    I believe if the GenericOperator fails, the test will pass.
    
    IMO nitpicking to this level results in reduction of contribution.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73434538
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.operatorName = Preconditions.checkNotNull(operatorName, "operator name");
    --- End diff --
    
    Actually I am going to change the method as @vrozov pointed out and that will expect OperatorDeployInfo. I will have a argument check there for OperatorDeployInfo instance that it should not be null.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73426924
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
    --- End diff --
    
    id, name and attributes are public fields on OperatorDeployInfo. Using OperatorDeployInfo will allow avoiding changes to the OperatorContext constructor each time a new field is introduced. There seems to be the dependency in production code between OperatorContext and OperatorDeployInfo anyway.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73454848
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    And the Preconditions is there so the thread running the operator is killed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73434641
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String name, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.name = Preconditions.checkNotNull(name, "operator name");
    --- End diff --
    
    However, having this check here it ensures that later no classes instantiating operatorContext do not pass a null name.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73445819
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    Any assertion here will not fail the test even if the assertion fails.  I will check for the operator name in the main test 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73066772
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -328,6 +328,8 @@
          */
         int getWindowsFromCheckpoint();
     
    +    String getOperatorName();
    --- End diff --
    
    The same comment applies to other introduced `operatorName` fields where `id` means operator Id, while `operatorName` is used for an operator name.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73454388
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,112 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.stram.api.OperatorDeployInfo;
    +
    +public class OperatorContextTest
    +{
    +  private static boolean foundOperatorName;
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    --- End diff --
    
    @vrozov If the test fails on the unrelated changes, that still means that new changes are problematic and should be looked into.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73423271
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    Will not calling `countDownLatch.countDown()` in setup() solve the problem?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on the issue:

    https://github.com/apache/apex-core/pull/364
  
    @PramodSSImmaneni please review


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on the issue:

    https://github.com/apache/apex-core/pull/364
  
    @chandnisingh lgtm once unit test is updated


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73431751
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.operatorName = Preconditions.checkNotNull(operatorName, "operator name");
    --- End diff --
    
    I tried to call `LogicalPlan.addOperator(null, GenericTestOperator.class)` in LogicalPlanTest and no exception was raised until `LogicalPlan.validate()` was called. IMO, both `null` and empty string should be disallowed in the addOperator() itself and OperatorContext should not do additional checks. For example, OperatorMeta does not do such checks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73450177
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
    @@ -902,14 +902,14 @@ private void deployNodes(List<OperatorDeployInfo> nodeList) throws IOException
     
           Context parentContext;
           if (ndi instanceof UnifierDeployInfo) {
    -        OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
    +        OperatorContext unifiedOperatorContext = new OperatorContext(ndi, containerContext);
             parentContext = new PortContext(ndi.inputs.get(0).contextAttributes, unifiedOperatorContext);
             massageUnifierDeployInfo(ndi);
           } else {
             parentContext = containerContext;
           }
     
    -      OperatorContext ctx = new OperatorContext(ndi.id, ndi.contextAttributes, parentContext);
    +      OperatorContext ctx = new OperatorContext(ndi, parentContext);
    --- End diff --
    
    Please check the operator context constructor 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73453695
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,112 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.stram.api.OperatorDeployInfo;
    +
    +public class OperatorContextTest
    +{
    +  private static boolean foundOperatorName;
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    --- End diff --
    
    It is hard to say what is better - to have a test that may fail on unrelated changes or not to have a test at all. How long will it take to remove an extra operator? How long will it take to modify test not to rely on aggregate? It is a small test and it is not necessary to make it more complex than it needs to be.
    
    I suggest that we stop discussion on "nitpicking" and "hard time" on the PR and use it only to discuss code changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on the issue:

    https://github.com/apache/apex-core/pull/364
  
    Yes @chandnisingh please make it volatile or have some other mechanism of passing the information between threads.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73450932
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    @chandnisingh Is the latest commit pushed to remote?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on the issue:

    https://github.com/apache/apex-core/pull/364
  
    Changed the method to ```getName()```.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the issue:

    https://github.com/apache/apex-core/pull/364
  
    This pull request attracts a lot of attention... Not to spoil the party, but this looks like a rather minor change and there are many other pull requests that I would consider of much higher importance that could need such intense focus (deduper, join, windowing etc.). Assuming everyone has a lot of work to do and the change is correct, I would suggest to merge this PR?  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73365188
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    Why is it necessary to use metrics? Will it be sufficient to assertNotNull in setup() method itself?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73425701
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.operatorName = Preconditions.checkNotNull(operatorName, "operator name");
    --- End diff --
    
    Do you mean that LogicalPlan.addOperator() allows empty name? Does not it also allows `null`? Is there any reason to allow `null` or empty string?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73447483
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,112 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.stram.api.OperatorDeployInfo;
    +
    +public class OperatorContextTest
    +{
    +  private static boolean foundOperatorName;
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    --- End diff --
    
    Does it hurt to have an output operator here in the test local application?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73429227
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String name, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.name = Preconditions.checkNotNull(name, "operator name");
    --- End diff --
    
    When can it be null?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73382866
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    This guarantees that setup is called and operator name has been verified, before the test could finish. The app runs here asynchronously and this test waits for expected results with a timeout.
    
    I can run the app synchronously with a timeout but there is still a chance that it will just deploy the application and the setup on the operator is not triggered yet but the test completes. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73445463
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    @gauravgopi123 +1 with minor correction: there is no need for `Preconditions.checkNotNull`, `Assert.assertEquals("operator name", "input", operatorName);` will handle `null`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73430475
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,98 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await();
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getOperatorName(), "operator name");
    +    }
    +
    +    @Override
    +    public void emitTuples()
    +    {
    +    }
    +
    +    @Override
    +    public Map<String, Object> aggregate(long windowId, Collection<AutoMetric.PhysicalMetricsContext> physicalMetrics)
    +    {
    +      String name = (String)physicalMetrics.iterator().next().getMetrics().get("operatorName");
    +      Assert.assertEquals("operator name", "input", name);
    --- End diff --
    
    Also as you rightly identified have the Assert in the test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by sandeshh <gi...@git.apache.org>.
Github user sandeshh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73067721
  
    --- Diff: api/src/main/java/com/datatorrent/api/Context.java ---
    @@ -328,6 +328,8 @@
          */
         int getWindowsFromCheckpoint();
     
    +    String getOperatorName();
    --- End diff --
    
    +1 for getName. Calls are inside the classes starting with Operator*, methods do not need to contain word *operator*.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73444434
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,112 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.stram.api.OperatorDeployInfo;
    +
    +public class OperatorContextTest
    +{
    +  private static boolean foundOperatorName;
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    long startTime = System.currentTimeMillis();
    +    while (!foundOperatorName && System.currentTimeMillis() - startTime < 2000) {
    +      Thread.sleep(100);
    +    }
    +    Assert.assertTrue("operator name found", foundOperatorName);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator
    +  {
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    +      foundOperatorName = true;
    --- End diff --
    
    Assertion is missing for operator name


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73452532
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,112 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.stram.api.OperatorDeployInfo;
    +
    +public class OperatorContextTest
    +{
    +  private static boolean foundOperatorName;
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    --- End diff --
    
    @vrozov strictly considering the code for this test case you are technically correct. But I think there is a broader issue here. Apex on the whole has very poor testing of the critical paths. A good example is checkpointing. I don't believe there is a single test which verifies that the logic which determines the checkpoint a node is restored to is correct even though the logic can easily be pulled into a class and tested independently. Cycles should be spent correcting those issues instead of optimizing a trivial test to perfection.
    
    At the end of the day software is written by people and people respond to incentives. If people are given a hard time about small tests, they will be incentivized not to write tests. And so components will remain untested. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73453349
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
    @@ -902,14 +902,14 @@ private void deployNodes(List<OperatorDeployInfo> nodeList) throws IOException
     
           Context parentContext;
           if (ndi instanceof UnifierDeployInfo) {
    -        OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
    +        OperatorContext unifiedOperatorContext = new OperatorContext(ndi, containerContext);
             parentContext = new PortContext(ndi.inputs.get(0).contextAttributes, unifiedOperatorContext);
             massageUnifierDeployInfo(ndi);
           } else {
             parentContext = containerContext;
           }
     
    -      OperatorContext ctx = new OperatorContext(ndi.id, ndi.contextAttributes, parentContext);
    +      OperatorContext ctx = new OperatorContext(ndi, parentContext);
    --- End diff --
    
    Note:
    the types of attributes passed to the super, is moved inside the constructor of ```OperatorContext```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73444164
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,112 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.stram.api.OperatorDeployInfo;
    +
    +public class OperatorContextTest
    +{
    +  private static boolean foundOperatorName;
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    long startTime = System.currentTimeMillis();
    +    while (!foundOperatorName && System.currentTimeMillis() - startTime < 2000) {
    +      Thread.sleep(100);
    +    }
    +    Assert.assertTrue("operator name found", foundOperatorName);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator
    +  {
    +
    +    @AutoMetric
    --- End diff --
    
    Why is this annotation needed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73444389
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,112 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.stram.api.OperatorDeployInfo;
    +
    +public class OperatorContextTest
    +{
    +  private static boolean foundOperatorName;
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    --- End diff --
    
    For this test there is no need to add output operator... Input operator without any output port should suffice


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73451447
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,112 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.stram.api.OperatorDeployInfo;
    +
    +public class OperatorContextTest
    +{
    +  private static boolean foundOperatorName;
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    --- End diff --
    
    @chandnisingh, @ilooner Adding extra code to test case is not a big deal as long as the test always succeeds and it is not necessary to troubleshoot a failure. Problem with unnecessary code in the test like aggregate and output operator is that it makes more difficult to understand the test logic and also introduces possible unrelated to the test case failures. What if bug is introduced into aggregate logic that affects OperatorContextTest?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73068052
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
    --- End diff --
    
    id, name and attributes seem to always come from OperatorDeployInfo. If this is the case, should OperatorContext take OperatorDeployInfo as input parameter instead?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73281978
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.operatorName = Preconditions.checkNotNull(operatorName, "operator name");
    --- End diff --
    
    I checked locally in an Application test that empty operator name is allowed in the DAG. I don't think we can restrict it here if adding empty operator name is allowed in the DAG.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73449937
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
    @@ -902,14 +902,14 @@ private void deployNodes(List<OperatorDeployInfo> nodeList) throws IOException
     
           Context parentContext;
           if (ndi instanceof UnifierDeployInfo) {
    -        OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
    +        OperatorContext unifiedOperatorContext = new OperatorContext(ndi, containerContext);
             parentContext = new PortContext(ndi.inputs.get(0).contextAttributes, unifiedOperatorContext);
             massageUnifierDeployInfo(ndi);
           } else {
             parentContext = containerContext;
           }
     
    -      OperatorContext ctx = new OperatorContext(ndi.id, ndi.contextAttributes, parentContext);
    +      OperatorContext ctx = new OperatorContext(ndi, parentContext);
    --- End diff --
    
    @chandnisingh Here `ndi` may also be an instance of `UnifierDeployInfo` in which case, the new code will use `ndi.operatorAttribute` while the old code was using `ndi.contextAttributes`. It sounds that currently only `id` and `name` can be copied from OperatorDeployInfo to OperatorContext, while attributes must be passed in the constructor. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r72900235
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,98 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await();
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getOperatorName(), "operator name");
    +    }
    +
    +    @Override
    +    public void emitTuples()
    +    {
    +    }
    +
    +    @Override
    +    public Map<String, Object> aggregate(long windowId, Collection<AutoMetric.PhysicalMetricsContext> physicalMetrics)
    +    {
    +      String name = (String)physicalMetrics.iterator().next().getMetrics().get("operatorName");
    +      Assert.assertEquals("operator name", "input", name);
    --- End diff --
    
    Could we just set a static variable and check it in the test instead of using a latch and the assert?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on the issue:

    https://github.com/apache/apex-core/pull/364
  
    Addressed all the review comments so far


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73454311
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
    @@ -902,14 +902,14 @@ private void deployNodes(List<OperatorDeployInfo> nodeList) throws IOException
     
           Context parentContext;
           if (ndi instanceof UnifierDeployInfo) {
    -        OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
    +        OperatorContext unifiedOperatorContext = new OperatorContext(ndi, containerContext);
             parentContext = new PortContext(ndi.inputs.get(0).contextAttributes, unifiedOperatorContext);
             massageUnifierDeployInfo(ndi);
           } else {
             parentContext = containerContext;
           }
     
    -      OperatorContext ctx = new OperatorContext(ndi.id, ndi.contextAttributes, parentContext);
    +      OperatorContext ctx = new OperatorContext(ndi, parentContext);
    --- End diff --
    
    Please check the old code behavior. Unless I miss something, old code will use both operatorAttributes and contextAttributes with UnifierDeployInfo.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73452493
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
    @@ -902,14 +902,14 @@ private void deployNodes(List<OperatorDeployInfo> nodeList) throws IOException
     
           Context parentContext;
           if (ndi instanceof UnifierDeployInfo) {
    -        OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
    +        OperatorContext unifiedOperatorContext = new OperatorContext(ndi, containerContext);
             parentContext = new PortContext(ndi.inputs.get(0).contextAttributes, unifiedOperatorContext);
             massageUnifierDeployInfo(ndi);
           } else {
             parentContext = containerContext;
           }
     
    -      OperatorContext ctx = new OperatorContext(ndi.id, ndi.contextAttributes, parentContext);
    +      OperatorContext ctx = new OperatorContext(ndi, parentContext);
    --- End diff --
    
    This is the constructor of OperatorContext:
    ```
      public OperatorContext(@NotNull OperatorDeployInfo operatorDeployInfo, Context parentContext)
      {
        super(operatorDeployInfo instanceof OperatorDeployInfo.UnifierDeployInfo ?
            ((OperatorDeployInfo.UnifierDeployInfo)operatorDeployInfo).operatorAttributes : operatorDeployInfo.contextAttributes,
            parentContext);
    
        this.lastProcessedWindowId = Stateless.WINDOW_ID;
        this.operatorDeployInfo = Preconditions.checkNotNull(operatorDeployInfo, "operator deploy info");
        this.stateless = super.getValue(OperatorContext.STATELESS);
      }



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73425042
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    No, the instance of countdown latch is different in the running operator after deployment. So calling ```countDown()``` on it will have not effect.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73427124
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.operatorName = Preconditions.checkNotNull(operatorName, "operator name");
    --- End diff --
    
    Yes LogicalPlan.addOperator allows empty name. I haven't checked ```null```. 
    I think that should be a separate discussion that why it allows empty string.
     If it allows null, then I will remove this Precondition completely.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73450775
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,112 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.stram.api.OperatorDeployInfo;
    +
    +public class OperatorContextTest
    +{
    +  private static boolean foundOperatorName;
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    --- End diff --
    
    Critical catch. nice job @gauravgopi123 \U0001f44d 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on the issue:

    https://github.com/apache/apex-core/pull/364
  
    I second it, please reopen this PR and I will merge it. I think we have spent enough time on the tests and everyone's inputs have been valuable. There will be other opportunities in future to add more tests and refactor them.
    
    My only small concern is adding a dependency to OperatorDeployInfo in OperatorContext, can we completely justify that? I liked the earlier way where @chandnisingh was passing the extra name as well, null check or without works for me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73454152
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    How  `Preconditions.checkNotNull` is different from `Assert.assertEquals("operator name", "input", operatorName)`? At the end both will throw an exception. If that exception does not lead to test failure, why `Preconditions.checkNotNull` is necessary?
    
    Why modifications to operatorName on operator thread will be immediately visible on the main thread?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73430406
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,98 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await();
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getOperatorName(), "operator name");
    +    }
    +
    +    @Override
    +    public void emitTuples()
    +    {
    +    }
    +
    +    @Override
    +    public Map<String, Object> aggregate(long windowId, Collection<AutoMetric.PhysicalMetricsContext> physicalMetrics)
    +    {
    +      String name = (String)physicalMetrics.iterator().next().getMetrics().get("operatorName");
    +      Assert.assertEquals("operator name", "input", name);
    --- End diff --
    
    Wouldn't it rely on the current local mode implementation specific that the aggregator is not getting serialized and deserialized back (like the operator). That is not a specified contract isn't it?  I think it would be safer to rely on the in-process execution nature and use static volatiles.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73453391
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    Yes, this line is still same that is why but the assignment is happening to a static variable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on the issue:

    https://github.com/apache/apex-core/pull/364
  
    I have addressed the review comments so far.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73427414
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.operatorName = Preconditions.checkNotNull(operatorName, "operator name");
    --- End diff --
    
    LogicalPlan.addOperator doesn't allow null so will keep the Precondition as it is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---