Posted to dev@storm.apache.org by arunmahadevan <gi...@git.apache.org> on 2016/01/27 11:39:51 UTC

[GitHub] storm pull request: [STORM-1505] Add map and flatMap functions in ...

GitHub user arunmahadevan opened a pull request:

    https://github.com/apache/storm/pull/1050

    [STORM-1505] Add map and flatMap functions in trident stream

    `map` and `flatmap` are common stream operations. 
    
    Right now in Trident these have to be implemented via each(), which also emits the input field values
    in addition to the mapped field values, so map and flatMap should make things slightly
    more efficient and easier to use.
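
The difference described above can be sketched in plain Java without Storm on the classpath. This is an illustrative model only: `EachVsMapSketch`, `eachStyle`, and `mapStyle` are hypothetical names, and a tuple is modeled as a simple list of values rather than a `TridentTuple`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model, not Storm code: a tuple is just a list of values. */
final class EachVsMapSketch {

    /** each()-style: the output keeps the input values and appends the function's values. */
    static List<Object> eachStyle(List<Object> input,
                                  Function<List<Object>, List<Object>> fn) {
        List<Object> out = new ArrayList<>(input);
        out.addAll(fn.apply(input));
        return out;
    }

    /** map()-style: the output contains only the function's values. */
    static List<Object> mapStyle(List<Object> input,
                                 Function<List<Object>, List<Object>> fn) {
        return new ArrayList<>(fn.apply(input));
    }

    public static void main(String[] args) {
        List<Object> tuple = Arrays.<Object>asList("storm");
        Function<List<Object>, List<Object>> upper =
                t -> Arrays.<Object>asList(((String) t.get(0)).toUpperCase());

        System.out.println(eachStyle(tuple, upper)); // [storm, STORM]
        System.out.println(mapStyle(tuple, upper));  // [STORM]
    }
}
```

In this model the each()-style result carries the original value along with the mapped one, which is the extra payload the description refers to.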

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/arunmahadevan/storm STORM-1505

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1050.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1050
    
----
commit 1832c369735576df61c6533ff772814b537e5d68
Author: Arun Mahadevan <ai...@hortonworks.com>
Date:   2016-01-27T07:02:33Z

    [STORM-1505] Add map and flatMap functions in trident stream
    
    map and flatmap are common stream operations. Right now in trident this has to be
    implemented via each() which also sends the input field values
    in addition to the mapped field values, so the map and flatmap should make things slightly
    more efficient and easy.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: [STORM-1505] Add map, flatMap and filter funct...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/1050#issuecomment-177868343
  
    @ptgoetz I updated the `filter` API to use `each` as per your suggestion. Since I wanted to see the tuples flowing through the example topology to make sure the APIs work as expected, I also added a `peek` API similar to the one available in Java 8 streams.
    
    I created a subtask under the 1.0 release JIRA for updating the API docs: https://issues.apache.org/jira/browse/STORM-1513. Once the code changes are reviewed, I will update the docs.



[GitHub] storm pull request: [STORM-1505] Add map, flatMap and filter funct...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/1050#issuecomment-176607781
  
    Do we need MapProcessor/MapFunction as it looks very similar to each operation (EachProcessor)?
    
    ```java 
        public Stream each(Fields inputFields, Function function, Fields functionFields) 
        public Stream each(Function function, Fields functionFields) 
    ```
    Agree with @ptgoetz on filter.
    
    Good to have flatMap operation.



[GitHub] storm pull request: [STORM-1505] Add map, flatMap and filter funct...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/1050#issuecomment-176111096
  
    @ptgoetz I addressed the review comments and also added a `filter` API so that tuples can be filtered by passing only the predicate, rather than using `each`.



[GitHub] storm pull request: [STORM-1505] Add map, flatMap and filter funct...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/1050#issuecomment-178612406
  
    @ptgoetz added the overloaded filter api.



[GitHub] storm pull request: [STORM-1505] Add map, flatMap and filter funct...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/1050#issuecomment-178283234
  
    @arunmahadevan Can we move the peek implementation to a separate discussion so we can discuss it independently?
    
    And can you overload the filter() method with a fields selector parameter as I had suggested?
    
    Once those are done, I'm +1.



[GitHub] storm pull request: [STORM-1505] Add map, flatMap and filter funct...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1050



[GitHub] storm pull request: [STORM-1505] Add map, flatMap and filter funct...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/1050#issuecomment-176445555
  
    @arunmahadevan you read my mind with the addition of the `filter()` method, but I'm curious why you didn't use the existing `each()` method for it?
    
    For example:
    
    ```java
        public Stream filter(Fields inputFields, Filter filter) {
            return each(inputFields, new FilterExecutor(filter), new Fields());
        }
    
        public Stream filter(Filter filter) {
            return filter(getOutputFields(), filter);
        }
    ```



[GitHub] storm pull request: [STORM-1505] Add map and flatMap functions in ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1050#discussion_r51036546
  
    --- Diff: storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java ---
    @@ -0,0 +1,87 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.trident.planner.processor;
    +
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.trident.operation.FlatMapFunction;
    +import org.apache.storm.trident.operation.Function;
    +import org.apache.storm.trident.operation.MapFunction;
    +import org.apache.storm.trident.operation.TridentOperationContext;
    +import org.apache.storm.trident.planner.ProcessorContext;
    +import org.apache.storm.trident.planner.TridentProcessor;
    +import org.apache.storm.trident.tuple.TridentTuple;
    +import org.apache.storm.trident.tuple.TridentTupleView;
    +import org.apache.storm.tuple.Fields;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Processor for executing {@link org.apache.storm.trident.Stream#map(MapFunction)} and
    + * {@link org.apache.storm.trident.Stream#flatMap(FlatMapFunction)} functions.
    + */
    +public class MapProcessor implements TridentProcessor {
    +    Function _function;
    +    TridentContext _context;
    +    FreshCollector _collector;
    +    Fields _inputFields;
    +    TridentTupleView.ProjectionFactory _projection;
    +
    +    public MapProcessor(Fields inputFields, Function function) {
    +        _function = function;
    +        _inputFields = inputFields;
    +    }
    +
    +    @Override
    +    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
    +        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
    +        if(parents.size()!=1) {
    +            throw new RuntimeException("Each operation can only have one parent");
    --- End diff --
    
    Should this be "Map operation can have only one parent"?



[GitHub] storm pull request: [STORM-1505] Add map, flatMap and filter funct...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/1050#issuecomment-178345578
  
    @ptgoetz OK, I will move the peek implementation to a separate PR.
    
    Are you suggesting we overload the filter API with a fields parameter to keep it similar to each(inputFields, filter)? Typically a filter method accepts only the predicate, so I did not add that API. E.g. [Java8 filter](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#filter-java.util.function.Predicate-). Let me know if we should add the overload.



[GitHub] storm pull request: [STORM-1505] Add map and flatMap functions in ...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1050#discussion_r51087917
  
    --- Diff: storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java ---
    @@ -0,0 +1,36 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.trident.operation;
    +
    +import org.apache.storm.tuple.Values;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +/**
    + * A one to many transformation function
    + */
    +public interface FlatMapFunction extends Serializable {
    +    /**
    +     * Invoked by the framework for each value in a stream.
    +     *
    +     * @param input the input value
    +     * @return an iterable over the resultant values
    +     */
    +    Iterable<Values> execute(Values input);
    --- End diff --
    
    I had thought about it but used `Values` to keep the argument and return types the same. On reflection, I agree that the argument type can be changed to `TridentTuple` to expose the convenience methods.



[GitHub] storm pull request: [STORM-1505] Add map, flatMap and filter funct...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/1050#issuecomment-178592968
  
    @arunmahadevan Yes, I'm talking about adding the following:
    
    ```java
    public Stream filter(Fields inputFields, Filter filter)
    ```
    
    I know it's minor, but I'm thinking of users having existing `Filter` implementations that may depend on specific fields being selected.
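
A small sketch of why the fields selector matters for existing filters, assuming a tuple modeled as an ordered map of field names to values. `FilterFieldsSketch`, `project`, and `keep` are hypothetical names for illustration and do not mirror Storm's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical model of filtering with and without a fields selector. */
final class FilterFieldsSketch {

    /** Projects the named fields, in the order requested. */
    static List<Object> project(Map<String, Object> tuple, List<String> fields) {
        List<Object> out = new ArrayList<>();
        for (String field : fields) {
            if (!tuple.containsKey(field)) {
                throw new IllegalArgumentException("unknown field: " + field);
            }
            out.add(tuple.get(field));
        }
        return out;
    }

    /** filter(inputFields, filter) style: the predicate sees only the selected fields. */
    static boolean keep(Map<String, Object> tuple, List<String> inputFields,
                        Predicate<List<Object>> filter) {
        return filter.test(project(tuple, inputFields));
    }

    /** filter(filter) style: the predicate sees every field of the tuple. */
    static boolean keep(Map<String, Object> tuple, Predicate<List<Object>> filter) {
        return keep(tuple, new ArrayList<>(tuple.keySet()), filter);
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("user", "alice");
        tuple.put("age", 30);

        // An existing filter written against position 0 of its selected fields.
        Predicate<List<Object>> isAdult = values -> ((Integer) values.get(0)) >= 18;

        System.out.println(keep(tuple, Arrays.asList("age"), isAdult)); // true
        // keep(tuple, isAdult) would fail with a ClassCastException here,
        // because position 0 of the full tuple is the "user" field.
    }
}
```

In this model a filter that reads position 0 only behaves correctly when the caller can select which field ends up there, which is what the overload with a fields parameter allows.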




[GitHub] storm pull request: [STORM-1505] Add map and flatMap functions in ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1050#discussion_r51045778
  
    --- Diff: storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java ---
    @@ -0,0 +1,36 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.trident.operation;
    +
    +import org.apache.storm.tuple.Values;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +/**
    + * A one to many transformation function
    + */
    +public interface FlatMapFunction extends Serializable {
    +    /**
    +     * Invoked by the framework for each value in a stream.
    +     *
    +     * @param input the input value
    +     * @return an iterable over the resultant values
    +     */
    +    Iterable<Values> execute(Values input);
    --- End diff --
    
    Why use `Values` as the input parameter and not `ITuple`? With `Values` the user will have to know the index of the value they are after, whereas `Tuple` has convenience methods like `getStringByField()`.
    
    (Same question/comment applies to `MapFunction`.)
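
To illustrate the trade-off raised here, the following sketch contrasts positional access with access by field name. `NamedTupleSketch` is a hypothetical stand-in written for this example, not Storm's `ITuple` or `TridentTuple`; only the method name `getStringByField` is borrowed from the comment above.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a tuple with named fields. */
final class NamedTupleSketch {
    private final List<String> fields;
    private final List<Object> values;

    NamedTupleSketch(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("fields and values must have the same size");
        }
        this.fields = fields;
        this.values = values;
    }

    /** Positional access: the caller must know the index of the value. */
    Object getValue(int index) {
        return values.get(index);
    }

    /** Named access: the caller only needs the field name. */
    String getStringByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        return (String) values.get(index);
    }

    public static void main(String[] args) {
        NamedTupleSketch tuple = new NamedTupleSketch(
                Arrays.asList("user", "url"),
                Arrays.<Object>asList("alice", "http://example.com"));

        System.out.println(tuple.getValue(1));             // positional lookup
        System.out.println(tuple.getStringByField("url")); // lookup by field name
    }
}
```

Both calls return the same value, but the positional form breaks silently if the field order changes, while the named form does not depend on ordering.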



[GitHub] storm pull request: [STORM-1505] Add map, flatMap and filter funct...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/1050#issuecomment-177022612
  
    @arunmahadevan let's go with the each()-based approach for now since that's more familiar to existing users.
    
    Also, can you file a follow-up JIRA or update the docs, and add it as a subtask for the 1.0 release JIRA?



[GitHub] storm pull request: [STORM-1505] Add map, flatMap and filter funct...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/1050#issuecomment-176632790
  
    @ptgoetz the current filter implementation on top of each appeared like a kludge. See https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FilterExecutor.java#L38. It does not appear to emit anything, but apparently that is taken care of at some other level. Also, the `each` operation always appends the input fields to the output fields, so I implemented it directly via the MapProcessor. I can refactor it as you mentioned to reuse the existing code.
    @satishd `each` emits the input fields plus the new fields, and the Function interface expects the implementor to emit within the function, making it less of a pure function. map with the MapFunction interface is closer to the map operation in other stream APIs.
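
The contrast between an emitting function and a pure one can be sketched as follows. The `Collector` and `EmittingFunction` interfaces here are simplified, hypothetical stand-ins for Trident's collector and `Function`, and `adapt` is an assumed helper showing how a processor might bridge the two styles; it is not the code from this pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model contrasting emit-by-side-effect with return-by-value. */
final class EmitVsReturnSketch {

    /** Simplified stand-in for the collector handed to an each()-style function. */
    interface Collector {
        void emit(List<Object> values);
    }

    /** each()-style function: results leave through a side effect on the collector. */
    interface EmittingFunction {
        void execute(List<Object> input, Collector collector);
    }

    /** Adapts a pure one-to-many function to the emitting style. */
    static EmittingFunction adapt(Function<List<Object>, Iterable<List<Object>>> flatMap) {
        return (input, collector) -> {
            for (List<Object> out : flatMap.apply(input)) {
                collector.emit(out);
            }
        };
    }

    /** Pure function: splits the first value on spaces and returns the words. */
    static Iterable<List<Object>> splitWords(List<Object> input) {
        List<List<Object>> words = new ArrayList<>();
        for (String word : ((String) input.get(0)).split(" ")) {
            words.add(Arrays.<Object>asList(word));
        }
        return words;
    }

    public static void main(String[] args) {
        List<List<Object>> emitted = new ArrayList<>();
        adapt(EmitVsReturnSketch::splitWords)
                .execute(Arrays.<Object>asList("map and flatMap"), emitted::add);
        System.out.println(emitted); // [[map], [and], [flatMap]]
    }
}
```

The pure function can be tested by inspecting its return value alone, while the emitting form needs a collector to observe its results.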



[GitHub] storm pull request: [STORM-1505] Add map, flatMap and filter funct...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/1050#issuecomment-178707222
  
    +1 Thanks @arunmahadevan 



[GitHub] storm pull request: [STORM-1505] Add map, flatMap and filter funct...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/1050#issuecomment-178511982
  
    Raised a separate JIRA to track peek - https://issues.apache.org/jira/browse/STORM-1517

