Posted to dev@storm.apache.org by vesense <gi...@git.apache.org> on 2017/04/25 05:26:05 UTC

[GitHub] storm pull request #2089: STORM-2490: Lambda support

GitHub user vesense opened a pull request:

    https://github.com/apache/storm/pull/2089

    STORM-2490: Lambda support

    https://issues.apache.org/jira/browse/STORM-2490

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vesense/storm STORM-2490-lambda

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2089.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2089
    
----
commit 7b19d6b8a32726fb45c6095111a27e13411efd66
Author: vesense <be...@163.com>
Date:   2017-04-25T04:55:24Z

    STORM-2490: Lambda support

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #2089: STORM-2490: Lambda support

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on the issue:

    https://github.com/apache/storm/pull/2089
  
    @arunmahadevan Addressed your comments. Please take a look again.



[GitHub] storm issue #2089: STORM-2490: Lambda support

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2089
  
    @vesense Looks good, thanks. +1



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113364466
  
    --- Diff: storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java ---
    @@ -0,0 +1,36 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.lambda;
    +
    +import org.apache.storm.topology.BasicOutputCollector;
    +import org.apache.storm.tuple.Tuple;
    +
    +public class LambdaBiConsumerBolt extends AbstractLambdaBolt {
    +
    +    private SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer;
    +
    +    public LambdaBiConsumerBolt(SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer) {
    --- End diff --
    
    I prefer to keep the current names. Both Consumer and BiConsumer are functional interfaces that accept parameters and return nothing, and these two bolts are the corresponding processors. Also, users don't access them directly when writing a spout or bolt with a lambda.



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113388244
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
    @@ -353,6 +421,31 @@ public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallel
         }
     
         /**
    +     * Define a new spout in this topology.
    +     *
    +     * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
    +     * @param supplier lambda expression that implements tuple generating for this spout
    +     * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
    +     */
    +    public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier) throws IllegalArgumentException {
    +        return setSpout(id, supplier, null);
    +    }
    +
    +    /**
    +     * Define a new spout in this topology with the specified parallelism. If the spout declares
    +     * itself as non-distributed, the parallelism_hint will be ignored and only one task
    +     * will be allocated to this component.
    +     *
    +     * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
    +     * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somewhere around the cluster.
    +     * @param supplier lambda expression that implements tuple generating for this spout
    +     * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
    +     */
    +    public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier, Number parallelism_hint) throws IllegalArgumentException {
    --- End diff --
    
    same as above



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113388109
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
    @@ -353,6 +421,31 @@ public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallel
         }
     
         /**
    +     * Define a new spout in this topology.
    +     *
    +     * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
    +     * @param supplier lambda expression that implements tuple generating for this spout
    +     * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
    +     */
    +    public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier) throws IllegalArgumentException {
    --- End diff --
    
    Make it a generic method:
    `public <T> SpoutDeclarer setSpout(String id, SerializableSupplier<T> supplier) throws IllegalArgumentException {`
    
    Otherwise the following won't work:
    ```java
            SerializableSupplier<String> supplier = () -> UUID.randomUUID().toString();
            builder.setSpout("spout1", supplier);
    ```
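
The point behind this suggestion can be sketched in isolation. The example below is only an illustration: `SerializableSupplier` is redeclared locally as a hypothetical stand-in (a `Supplier` that is also `Serializable`), and `setSpoutObject` / `setSpoutGeneric` are made-up helper names, not Storm methods. It shows that Java generics are invariant, so a variable typed `SerializableSupplier<String>` cannot be passed where `SerializableSupplier<Object>` is expected, while a generic method accepts it.

```java
import java.io.Serializable;
import java.util.UUID;
import java.util.function.Supplier;

// Hypothetical stand-in for the interface discussed in the PR.
interface SerializableSupplier<T> extends Supplier<T>, Serializable {
}

class GenericSpoutSketch {
    // Mirrors the original signature: only SerializableSupplier<Object> is accepted.
    static String setSpoutObject(String id, SerializableSupplier<Object> supplier) {
        return id + ":" + supplier.get();
    }

    // Generic method, as suggested in the review: accepts any SerializableSupplier<T>.
    static <T> String setSpoutGeneric(String id, SerializableSupplier<T> supplier) {
        return id + ":" + supplier.get();
    }

    public static void main(String[] args) {
        SerializableSupplier<String> supplier = () -> UUID.randomUUID().toString();

        // Does not compile: generics are invariant, so SerializableSupplier<String>
        // is not a SerializableSupplier<Object>.
        // setSpoutObject("spout1", supplier);

        // An inline lambda still compiles, because its type is inferred from the parameter.
        System.out.println(setSpoutObject("spout1", () -> UUID.randomUUID().toString()));

        // The generic signature accepts the typed variable.
        System.out.println(setSpoutGeneric("spout1", supplier));
    }
}
```

This also explains why the starter example compiled with the original signature: it passes the lambda inline rather than through a typed variable.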



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113364469
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
    @@ -316,6 +322,68 @@ public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelism_hi
         }
     
         /**
    +     * Define a new bolt in this topology. This defines a lambda basic bolt, which is a
    +     * simpler to use but more restricted kind of bolt. Basic bolts are intended
    +     * for non-aggregation processing and automate the anchoring/acking process to
    +     * achieve proper reliability in the topology.
    +     *
    +     * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
    +     * @param biConsumer lambda expression which is the instance of functional interface BiConsumer
    --- End diff --
    
    Good catch. Will update.



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113429561
  
    --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java ---
    @@ -0,0 +1,75 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.starter;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.topology.ConfigurableTopology;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.tuple.Values;
    +
    +import java.io.Serializable;
    +import java.util.UUID;
    +
    +public class LambdaTopology extends ConfigurableTopology {
    +    public static void main(String[] args) {
    +        ConfigurableTopology.start(new LambdaTopology(), args);
    +    }
    +
    +    @Override
    +    protected int run(String[] args) throws Exception {
    +        TopologyBuilder builder = new TopologyBuilder();
    +
    +        // example. spout1: generate random strings
    +        // bolt1: get the first part of a string
    +        // bolt2: output the tuple
    +
    +        // NOTE: Variable used in lambda expression should be final or effectively final
    +        // (or it will cause compilation error),
    +        // and variable type should implement the Serializable interface if it isn't primitive type
    +        // (or it will cause not serializable exception).
    +        Prefix prefix = new Prefix("Hello lambda:");
    --- End diff --
    
    Any reason why this is not just a String?



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2089



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113403773
  
    --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java ---
    @@ -0,0 +1,74 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.starter;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.topology.ConfigurableTopology;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.tuple.Values;
    +
    +import java.io.Serializable;
    +import java.util.UUID;
    +
    +public class LambdaTopology extends ConfigurableTopology {
    +    public static void main(String[] args) {
    +        ConfigurableTopology.start(new LambdaTopology(), args);
    +    }
    +
    +    @Override
    +    protected int run(String[] args) throws Exception {
    +        TopologyBuilder builder = new TopologyBuilder();
    +
    +        // example. spout1: generate random strings
    +        // bolt1: get the first part of a string
    +        // bolt2: output the tuple
    +
    +        // NOTE: Variable used in lambda expression should be final or effectively final
    +        // (or it will cause compilation error),
    +        // and variable type should implement the Serializable interface if it isn't primitive type
    +        // (or it will cause not serializable exception).
    +        Prefix prefix = new Prefix("Hello lambda:");
    +        String suffix = ":so cool!";
    +
    +        builder.setSpout("spout1", () -> UUID.randomUUID().toString());
    +        builder.setBolt("bolt1", (tuple, collector) -> {
    +            String[] parts = tuple.getStringByField("lambda").split("\\-");
    +            collector.emit(new Values(prefix + parts[0] + suffix));
    --- End diff --
    
    I think we should support multiple fields. I added a `fields` parameter to support user-defined output fields, and updated the example.
    ```
    setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer, String... fields)
    ```



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113394121
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
    @@ -353,6 +421,31 @@ public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallel
         }
     
         /**
    +     * Define a new spout in this topology.
    +     *
    +     * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
    +     * @param supplier lambda expression that implements tuple generating for this spout
    +     * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
    +     */
    +    public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier) throws IllegalArgumentException {
    --- End diff --
    
    Maybe better:
    ```java 
    public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier) throws IllegalArgumentException
    ```
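
As a rough illustration of the wildcard form, the sketch below again uses a locally declared, hypothetical `SerializableSupplier` and a made-up `nextValue` helper. With an unbounded wildcard the method needs no type parameter, accepts a supplier of any element type, and reads the result as `Object`.

```java
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical stand-in for the interface discussed in the PR.
interface SerializableSupplier<T> extends Supplier<T>, Serializable {
}

class WildcardSpoutSketch {
    // Unbounded wildcard: any element type is accepted; the value is read as Object.
    static Object nextValue(SerializableSupplier<?> supplier) {
        return supplier.get();
    }

    public static void main(String[] args) {
        SerializableSupplier<String> strings = () -> "tuple";
        SerializableSupplier<Integer> numbers = () -> 42;
        System.out.println(nextValue(strings));
        System.out.println(nextValue(numbers));
    }
}
```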



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113220738
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
    @@ -316,6 +322,68 @@ public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelism_hi
         }
     
         /**
    +     * Define a new bolt in this topology. This defines a lambda basic bolt, which is a
    +     * simpler to use but more restricted kind of bolt. Basic bolts are intended
    +     * for non-aggregation processing and automate the anchoring/acking process to
    +     * achieve proper reliability in the topology.
    +     *
    +     * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
    +     * @param biConsumer lambda expression which is the instance of functional interface BiConsumer
    --- End diff --
    
    Nitpick: This comment doesn't really say much about what the consumer is/does. Maybe something like "lambda expression that implements tuple processing for this bolt"?



[GitHub] storm issue #2089: STORM-2490: Lambda support

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on the issue:

    https://github.com/apache/storm/pull/2089
  
    @srdo Addressed your comments. Please take a look again.



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113389047
  
    --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java ---
    @@ -0,0 +1,74 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.starter;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.topology.ConfigurableTopology;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.tuple.Values;
    +
    +import java.io.Serializable;
    +import java.util.UUID;
    +
    +public class LambdaTopology extends ConfigurableTopology {
    +    public static void main(String[] args) {
    +        ConfigurableTopology.start(new LambdaTopology(), args);
    +    }
    +
    +    @Override
    +    protected int run(String[] args) throws Exception {
    +        TopologyBuilder builder = new TopologyBuilder();
    +
    +        // example. spout1: generate random strings
    +        // bolt1: get the first part of a string
    +        // bolt2: output the tuple
    +
    +        // NOTE: Variable used in lambda expression should be final or effectively final
    +        // (or it will cause compilation error),
    +        // and variable type should implement the Serializable interface if it isn't primitive type
    +        // (or it will cause not serializable exception).
    +        Prefix prefix = new Prefix("Hello lambda:");
    +        String suffix = ":so cool!";
    +
    +        builder.setSpout("spout1", () -> UUID.randomUUID().toString());
    +        builder.setBolt("bolt1", (tuple, collector) -> {
    +            String[] parts = tuple.getStringByField("lambda").split("\\-");
    +            collector.emit(new Values(prefix + parts[0] + suffix));
    --- End diff --
    
    What happens if the bolt emits multiple fields? Call this out explicitly in the javadoc or wherever appropriate. If you expect only one field, it would be better to accept a `BiFunction` rather than a `BiConsumer` and emit the function's return value.
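
The difference between the two styles can be sketched without Storm on the classpath. Everything here is an assumption made for illustration: `RecordingCollector` is a hypothetical stand-in for an output collector, plain `String` replaces the tuple type, and the argument types of the `BiFunction` variant are a guess at what the reviewer had in mind. With a `BiConsumer` the lambda body controls how many values go into each emitted tuple; with a `BiFunction` the wrapper emits exactly one value, the return value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

class EmitStyleSketch {
    // Hypothetical stand-in for an output collector: it only records emitted tuples.
    static class RecordingCollector {
        final List<List<Object>> emitted = new ArrayList<>();

        void emit(List<Object> values) {
            emitted.add(values);
        }
    }

    // BiConsumer style: the lambda decides what to emit and how many fields per tuple.
    static void runConsumer(String input, RecordingCollector collector,
                            BiConsumer<String, RecordingCollector> body) {
        body.accept(input, collector);
    }

    // BiFunction style: the wrapper emits the return value as a one-field tuple.
    static void runFunction(String input, RecordingCollector collector,
                            BiFunction<String, RecordingCollector, Object> body) {
        collector.emit(Collections.singletonList(body.apply(input, collector)));
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();

        // Two fields emitted from inside the lambda.
        runConsumer("a-b", collector, (in, out) -> {
            String[] parts = in.split("-");
            out.emit(Arrays.asList(parts[0], parts[1]));
        });

        // One field: the wrapper emits whatever the lambda returns.
        runFunction("a-b", collector, (in, out) -> in.split("-")[0]);

        System.out.println(collector.emitted);
    }
}
```

In the consumer style the number of declared output fields has to match what the lambda emits, which is why the reviewer asks for this to be documented.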



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113431656
  
    --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java ---
    @@ -0,0 +1,75 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.starter;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.topology.ConfigurableTopology;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.tuple.Values;
    +
    +import java.io.Serializable;
    +import java.util.UUID;
    +
    +public class LambdaTopology extends ConfigurableTopology {
    +    public static void main(String[] args) {
    +        ConfigurableTopology.start(new LambdaTopology(), args);
    +    }
    +
    +    @Override
    +    protected int run(String[] args) throws Exception {
    +        TopologyBuilder builder = new TopologyBuilder();
    +
    +        // example. spout1: generate random strings
    +        // bolt1: get the first part of a string
    +        // bolt2: output the tuple
    +
    +        // NOTE: Variable used in lambda expression should be final or effectively final
    +        // (or it will cause compilation error),
    +        // and variable type should implement the Serializable interface if it isn't primitive type
    +        // (or it will cause not serializable exception).
    +        Prefix prefix = new Prefix("Hello lambda:");
    --- End diff --
    
    This is just an example to demonstrate:
    >variable type should implement the Serializable interface if it isn't primitive type (or it will cause not serializable exception).
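
The serialization behaviour described here can be reproduced with plain Java serialization. The sketch below is an assumption-laden illustration, not code from the PR: `SerializableSupplier` is a local stand-in, and `PlainPrefix`, `SerializablePrefix` and `canSerialize` are hypothetical names. It serializes two lambdas that differ only in whether the captured object implements `Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical stand-in for the interface discussed in the PR.
interface SerializableSupplier<T> extends Supplier<T>, Serializable {
}

class CaptureSketch {
    // Plain class: does not implement Serializable.
    static class PlainPrefix {
        final String str;
        PlainPrefix(String str) { this.str = str; }
        @Override public String toString() { return str; }
    }

    // Same shape, but marked Serializable.
    static class SerializablePrefix implements Serializable {
        final String str;
        SerializablePrefix(String str) { this.str = str; }
        @Override public String toString() { return str; }
    }

    // Lambda capturing a non-serializable local variable.
    static SerializableSupplier<String> capturesPlain() {
        PlainPrefix prefix = new PlainPrefix("Hello lambda:");
        return () -> prefix + "value";
    }

    // Lambda capturing a serializable local variable.
    static SerializableSupplier<String> capturesSerializable() {
        SerializablePrefix prefix = new SerializablePrefix("Hello lambda:");
        return () -> prefix + "value";
    }

    // Returns true if Java serialization of the object succeeds.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("plain capture serializes: " + canSerialize(capturesPlain()));
        System.out.println("serializable capture serializes: " + canSerialize(capturesSerializable()));
    }
}
```

Both lambdas compile and run locally; the difference only shows up when the lambda itself is serialized, since captured values are written along with it.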



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113237257
  
    --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java ---
    @@ -0,0 +1,52 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.starter;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.topology.ConfigurableTopology;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.tuple.Values;
    +
    +import java.util.UUID;
    +
    +public class LambdaTopology extends ConfigurableTopology {
    +    public static void main(String[] args) {
    +        ConfigurableTopology.start(new LambdaTopology(), args);
    +    }
    +
    +    @Override
    +    protected int run(String[] args) throws Exception {
    +        TopologyBuilder builder = new TopologyBuilder();
    +
    +        // example. spout1: generate random strings
    +        // bolt1: get the first part of a string
    +        // bolt2: output the tuple
    +        builder.setSpout("spout1", () -> UUID.randomUUID().toString());
    --- End diff --
    
    I'm wondering if it makes sense to add a note here to help people avoid the case where they refer to a field on LambdaTopology from the lambda and hit a NotSerializableException?
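
A minimal sketch of the pitfall raised here, under stated assumptions: `FieldCaptureSketch` is a hypothetical non-serializable class standing in for a topology class, and `SerializableSupplier` and `canSerialize` are local helpers rather than Storm APIs. A lambda that reads an instance field implicitly captures `this`, so serializing the lambda tries to serialize the enclosing object; copying the field to a local variable first avoids that.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical stand-in for the interface discussed in the PR.
interface SerializableSupplier<T> extends Supplier<T>, Serializable {
}

// Deliberately not Serializable, standing in for a topology class.
class FieldCaptureSketch {
    private final String suffix;

    FieldCaptureSketch(String suffix) {
        this.suffix = suffix;
    }

    // Reading the field means reading this.suffix, so the lambda captures `this`.
    SerializableSupplier<String> usingField() {
        return () -> "value" + suffix;
    }

    // Copying the field into a local first means only the String is captured.
    SerializableSupplier<String> usingLocalCopy() {
        String localSuffix = suffix;
        return () -> "value" + localSuffix;
    }

    // Returns true if Java serialization of the object succeeds.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FieldCaptureSketch sketch = new FieldCaptureSketch(":so cool!");
        System.out.println("field reference serializes: " + canSerialize(sketch.usingField()));
        System.out.println("local copy serializes: " + canSerialize(sketch.usingLocalCopy()));
    }
}
```

The field is assigned in the constructor on purpose: a `final` field initialized with a string literal is a compile-time constant, which the compiler may inline, hiding the capture of `this`.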



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113403709
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
    @@ -353,6 +421,31 @@ public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallel
         }
     
         /**
    +     * Define a new spout in this topology.
    +     *
    +     * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
    +     * @param supplier lambda expression that implements tuple generating for this spout
    +     * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
    +     */
    +    public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier) throws IllegalArgumentException {
    --- End diff --
    
    Nice catch. Fixed.



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113218838
  
    --- Diff: storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java ---
    @@ -0,0 +1,36 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.lambda;
    +
    +import org.apache.storm.topology.BasicOutputCollector;
    +import org.apache.storm.tuple.Tuple;
    +
    +public class LambdaBiConsumerBolt extends AbstractLambdaBolt {
    +
    +    private SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer;
    +
    +    public LambdaBiConsumerBolt(SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer) {
    --- End diff --
    
    Nitpick: It might be clearer to name this and the LambdaConsumerBolt something like Lambda(Non)TerminalBolt?
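
The distinction behind the suggested names can be sketched as follows. This is a minimal illustration, not the code from the pull request: the two functional interfaces are local stand-ins declared here, a plain String plays the role of a Tuple, and a List<String> plays the role of the BasicOutputCollector. A "non-terminal" bolt receives the collector and can emit downstream; a "terminal" bolt only consumes the tuple.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class TerminalVsNonTerminalDemo {
    // Hypothetical stand-ins for the PR's serializable functional interfaces.
    interface SerializableConsumer<T> extends Consumer<T>, Serializable {}
    interface SerializableBiConsumer<T, U> extends BiConsumer<T, U>, Serializable {}

    // Non-terminal: gets the tuple AND a collector, so it can emit a new value.
    // Here it emits the part of the string before the first '-'.
    static final SerializableBiConsumer<String, List<String>> FIRST_PART =
            (tuple, collector) -> collector.add(tuple.split("-")[0]);

    // Terminal: gets only the tuple, so nothing can be emitted downstream.
    static final SerializableConsumer<String> PRINT =
            tuple -> System.out.println(tuple);

    // Runs the non-terminal step and returns whatever it emitted.
    static List<String> runNonTerminal(String tuple) {
        List<String> emitted = new ArrayList<>(); // stand-in for the collector
        FIRST_PART.accept(tuple, emitted);
        return emitted;
    }

    public static void main(String[] args) {
        for (String value : runNonTerminal("a-b-c")) {
            PRINT.accept(value); // terminal step: consume, emit nothing
        }
    }
}
```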



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113403731
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
    @@ -353,6 +421,31 @@ public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallel
         }
     
         /**
    +     * Define a new spout in this topology.
    +     *
    +     * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
    +     * @param supplier lambda expression that implements tuple generating for this spout
    +     * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
    +     */
    +    public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier) throws IllegalArgumentException {
    +        return setSpout(id, supplier, null);
    +    }
    +
    +    /**
    +     * Define a new spout in this topology with the specified parallelism. If the spout declares
    +     * itself as non-distributed, the parallelism_hint will be ignored and only one task
    +     * will be allocated to this component.
    +     *
    +     * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
    +     * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somewhere around the cluster.
    +     * @param supplier lambda expression that implements tuple generating for this spout
    +     * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
    +     */
    +    public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier, Number parallelism_hint) throws IllegalArgumentException {
    --- End diff --
    
    Fixed.



[GitHub] storm pull request #2089: STORM-2490: Lambda support

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2089#discussion_r113364472
  
    --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java ---
    @@ -0,0 +1,52 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.starter;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.topology.ConfigurableTopology;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.tuple.Values;
    +
    +import java.util.UUID;
    +
    +public class LambdaTopology extends ConfigurableTopology {
    +    public static void main(String[] args) {
    +        ConfigurableTopology.start(new LambdaTopology(), args);
    +    }
    +
    +    @Override
    +    protected int run(String[] args) throws Exception {
    +        TopologyBuilder builder = new TopologyBuilder();
    +
    +        // example. spout1: generate random strings
    +        // bolt1: get the first part of a string
    +        // bolt2: output the tuple
    +        builder.setSpout("spout1", () -> UUID.randomUUID().toString());
    --- End diff --
    
    Variables used in a lambda expression must be final or effectively final (otherwise compilation fails), and the type of each captured variable must implement the Serializable interface unless it is a primitive type (otherwise a NotSerializableException is thrown).
    
    @srdo Thanks for the reminder. I will add a NOTE for users.
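
The serialization constraint can be reproduced outside Storm. The sketch below is only an illustration under stated assumptions: SerializableSupplier is declared locally as a stand-in for the interface added in the pull request, and canSerialize, goodSupplier and badSupplier are hypothetical helper names. It serializes one lambda that captures a String and one that captures a plain Object, which does not implement Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.UUID;
import java.util.function.Supplier;

class LambdaCaptureDemo {
    // Local stand-in (assumption): a Supplier that is also Serializable.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    // Returns true if Java serialization accepts the object, false if it
    // fails with NotSerializableException.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Captures an effectively final String; String implements Serializable.
    static SerializableSupplier<Object> goodSupplier() {
        String prefix = "id-";
        return () -> prefix + UUID.randomUUID();
    }

    // Captures a plain Object, which does not implement Serializable.
    // The lambda compiles, but serializing it fails at runtime.
    static SerializableSupplier<Object> badSupplier() {
        Object helper = new Object();
        return () -> helper.toString();
    }

    public static void main(String[] args) {
        System.out.println("String capture serializes: " + canSerialize(goodSupplier()));
        System.out.println("Object capture serializes: " + canSerialize(badSupplier()));
    }
}
```

Reassigning prefix or helper after its declaration would make it no longer effectively final, and the lambda would then be rejected at compile time rather than at runtime.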




[GitHub] storm issue #2089: STORM-2490: Lambda support

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/2089
  
    @vesense this looks good. +1.

