Posted to dev@storm.apache.org by hmcl <gi...@git.apache.org> on 2016/12/07 03:15:54 UTC

[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

GitHub user hmcl opened a pull request:

    https://github.com/apache/storm/pull/1816

    STORM-2223: PMMLBolt

      - PMML Bolt supporting pluggable runners
      - JPMML runner implementation
      - Test Topology

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hmcl/storm-apache Apache_master_STORM-2223_PMMLBolt

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1816.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1816
    
----
commit 6d6b712e4d21ccf6e78bdc10dbb271cc4b91859f
Author: Hugo Louro <hm...@gmail.com>
Date:   2016-12-03T00:11:33Z

    STORM-2223: PMMLBolt
      - PMML Bolt supporting pluggable runners
      - JPMML runner implementation
      - Test Topology

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92736543
  
    --- Diff: examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/RawInputFromCSVSpout.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.BufferedReader;
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +public class RawInputFromCSVSpout extends BaseRichSpout {
    +    private static final Logger LOG = LoggerFactory.getLogger(RawInputFromCSVSpout.class);
    +
    +    private File csv;
    +    private List<String> outputFields;
    +    private BufferedReader br;
    +    private SpoutOutputCollector collector;
    +
    +    public RawInputFromCSVSpout(File rawInputCsv, List<String> outputFields) throws FileNotFoundException {
    +        Objects.requireNonNull(rawInputCsv);
    +        Objects.requireNonNull(outputFields);
    +
    +        this.csv = rawInputCsv;
    +        this.outputFields = outputFields;
    +    }
    +
    +    public static RawInputFromCSVSpout newInstance(File csv) throws IOException {
    +        List<String> outputFields;
    +        try (BufferedReader br = newReader(csv)) {
    +            String header = br.readLine();
    +            LOG.debug("Header: {}", header);
    +            header = header.replaceAll("\"", "");
    +            LOG.debug("Processed header: {}", header);
    +            final String[] inputNames = header.split(",");
    +            outputFields = Arrays.asList(inputNames);
    +        }
    +        return new RawInputFromCSVSpout(csv, outputFields);
    +    }
    +
    +    private static BufferedReader newReader(File csv) throws FileNotFoundException {
    +        return new BufferedReader(new InputStreamReader(new FileInputStream(csv)));
    +    }
    +
    +    @Override
    +    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +        this.collector = collector;
    +//        openReader();
    +    }
    +
    +    @Override
    +    public void nextTuple() {
    +        try {
    +            String line = null;
    +            while ((line = br.readLine()) != null) {
    +                collector.emit(Arrays.asList(line.split(",")));
    +            }
    +        } catch (IOException e) {
    +            closeReader();
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Override
    +    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +        declarer.declare(new Fields(outputFields));
    +    }
    +
    +    @Override
    +    public void activate() {
    +        openReader();
    --- End diff --
    
    Done



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92331912
  
    --- Diff: examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java ---
    @@ -0,0 +1,167 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import com.google.common.collect.Lists;
    +
    +import org.apache.commons.compress.utils.IOUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.StormSubmitter;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.model.jpmml.JpmmlModelOutputFields;
    +import org.apache.storm.pmml.runner.jpmml.JpmmlFactory;
    +import org.apache.storm.topology.BasicOutputCollector;
    +import org.apache.storm.topology.IRichBolt;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.topology.base.BaseBasicBolt;
    +import org.apache.storm.tuple.Tuple;
    +
    +import java.io.BufferedReader;
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +public class JpmmlRunnerTestTopology {
    --- End diff --
    
    Maybe add a short description of what this topology does and what it predicts.



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92736561
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.runner.ModelRunner;
    +import org.apache.storm.pmml.runner.ModelRunnerFactory;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +public class PMMLPredictorBolt extends BaseRichBolt {
    +    protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class);
    +
    +    private ModelOutputFields outFields;
    +    private ModelRunnerFactory runnerFactory;
    --- End diff --
    
    Done



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92329373
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.runner.ModelRunner;
    +import org.apache.storm.pmml.runner.ModelRunnerFactory;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +public class PMMLPredictorBolt extends BaseRichBolt {
    +    protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class);
    +
    +    private ModelOutputFields outFields;
    +    private ModelRunnerFactory runnerFactory;
    +    private ModelRunner runner;
    +    private OutputCollector collector;
    +
    +    /*
    +     * Passing a factory rather than the actual object to avoid enforcing the strong
    +     * requirement of having to have ModelRunner to be Serializable
    +     */
    +
    +    /**
    +     * Creates an instance of {@link PMMLPredictorBolt} that executes, for every tuple, the runner constructed with
    +     * the {@link ModelRunnerFactory} specified in the parameter
    +     * The {@link PMMLPredictorBolt} instantiated with this constructor has no declared output fields
    +     */
    +    public PMMLPredictorBolt(ModelRunnerFactory modelRunnerFactory) {
    +        this(modelRunnerFactory, null);
    +    }
    +
    +    /**
    +     * Creates an instance of {@link PMMLPredictorBolt} that executes, for every tuple, the runner constructed with
    +     * the {@link ModelRunnerFactory} specified in the parameter
    +     * The {@link PMMLPredictorBolt} instantiated with this constructor declares the output fields as specified
    +     * by the {@link ModelOutputFields} parameter
    +     */
    +    public PMMLPredictorBolt(ModelRunnerFactory modelRunnerFactory, ModelOutputFields modelOutputFields) {
    +        this.outFields = modelOutputFields;
    +        this.runnerFactory = modelRunnerFactory;
    +        LOG.info("Instantiated {}", this);
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.runner = runnerFactory.newModelRunner();
    +        this.collector = collector;
    +    }
    +
    +    @Override
    +    public void execute(Tuple input) {
    +        try {
    +            final Map<Stream, List<Object>> scoresPerStream = runner.scoredTuplePerStream(input);
    +            LOG.debug("Input tuple [{}] generated predicted scores [{}]", input, scoresPerStream);
    +            if (scoresPerStream != null) {
    +                for (Stream stream : scoresPerStream.keySet()) {
    +                    collector.emit(stream.getId(), input, scoresPerStream.get(stream));
    +                }
    +            } else {
    +                LOG.debug("Input tuple [{}] generated NULL scores", input);
    +            }
    +        } catch(Exception e) {
    +            collector.reportError(e);
    +            collector.fail(input);
    +        }
    +    }
    +
    +    @Override
    +    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +        if (outFields != null) {
    --- End diff --
    
    In `execute`, the results seem to be emitted unconditionally. Won't that fail when `outFields == null`?
    
    Also, how are you ensuring that the streams declared via `declareStream` here are the ones returned by `runner.scoredTuplePerStream`?
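
One possible way to address both points, sketched in plain Java without any Storm dependency: track the ids of the declared streams and emit only the scored entries whose stream was declared. `DeclaredStreamFilter` and `emittable` are hypothetical names, not part of the PR, and stream ids are modeled as plain strings rather than the PR's `Stream` class.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the PR: decides which scored tuples may be emitted.
final class DeclaredStreamFilter {
    private final Set<String> declaredStreamIds;

    // Pass an empty set to model a bolt constructed without ModelOutputFields.
    DeclaredStreamFilter(Set<String> declaredStreamIds) {
        this.declaredStreamIds = declaredStreamIds;
    }

    // Returns only the entries whose stream id was declared; a null input yields an empty map.
    Map<String, List<Object>> emittable(Map<String, List<Object>> scoresPerStream) {
        Map<String, List<Object>> result = new LinkedHashMap<>();
        if (scoresPerStream == null) {
            return result;
        }
        for (Map.Entry<String, List<Object>> entry : scoresPerStream.entrySet()) {
            if (declaredStreamIds.contains(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

With a filter like this, a bolt that declared nothing would emit nothing, and a mismatch between the runner's streams and the declared ones would show up as dropped entries (which could also be logged) rather than as an emit to an undeclared stream.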



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    +1



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92246988
  
    --- Diff: external/storm-pmml/README.md ---
    @@ -0,0 +1,104 @@
    +#Storm PMML Bolt
    + Storm integration to load PMML models and compute predictive scores for running tuples. The PMML model represents
    + the machine learning (predictive) model used to do prediction on raw input data. The model is typically loaded into a 
    + runtime environment, which will score the raw data that comes in the tuples. 
    +
    +#Create Instance of PMML Bolt
    + To create an instance of the `PMMLPredictorBolt` you must provide a `ModelRunner` using a `ModelRunnerFactory`,
    + and optionally an instance of `ModelOutputFields`. The `ModelOutputFields` is only required if you wish to emit
    + tuples with predicted scores to one or multiple streams. Otherwise, the `PMMLPredictorBolt` will declare no
    + output fields.
    + 
    + The `ModelRunner` represents the runtime environment to execute the predictive scoring. It has only one method: 
    + 
    + ```java
    +    Map<Stream, List<Object>> scoredTuplePerStream(Tuple input); 
    + ```
    + 
    + This method contains the logic to compute the scored tuples from the raw inputs tuple.  It's up to the discretion of the 
    + implementation to define which scored values are to be assigned to each `Stream`. A `Stream` is a representation of a Storm stream.
    +   
    + The `PmmlModelRunner` is an extension of `ModelRunner` that represents the typical steps involved 
    + in predictive scoring. Hence, it allows for the **extraction** of raw inputs from the tuple, **pre process** the 
    + raw inputs, and **predict** the scores from the preprocessed data.
    + 
    + The `JPmmlModelRunner` is an implementation of `PmmlModelRunner` that uses [JPMML](https://github.com/jpmml/jpmml) as
    + runtime environment. This implementation extracts the raw inputs from the tuple for all `active fields`, 
    + and builds a tuple with the predicted scores for the `predicted fields` and `output fields`. 
    + In this implementation all the declared streams will have the same scored tuple.
    + 
    + The `predicted`, `active`, and `output` fields are extracted from the PMML model.
    +
    +#Run Bundled Examples
    +
    +To run the examples you must copy the `storm-pmml` uber jar to `STORM-HOME/extlib` and then run the command:
    --- End diff --
    
    @ptgoetz do you mean that you would like to have an uber jar such as `storm-pmml-examples-2.0.0-SNAPSHOT.jar` under external/storm-pmml, so that users could just run `storm jar storm-pmml-examples-2.0.0-SNAPSHOT.jar org.apache.storm.pmm.TopologyMain`?



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92247952
  
    --- Diff: examples/storm-pmml-examples/src/main/resources/KNIME_PMML_4.1_Examples_single_audit_logreg.xml ---
    @@ -0,0 +1,259 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    --- End diff --
    
    @ptgoetz I found the [PMML Notice and License](http://dmg.org/documents/dmg-pmml-license-2016.pdf).



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92244506
  
    --- Diff: examples/storm-pmml-examples/src/main/resources/Audit.50.csv ---
    @@ -0,0 +1,51 @@
    +ID,Age,Employment,Education,Marital,Occupation,Income,Gender,Deductions,Hours,IGNORE_Accounts,RISK_Adjustment,TARGET_Adjusted
    --- End diff --
    
    Also interested in what the license of the data set is.



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @hmcl Can you also open a PR against the 1.x-branch?



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92257368
  
    --- Diff: examples/storm-pmml-examples/src/main/resources/KNIME_PMML_4.1_Examples_single_audit_logreg.xml ---
    @@ -0,0 +1,259 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    --- End diff --
    
    Okay, it's BSD 3-clause. So there needs to be an entry in both source and binary distribution LICENSE files per this guide: http://www.apache.org/dev/licensing-howto.html#permissive-deps
    




[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92330433
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.runner.ModelRunner;
    +import org.apache.storm.pmml.runner.ModelRunnerFactory;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +public class PMMLPredictorBolt extends BaseRichBolt {
    +    protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class);
    +
    +    private ModelOutputFields outFields;
    +    private ModelRunnerFactory runnerFactory;
    +    private ModelRunner runner;
    +    private OutputCollector collector;
    +
    +    /*
    +     * Passing a factory rather than the actual object to avoid enforcing the strong
    +     * requirement of having to have ModelRunner to be Serializable
    +     */
    +
    +    /**
    +     * Creates an instance of {@link PMMLPredictorBolt} that executes, for every tuple, the runner constructed with
    +     * the {@link ModelRunnerFactory} specified in the parameter
    +     * The {@link PMMLPredictorBolt} instantiated with this constructor has no declared output fields
    +     */
    +    public PMMLPredictorBolt(ModelRunnerFactory modelRunnerFactory) {
    +        this(modelRunnerFactory, null);
    +    }
    +
    +    /**
    +     * Creates an instance of {@link PMMLPredictorBolt} that executes, for every tuple, the runner constructed with
    +     * the {@link ModelRunnerFactory} specified in the parameter
    +     * The {@link PMMLPredictorBolt} instantiated with this constructor declares the output fields as specified
    +     * by the {@link ModelOutputFields} parameter
    +     */
    +    public PMMLPredictorBolt(ModelRunnerFactory modelRunnerFactory, ModelOutputFields modelOutputFields) {
    +        this.outFields = modelOutputFields;
    +        this.runnerFactory = modelRunnerFactory;
    +        LOG.info("Instantiated {}", this);
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.runner = runnerFactory.newModelRunner();
    +        this.collector = collector;
    +    }
    +
    +    @Override
    +    public void execute(Tuple input) {
    +        try {
    +            final Map<Stream, List<Object>> scoresPerStream = runner.scoredTuplePerStream(input);
    +            LOG.debug("Input tuple [{}] generated predicted scores [{}]", input, scoresPerStream);
    +            if (scoresPerStream != null) {
    +                for (Stream stream : scoresPerStream.keySet()) {
    +                    collector.emit(stream.getId(), input, scoresPerStream.get(stream));
    +                }
    +            } else {
    +                LOG.debug("Input tuple [{}] generated NULL scores", input);
    +            }
    +        } catch(Exception e) {
    +            collector.reportError(e);
    +            collector.fail(input);
    +        }
    +    }
    +
    +    @Override
    +    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +        if (outFields != null) {
    +            LOG.info("Declaring output fields [{}]", outFields);
    +            for (Map.Entry<? extends Stream, ? extends Fields> of : outFields.toMap().entrySet()) {
    +                final Stream stream = of.getKey();
    +                final Fields fields = of.getValue();
    +                declarer.declareStream(stream.getId(), stream.isDirect(), fields);
    --- End diff --
    
    It doesn't make sense to declare a stream as direct unless you intend to emit directly to specific tasks via `emitDirect`.



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92327978
  
    --- Diff: examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/RawInputFromCSVSpout.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.BufferedReader;
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +public class RawInputFromCSVSpout extends BaseRichSpout {
    +    private static final Logger LOG = LoggerFactory.getLogger(RawInputFromCSVSpout.class);
    +
    +    private File csv;
    +    private List<String> outputFields;
    +    private BufferedReader br;
    +    private SpoutOutputCollector collector;
    +
    +    public RawInputFromCSVSpout(File rawInputCsv, List<String> outputFields) throws FileNotFoundException {
    +        Objects.requireNonNull(rawInputCsv);
    +        Objects.requireNonNull(outputFields);
    +
    +        this.csv = rawInputCsv;
    +        this.outputFields = outputFields;
    +    }
    +
    +    public static RawInputFromCSVSpout newInstance(File csv) throws IOException {
    +        List<String> outputFields;
    +        try (BufferedReader br = newReader(csv)) {
    +            String header = br.readLine();
    +            LOG.debug("Header: {}", header);
    +            header = header.replaceAll("\"", "");
    +            LOG.debug("Processed header: {}", header);
    +            final String[] inputNames = header.split(",");
    +            outputFields = Arrays.asList(inputNames);
    +        }
    +        return new RawInputFromCSVSpout(csv, outputFields);
    +    }
    +
    +    private static BufferedReader newReader(File csv) throws FileNotFoundException {
    +        return new BufferedReader(new InputStreamReader(new FileInputStream(csv)));
    +    }
    +
    +    @Override
    +    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +        this.collector = collector;
    +//        openReader();
    +    }
    +
    +    @Override
    +    public void nextTuple() {
    +        try {
    +            String line = null;
    +            while ((line = br.readLine()) != null) {
    +                collector.emit(Arrays.asList(line.split(",")));
    +            }
    +        } catch (IOException e) {
    +            closeReader();
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Override
    +    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +        declarer.declare(new Fields(outputFields));
    +    }
    +
    +    @Override
    +    public void activate() {
    +        openReader();
    --- End diff --
    
    It seems that every time the spout is deactivated and reactivated, it starts reading the file from the beginning, which may not be desirable.
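
One way to avoid the restart is to make the open step idempotent, so that a second `activate()` keeps the existing read position. The sketch below is only an illustration under stated assumptions: `ResumableCsvReader` is a hypothetical helper that is not part of this PR, it does not depend on Storm, and its `activate()`/`close()` methods merely mirror the spout lifecycle callbacks of the same name.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper (not in the PR): keeps one reader across activate/deactivate cycles. */
class ResumableCsvReader {
    private final Path csv;
    private BufferedReader reader; // stays open between activate() calls

    ResumableCsvReader(Path csv) {
        this.csv = csv;
    }

    /** Mirrors a spout's activate(): opens the file only on the first call. */
    void activate() {
        if (reader != null) {
            return; // already open, so the current read position is preserved
        }
        try {
            reader = Files.newBufferedReader(csv, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the next line, or null once the end of the file is reached. */
    String nextLine() {
        if (reader == null) {
            throw new IllegalStateException("activate() must be called first");
        }
        try {
            return reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Mirrors a spout's close(): releases the file handle for good. */
    void close() {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            reader = null;
        }
    }
}
```

With this shape, deactivating the spout does not need to touch the reader at all; only `close()` gives up the position in the file.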



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92328927
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.runner.ModelRunner;
    +import org.apache.storm.pmml.runner.ModelRunnerFactory;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +public class PMMLPredictorBolt extends BaseRichBolt {
    +    protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class);
    +
    +    private ModelOutputFields outFields;
    +    private ModelRunnerFactory runnerFactory;
    +    private ModelRunner runner;
    +    private OutputCollector collector;
    +
    +    /*
    +     * Passing a factory rather than the actual object to avoid enforcing the strong
    +     * requirement of having to have ModelRunner to be Serializable
    +     */
    +
    +    /**
    +     * Creates an instance of {@link PMMLPredictorBolt} that executes, for every tuple, the runner constructed with
    +     * the {@link ModelRunnerFactory} specified in the parameter
    +     * The {@link PMMLPredictorBolt} instantiated with this constructor has no declared output fields
    +     */
    +    public PMMLPredictorBolt(ModelRunnerFactory modelRunnerFactory) {
    +        this(modelRunnerFactory, null);
    +    }
    +
    +    /**
    +     * Creates an instance of {@link PMMLPredictorBolt} that executes, for every tuple, the runner constructed with
    +     * the {@link ModelRunnerFactory} specified in the parameter
    +     * The {@link PMMLPredictorBolt} instantiated with this constructor declares the output fields as specified
    +     * by the {@link ModelOutputFields} parameter
    +     */
    +    public PMMLPredictorBolt(ModelRunnerFactory modelRunnerFactory, ModelOutputFields modelOutputFields) {
    +        this.outFields = modelOutputFields;
    +        this.runnerFactory = modelRunnerFactory;
    +        LOG.info("Instantiated {}", this);
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.runner = runnerFactory.newModelRunner();
    +        this.collector = collector;
    +    }
    +
    +    @Override
    +    public void execute(Tuple input) {
    +        try {
    +            final Map<Stream, List<Object>> scoresPerStream = runner.scoredTuplePerStream(input);
    +            LOG.debug("Input tuple [{}] generated predicted scores [{}]", input, scoresPerStream);
    +            if (scoresPerStream != null) {
    +                for (Stream stream : scoresPerStream.keySet()) {
    +                    collector.emit(stream.getId(), input, scoresPerStream.get(stream));
    --- End diff --
    
    Where is the input acked?
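
For reference, the usual pattern in a bolt that extends `BaseRichBolt` is to ack the input once after all anchored emits, and to fail it if scoring throws. The sketch below illustrates that ordering only. It is not the PR's code: `AckingCollector` is a stand-in interface that mimics the `emit`, `ack`, and `fail` calls of Storm's `OutputCollector`, `Scorer` is a hypothetical substitute for `ModelRunner`, and streams are reduced to plain strings so the example compiles without Storm on the classpath.

```java
import java.util.List;
import java.util.Map;

/** Stand-in for the OutputCollector calls used below; not Storm's real class. */
interface AckingCollector {
    void emit(String streamId, Object anchor, List<Object> values);
    void ack(Object input);
    void fail(Object input);
}

/** Hypothetical substitute for ModelRunner.scoredTuplePerStream. */
interface Scorer {
    Map<String, List<Object>> score(Object input);
}

class AckingExecute {
    /** Emits the scores anchored to the input, then acks; fails the input on any error. */
    static void execute(Object input, Scorer scorer, AckingCollector collector) {
        try {
            Map<String, List<Object>> scoresPerStream = scorer.score(input);
            if (scoresPerStream != null) {
                for (Map.Entry<String, List<Object>> entry : scoresPerStream.entrySet()) {
                    // Anchored emit: downstream failures are traced back to this input
                    collector.emit(entry.getKey(), input, entry.getValue());
                }
            }
            collector.ack(input); // ack exactly once, after every emit has been issued
        } catch (Exception e) {
            collector.fail(input); // lets the spout decide whether to replay the tuple
        }
    }
}
```

Whether a failed score should fail the tuple or ack it and report the error is a design choice for the bolt; the sketch picks `fail` only to keep the example short.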



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92328525
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.runner.ModelRunner;
    +import org.apache.storm.pmml.runner.ModelRunnerFactory;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +public class PMMLPredictorBolt extends BaseRichBolt {
    +    protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class);
    +
    +    private ModelOutputFields outFields;
    --- End diff --
    
    can be final



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92331432
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/jpmml/JPmmlModelRunner.java ---
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml.runner.jpmml;
    +
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.runner.PmmlModelRunner;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.dmg.pmml.FieldName;
    +import org.jpmml.evaluator.Evaluator;
    +import org.jpmml.evaluator.EvaluatorUtil;
    +import org.jpmml.evaluator.FieldValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * JPMML implementation of {@link PmmlModelRunner}. It extracts the raw inputs from the tuple for all
    + * 'active fields', and builds a tuple with the predicted scores for the 'predicted fields' and 'output fields'.
    + * In this implementation all the declared streams will have the same scored tuple.
    + *
    + * The 'predicted', 'active', and 'output' fields are extracted from the PMML model.
    + */
    +public class JPmmlModelRunner implements PmmlModelRunner<Map<FieldName, Object>,
    +        Map<FieldName, FieldValue>,
    +        Map<FieldName, ?>> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(JPmmlModelRunner.class);
    +
    +    private final Evaluator eval;                       // Jpmml evaluator
    +    private final List<FieldName> activeFields;
    +    private final List<FieldName> predictedFields;
    +    private final List<FieldName> outputFields;
    +    private ModelOutputFields modelOutputFields;
    +
    +    public JPmmlModelRunner(Evaluator evaluator, ModelOutputFields modelOutputFields) {
    +        this.eval = evaluator;
    +        activeFields = evaluator.getActiveFields();
    +        this.modelOutputFields = modelOutputFields;
    +        predictedFields = eval.getPredictedFields();
    +        outputFields = eval.getOutputFields();
    +    }
    +
    +    /**
    +     * @return The raw inputs extracted from the tuple for all 'active fields'
    +     */
    +    @Override
    +    public Map<FieldName, Object> extractRawInputs(Tuple tuple) {
    +        LOG.debug("Extracting raw inputs from tuple: = [{}]", tuple);
    +        final Map<FieldName, Object> rawInputs = new LinkedHashMap<>();
    +        for (FieldName activeField : activeFields) {
    +            rawInputs.put(activeField, tuple.getValueByField(activeField.getValue()));
    +        }
    +        LOG.debug("Raw inputs = [{}]", rawInputs);
    +        return rawInputs;
    +    }
    +
    +    @Override
    +    public Map<FieldName, FieldValue> preProcessInputs(Map<FieldName, Object> rawInputs) {
    +        LOG.debug("Pre processing raw inputs: = [{}]", rawInputs);
    +        final Map<FieldName, FieldValue> preProcInputs = new LinkedHashMap<>();
    +        for (Map.Entry<FieldName, Object> rawEntry : rawInputs.entrySet()) {
    +            preProcInputs.putIfAbsent(rawEntry.getKey(), EvaluatorUtil.prepare(eval, rawEntry.getKey(), rawEntry.getValue()));
    +        }
    +        LOG.debug("Pre processed inputs = [{}]", preProcInputs);
    +        return preProcInputs;
    +    }
    +
    +    @Override
    +    public Map<FieldName, ?> predictScores(Map<FieldName, FieldValue> preProcInputs) {
    +        LOG.debug("Predicting scores for pre processed inputs: = [{}]", preProcInputs);
    +        Map<FieldName, ?> predictedScores = eval.evaluate(preProcInputs);
    +        LOG.debug("Predicted scores = [{}]", predictedScores);
    +        return predictedScores;
    +    }
    +
    +    /**
    +     * @return the predicted scores for the 'predicted fields' and 'output fields'.
    +     * All the declared streams will have the same scored tuple.
    +     */
    +    @Override
    +    public Map<Stream, List<Object>> scoredTuplePerStream(Tuple input) {
    +        final Map<FieldName, Object> rawInputs = extractRawInputs(input);
    +        final Map<FieldName, FieldValue> preProcInputs = preProcessInputs(rawInputs);
    +        final Map<FieldName, ?> predScores = predictScores(preProcInputs);
    +
    +        return toValuesMap(predScores);
    +    }
    +
    +    // Sends the same tuple (list of scored/predicted values) to all the declared streams
    +    private Map<Stream, List<Object>> toValuesMap(Map<FieldName, ?> predScores) {
    +        final List<Object> scoredVals = new ArrayList<>();
    +
    +        for (FieldName predictedField : predictedFields) {
    +            Object targetValue = predScores.get(predictedField);
    +            scoredVals.add(EvaluatorUtil.decode(targetValue));
    +        }
    +
    +        for (FieldName outputField : outputFields) {
    +            Object targetValue = predScores.get(outputField);
    +            scoredVals.add(EvaluatorUtil.decode(targetValue));
    +        }
    +
    +        final Map<Stream, List<Object>> valuesMap = new HashMap<>();
    +
    +        for (Map.Entry<? extends Stream, ? extends Fields> entry : modelOutputFields.toMap().entrySet()) {
    --- End diff --
    
    Use keySet() here. It may be better if ModelOutputFields itself had a method that returns the keys; right now it's more of a map wrapper.
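
A rough sketch of what that suggestion could look like follows. The names are assumptions made for illustration: `OutputFieldsSketch` is a simplified, hypothetical version of `ModelOutputFields`, `getStreams()` is an invented accessor, and streams and fields are plain strings rather than the PR's `Stream` and `Fields` types.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical simplification of ModelOutputFields; streams are plain strings here. */
class OutputFieldsSketch {
    private final Map<String, List<String>> fieldsPerStream;

    OutputFieldsSketch(Map<String, List<String>> fieldsPerStream) {
        // Defensive copy that preserves the declaration order of the streams
        this.fieldsPerStream = new LinkedHashMap<>(fieldsPerStream);
    }

    /** Hypothetical accessor: exposes the declared streams without leaking the backing map. */
    Set<String> getStreams() {
        return Collections.unmodifiableSet(fieldsPerStream.keySet());
    }

    /** Assigns the same scored values to every declared stream, iterating over keys only. */
    static Map<String, List<Object>> toValuesMap(OutputFieldsSketch out, List<Object> scoredVals) {
        final Map<String, List<Object>> valuesMap = new HashMap<>();
        for (String stream : out.getStreams()) {
            valuesMap.put(stream, scoredVals);
        }
        return valuesMap;
    }
}
```

Since the loop never reads the map values, iterating over the keys states the intent more directly than iterating over `entrySet()`.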



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r91586225
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/jpmml/JpmmlFactory.java ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml.runner.jpmml;
    +
    +import org.apache.storm.pmml.runner.ModelRunnerFactory;
    +import org.dmg.pmml.IOUtil;
    +import org.dmg.pmml.PMML;
    +import org.jpmml.evaluator.Evaluator;
    +import org.jpmml.evaluator.ModelEvaluator;
    +import org.jpmml.evaluator.ModelEvaluatorFactory;
    +import org.jpmml.manager.PMMLManager;
    +import org.xml.sax.SAXException;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Objects;
    +
    +import javax.xml.bind.JAXBException;
    +
    +/*
    + * This class consists exclusively of static factory methods that create
    + * object instances that are essential to work with the Jpmml library
    + */
    +public class JpmmlFactory {
    +    /**
    +     * Creates a new {@link PMML} object representing the PMML model defined in the XML {@link File} specified as argument
    +     */
    +    public static PMML newPmml(File file) throws JAXBException, SAXException, IOException {
    --- End diff --
    
    It would also be very useful to be able to load/update models from the blob store so users don't have to redeploy a topology to update their model.



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @arunmahadevan done! 
    @harshach @ptgoetz @csivaguru can you please take one final look. 
    If everything is OK, I will go ahead and squash the commits. 
    Thanks!



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @hmcl @HeartSaVioR Got it. Thanks.
    But I notice that the legacy code (https://github.com/jpmml/jpmml) was last updated three years ago, and it seems it will never be updated again. This could be a problem.
    Also, there is already a project (https://github.com/jpmml/jpmml-storm) for integrating with Storm.



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @hmcl Do you plan to add unit tests for this?



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92249343
  
    --- Diff: external/storm-pmml/README.md ---
    @@ -0,0 +1,104 @@
    +#Storm PMML Bolt
    + Storm integration to load PMML models and compute predictive scores for running tuples. The PMML model represents
    + the machine learning (predictive) model used to do prediction on raw input data. The model is typically loaded into a 
    + runtime environment, which will score the raw data that comes in the tuples. 
    +
    +#Create Instance of PMML Bolt
    + To create an instance of the `PMMLPredictorBolt` you must provide a `ModelRunner` using a `ModelRunnerFactory`,
    + and optionally an instance of `ModelOutputFields`. The `ModelOutputFields` is only required if you wish to emit
    + tuples with predicted scores to one or multiple streams. Otherwise, the `PMMLPredictorBolt` will declare no
    + output fields.
    + 
    + The `ModelRunner` represents the runtime environment to execute the predictive scoring. It has only one method: 
    + 
    + ```java
    +    Map<Stream, List<Object>> scoredTuplePerStream(Tuple input); 
    + ```
    + 
    + This method contains the logic to compute the scored tuples from the raw inputs tuple.  It's up to the discretion of the 
    + implementation to define which scored values are to be assigned to each `Stream`. A `Stream` is a representation of a Storm stream.
    +   
    + The `PmmlModelRunner` is an extension of `ModelRunner` that represents the typical steps involved 
    + in predictive scoring. Hence, it allows for the **extraction** of raw inputs from the tuple, **pre process** the 
    + raw inputs, and **predict** the scores from the preprocessed data.
    + 
    + The `JPmmlModelRunner` is an implementation of `PmmlModelRunner` that uses [JPMML](https://github.com/jpmml/jpmml) as
    + runtime environment. This implementation extracts the raw inputs from the tuple for all `active fields`, 
    + and builds a tuple with the predicted scores for the `predicted fields` and `output fields`. 
    + In this implementation all the declared streams will have the same scored tuple.
    + 
    + The `predicted`, `active`, and `output` fields are extracted from the PMML model.
    +
    +#Run Bundled Examples
    +
    +To run the examples you must copy the `storm-pmml` uber jar to `STORM-HOME/extlib` and then run the command:
    + 
    + ```java
    + STORM-HOME/bin/storm jar STORM-HOME/external/storm-pmml/storm-pmml-examples-2.0.0-SNAPSHOT.jar 
    + org.apache.storm.pmml.JpmmlRunnerTestTopology jpmmlTopology PMMLModel.xml RawInputData.csv
    + ```
    +#Build Uber JAR 
    +
    +To build the uber jar with all the dependencies for the module `storm-pmml` you must run the command
    +
    +```
    +mvn package -f REPO_HOME/external/storm-pmml/pom.xml
    +```
    +
    +after adding the following declaration to `REPO_HOME/storm/external/storm-pmml/pom.xml`
    +
    +```
    +<build>
    +        <plugins>
    +            <plugin>
    +                <groupId>org.apache.maven.plugins</groupId>
    +                <artifactId>maven-shade-plugin</artifactId>
    +                <configuration>
    +                    <createDependencyReducedPom>true</createDependencyReducedPom>
    +                </configuration>
    +                <executions>
    +                    <execution>
    +                        <phase>package</phase>
    +                        <goals>
    +                            <goal>shade</goal>
    +                        </goals>
    +                        <configuration>
    +                            <transformers>
    +                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    +                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    +                                    <mainClass>org.apache.storm.pmml.JpmmlRunnerTestTopology</mainClass>
    +                                </transformer>
    +                            </transformers>
    +                        </configuration>
    +                    </execution>
    +                </executions>
    +            </plugin>
    +        </plugins>
    +    </build>
    +```
    +
    +
    +## License
    +
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +
    +
    +## Committer Sponsors
    + * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
    +
    +This general abstraction has the purpose of supporting arbitrary implementations that compute predicted scores from raw inputs
    --- End diff --
    
    Good catch. I forgot to clean this up. This info is already mentioned in the introduction section in a slightly different way, but with exactly the same meaning. I will delete it.



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    pmml-evaluator is AGPLv3.
    
    http://search.maven.org/#artifactdetails%7Corg.jpmml%7Cpmml-evaluator%7C1.3.3%7Cjar
    
    pmml-model and pmml-schema are BSD, at least according to the license description in their pom files.
    
    But what's the difference between jpmml and pmml, and why do they share the group name?



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    Looks like a good start, but it really needs some documentation. It would also be helpful to include a sample model + CSV data; without that, it's not very clear how to run the example.



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @ptgoetz @arunmahadevan @harshach Thanks for your reviews. I believe I addressed all the comments. There is a separate commit with the comments changes. When we all agree that the patch is ready to merge, I will squash the commits to keep the log clean. Thanks.



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by csivaguru <gi...@git.apache.org>.
Github user csivaguru commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    +1



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    +1



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    +1, the changes look good. After unzipping the distribution, I am not able to find the examples jar file under examples/storm-pmml-examples. It's under external/storm-pmml.



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92328530
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.runner.ModelRunner;
    +import org.apache.storm.pmml.runner.ModelRunnerFactory;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +public class PMMLPredictorBolt extends BaseRichBolt {
    +    protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class);
    +
    +    private ModelOutputFields outFields;
    +    private ModelRunnerFactory runnerFactory;
    --- End diff --
    
    can be final



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r93312373
  
    --- Diff: storm-dist/binary/final-package/src/main/assembly/binary.xml ---
    @@ -446,6 +446,29 @@
                     <include>storm*jar</include>
                 </includes>
             </fileSet>
    +
    +        <fileSet>
    +            <directory>${project.basedir}/../../external/storm-pmml/target</directory>
    +            <outputDirectory>external/storm-pmml</outputDirectory>
    +            <includes>
    +                <include>storm*jar</include>
    +            </includes>
    +        </fileSet>
    +        <fileSet>
    +            <directory>${project.basedir}/../../external/storm-pmml</directory>
    +            <outputDirectory>external/storm-pmml</outputDirectory>
    +            <includes>
    +                <include>README.*</include>
    +            </includes>
    +        </fileSet>
    +        <fileSet>
    +            <directory>${project.basedir}/../../examples/storm-pmml-examples/target</directory>
    +            <outputDirectory>examples/storm-pmml-examples</outputDirectory>
    +            <includes>
    +                <include>storm*jar</include>
    +            </includes>
    +        </fileSet>
    +
    --- End diff --
    
    Do we want the sample model and data to be included in the distribution?



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @vesense there is no alternative option for PMML in Java. This at least gives us a good start.
    If we see any adoption, we are planning further development to have our own Java PMML library. Also, the same version is used by several other projects, for example Cascading: https://github.com/Cascading/pattern/blob/wip-1.0/pattern-pmml/build.gradle#L26
    
    Regarding jpmml-storm, I reached out to the developer. In his words, it is just experimental code, not really production quality, and they have no interest in contributing it to Storm.
    Having several connectors and interesting NLP and ML bolts in Storm itself will improve adoption of those connectors and will give users an incentive to use Storm.
    I don't see any issue with having this in Storm.



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92736547
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java ---
    @@ -39,7 +39,6 @@
     import java.io.InputStreamReader;
     import java.util.HashMap;
     import java.util.Map;
    -import java.util.concurrent.TimeUnit;
    --- End diff --
    
    Done



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @hmcl you can add me as a sponsor of the module.



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    Hi @hmcl
    
    The licenses of Apache Storm and JPMML (licensed under AGPLv3) may be incompatible.
    According to https://www.apache.org/licenses/GPL-compatibility.html
    >GPLv3 software cannot be included in Apache projects.



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92736718
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.runner.ModelRunner;
    +import org.apache.storm.pmml.runner.ModelRunnerFactory;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +public class PMMLPredictorBolt extends BaseRichBolt {
    +    protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class);
    +
    +    private ModelOutputFields outFields;
    +    private ModelRunnerFactory runnerFactory;
    +    private ModelRunner runner;
    +    private OutputCollector collector;
    +
    +    /*
    +     * Passing a factory rather than the actual object to avoid enforcing the strong
    +     * requirement of having to have ModelRunner to be Serializable
    +     */
    +
    +    /**
    +     * Creates an instance of {@link PMMLPredictorBolt} that executes, for every tuple, the runner constructed with
    +     * the {@link ModelRunnerFactory} specified in the parameter
    +     * The {@link PMMLPredictorBolt} instantiated with this constructor has no declared output fields
    +     */
    +    public PMMLPredictorBolt(ModelRunnerFactory modelRunnerFactory) {
    +        this(modelRunnerFactory, null);
    +    }
    +
    +    /**
    +     * Creates an instance of {@link PMMLPredictorBolt} that executes, for every tuple, the runner constructed with
    +     * the {@link ModelRunnerFactory} specified in the parameter
    +     * The {@link PMMLPredictorBolt} instantiated with this constructor declares the output fields as specified
    +     * by the {@link ModelOutputFields} parameter
    +     */
    +    public PMMLPredictorBolt(ModelRunnerFactory modelRunnerFactory, ModelOutputFields modelOutputFields) {
    +        this.outFields = modelOutputFields;
    +        this.runnerFactory = modelRunnerFactory;
    +        LOG.info("Instantiated {}", this);
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.runner = runnerFactory.newModelRunner();
    +        this.collector = collector;
    +    }
    +
    +    @Override
    +    public void execute(Tuple input) {
    +        try {
    +            final Map<Stream, List<Object>> scoresPerStream = runner.scoredTuplePerStream(input);
    +            LOG.debug("Input tuple [{}] generated predicted scores [{}]", input, scoresPerStream);
    +            if (scoresPerStream != null) {
    +                for (Stream stream : scoresPerStream.keySet()) {
    +                    collector.emit(stream.getId(), input, scoresPerStream.get(stream));
    --- End diff --
    
    Done



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92736485
  
    --- Diff: examples/storm-pmml-examples/src/main/resources/KNIME_PMML_4.1_Examples_single_audit_logreg.xml ---
    @@ -0,0 +1,259 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    --- End diff --
    
    DONE


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r91879180
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/jpmml/JPmmlModelRunner.java ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml.runner.jpmml;
    +
    +import com.google.common.collect.ImmutableList;
    +
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.runner.PmmlModelRunner;
    +import org.apache.storm.tuple.Tuple;
    +import org.dmg.pmml.FieldName;
    +import org.jpmml.evaluator.Evaluator;
    +import org.jpmml.evaluator.EvaluatorUtil;
    +import org.jpmml.evaluator.FieldValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +public class JPmmlModelRunner implements PmmlModelRunner<Map<FieldName, Object>,
    +        Map<FieldName, FieldValue>,
    +        Map<FieldName, ?>> {
    +
    +    public static final List<Stream> DEFAULT_STREAM = ImmutableList.of(Stream.defaultStream());
    +    private static final Logger LOG = LoggerFactory.getLogger(JPmmlModelRunner.class);
    +
    +    private final Evaluator eval;                       // Jpmml evaluator
    +    private final List<FieldName> activeFields;
    +    private final List<FieldName> predictedFields;
    +    private final List<FieldName> outputFields;
    +    private final List<Stream> streams;
    +
    +    public JPmmlModelRunner(Evaluator evaluator) {
    +        this(evaluator, DEFAULT_STREAM);
    +    }
    +
    +    public JPmmlModelRunner(Evaluator evaluator, List<Stream> streams) {
    +        this.eval = evaluator;
    +        activeFields = evaluator.getActiveFields();
    +        predictedFields = eval.getPredictedFields();
    +        outputFields = eval.getOutputFields();
    +        this.streams = streams;
    +    }
    +
    +    @Override
    +    public Map<FieldName, Object> extractRawInputs(Tuple tuple) {
    +        LOG.debug("Extracting raw inputs from tuple: = [{}]", tuple);
    +        final Map<FieldName, Object> rawInputs = new LinkedHashMap<>();
    +        for (FieldName activeField : activeFields) {
    +            rawInputs.put(activeField, tuple.getValueByField(activeField.getValue()));
    +        }
    +        LOG.debug("Raw inputs = [{}]", rawInputs);
    +        return rawInputs;
    --- End diff --
    
    good catch. removed it.



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @ptgoetz 
    The main focus of this initial patch is to design the classes so that they can accommodate arbitrary runtime environments. The default implementation uses one such runtime execution library, which is better suited to integration testing than to unit testing. The integration tests are already provided through the example topology.
    
    As for unit tests, I have filed this [JIRA](https://issues.apache.org/jira/browse/STORM-2253) to cover edge cases and some common cases.



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @vesense I addressed your serialization comment. I had to refactor the code because requiring `ModelRunner` to be serializable was too strong a requirement. For instance, the JPMML implementation has several non-serializable objects that made this approach impossible. Instead, I extract the output fields from the PMML model and pass them to the bolt directly. The runner is then created in the prepare method. I will upload the README in a bit.
    
    @harshach @HeartSaVioR can you please take a look as well?
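
The approach described in this comment (serialize a factory, build the runner only once the bolt is prepared) can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in this pull request: `Runner`, `RunnerFactory`, `LinearRunnerFactory`, and `SketchBolt` are hypothetical stand-ins, no Storm or JPMML API is used, and a plain Java-serialization round trip stands in for shipping the bolt to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a model runner; note that it is NOT Serializable.
interface Runner {
    double score(double input);
}

// Hypothetical stand-in for a factory that travels with the bolt.
interface RunnerFactory extends Serializable {
    Runner newRunner();
}

// The factory carries only serializable configuration (here, a single weight).
class LinearRunnerFactory implements RunnerFactory {
    private static final long serialVersionUID = 1L;
    private final double weight;

    LinearRunnerFactory(double weight) {
        this.weight = weight;
    }

    @Override
    public Runner newRunner() {
        final double w = weight;
        return input -> w * input;
    }
}

// Hypothetical bolt-like class: the runner field is transient and built in prepare().
class SketchBolt implements Serializable {
    private static final long serialVersionUID = 1L;
    private final RunnerFactory factory;
    private transient Runner runner;

    SketchBolt(RunnerFactory factory) {
        this.factory = factory;
    }

    // Plays the role of the bolt's prepare method: runs after deserialization.
    void prepare() {
        this.runner = factory.newRunner();
    }

    double execute(double input) {
        return runner.score(input);
    }
}

class FactorySketch {
    // Serializes and deserializes the bolt, as would happen before it reaches a worker.
    static SketchBolt roundTrip(SketchBolt bolt) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(bolt);
        }
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SketchBolt) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchBolt shipped = roundTrip(new SketchBolt(new LinearRunnerFactory(2.0)));
        shipped.prepare();
        System.out.println(shipped.execute(21.0));
    }
}
```

Because only the factory is serialized, the runner itself can hold arbitrary non-serializable state, which is the constraint the comment mentions for the JPMML objects.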



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @hmcl Code looks good. Left one comment. I think it is also necessary to add a README file to let people know how to use it.



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92736455
  
    --- Diff: external/storm-pmml/README.md ---
    @@ -0,0 +1,104 @@
    +#Storm PMML Bolt
    + Storm integration to load PMML models and compute predictive scores for running tuples. The PMML model represents
    + the machine learning (predictive) model used to do prediction on raw input data. The model is typically loaded into a 
    + runtime environment, which will score the raw data that comes in the tuples. 
    +
    +#Create Instance of PMML Bolt
    + To create an instance of the `PMMLPredictorBolt` you must provide a `ModelRunner` using a `ModelRunnerFactory`,
    + and optionally an instance of `ModelOutputFields`. The `ModelOutputFields` is only required if you wish to emit
    + tuples with predicted scores to one or multiple streams. Otherwise, the `PMMLPredictorBolt` will declare no
    + output fields.
    + 
    + The `ModelRunner` represents the runtime environment to execute the predictive scoring. It has only one method: 
    + 
    + ```java
    +    Map<Stream, List<Object>> scoredTuplePerStream(Tuple input); 
    + ```
    + 
    + This method contains the logic to compute the scored tuples from the raw inputs tuple.  It's up to the discretion of the 
    + implementation to define which scored values are to be assigned to each `Stream`. A `Stream` is a representation of a Storm stream.
    +   
    + The `PmmlModelRunner` is an extension of `ModelRunner` that represents the typical steps involved 
    + in predictive scoring. Hence, it allows for the **extraction** of raw inputs from the tuple, **pre process** the 
    + raw inputs, and **predict** the scores from the preprocessed data.
    + 
    + The `JPmmlModelRunner` is an implementation of `PmmlModelRunner` that uses [JPMML](https://github.com/jpmml/jpmml) as
    + runtime environment. This implementation extracts the raw inputs from the tuple for all `active fields`, 
    + and builds a tuple with the predicted scores for the `predicted fields` and `output fields`. 
    + In this implementation all the declared streams will have the same scored tuple.
    + 
    + The `predicted`, `active`, and `output` fields are extracted from the PMML model.
    +
    +#Run Bundled Examples
    +
    +To run the examples you must copy the `storm-pmml` uber jar to `STORM-HOME/extlib` and then run the command:
    --- End diff --
    
    @ptgoetz users can now just run the tests jar.



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92736541
  
    --- Diff: examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java ---
    @@ -0,0 +1,167 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import com.google.common.collect.Lists;
    +
    +import org.apache.commons.compress.utils.IOUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.StormSubmitter;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.model.jpmml.JpmmlModelOutputFields;
    +import org.apache.storm.pmml.runner.jpmml.JpmmlFactory;
    +import org.apache.storm.topology.BasicOutputCollector;
    +import org.apache.storm.topology.IRichBolt;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.topology.base.BaseBasicBolt;
    +import org.apache.storm.tuple.Tuple;
    +
    +import java.io.BufferedReader;
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +public class JpmmlRunnerTestTopology {
    +    private static final String RAW_INPUT_FROM_CSV_SPOUT = "rawInputFromCsvSpout";
    +    private static final String PMML_PREDICTOR_BOLT = "pmmLPredictorBolt";
    +    private static final String PRINT_BOLT = "printBolt";
    +
    +    private File rawInputs;           // Raw input data to be scored (predicted)
    +    private File pmml;                // PMML Model
    +    private boolean isLocal;
    +    private String tplgyName;
    +
    +    public static void main(String[] args) throws Exception {
    +        try {
    +            JpmmlRunnerTestTopology testTopology = new JpmmlRunnerTestTopology();
    +            testTopology.parseArgs(args);
    +            System.out.println(String.format("Running topology using PMML model loaded from [%s] and raw input data loaded from [%s]",
    +                    testTopology.pmml.getAbsolutePath(), testTopology.rawInputs.getAbsolutePath()));
    +            testTopology.run();
    +        } catch (Exception e) {
    +            printUsage();
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    private void parseArgs(String[] args) {
    +        if (Arrays.stream(args).anyMatch(option -> option.equals("-h"))) {
    +            printUsage();
    +        } else {
    +            if (args.length < 3) {
    +                tplgyName = "pmmlPredictorLocal";
    +                isLocal = true;
    +                if (args.length == 0) {     // run local examples
    +                    pmml = loadExample(pmml, "KNIME_PMML_4.1_Examples_single_audit_logreg.xml");
    +                    rawInputs = loadExample(rawInputs, "Audit.50.csv");
    +                } else {
    +                    pmml = new File(args[0]);
    +                    rawInputs = new File(args[1]);
    +                }
    +            } else {
    +                tplgyName = args[0];
    +                pmml = new File(args[1]);
    +                rawInputs = new File(args[2]);
    +                isLocal = false;
    +            }
    +        }
    +    }
    +
    +    private File loadExample(File file, String example) {
    +        try (InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(example)) {
    +            file = File.createTempFile("/tmp/" + example, ".tmp");
    --- End diff --
    
    Done



[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by csivaguru <gi...@git.apache.org>.
Github user csivaguru commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r91385214
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/jpmml/JpmmlFactory.java ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml.runner.jpmml;
    +
    +import org.apache.storm.pmml.runner.ModelRunnerFactory;
    +import org.dmg.pmml.IOUtil;
    +import org.dmg.pmml.PMML;
    +import org.jpmml.evaluator.Evaluator;
    +import org.jpmml.evaluator.ModelEvaluator;
    +import org.jpmml.evaluator.ModelEvaluatorFactory;
    +import org.jpmml.manager.PMMLManager;
    +import org.xml.sax.SAXException;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Objects;
    +
    +import javax.xml.bind.JAXBException;
    +
    +/*
    + * This class consists exclusively of static factory methods that create
    + * object instances that are essential to work with the Jpmml library
    + */
    +public class JpmmlFactory {
    +    /**
    +     * Creates a new {@link PMML} object representing the PMML model defined in the XML {@link File} specified as argument
    +     */
    +    public static PMML newPmml(File file) throws JAXBException, SAXException, IOException {
    --- End diff --
    
    Nit: Consider adding support for getting newPmml from an input stream as well.
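
One way to act on this suggestion is to make a stream-based method the primary entry point and have the `File` variant delegate to it. The sketch below only illustrates that delegation pattern under stated assumptions: `ModelLoaderSketch` is a hypothetical class, the returned `String` is a placeholder for a parsed model object, and no JPMML or Storm call is shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical loader; the String result is a placeholder for a parsed model.
class ModelLoaderSketch {
    // Primary entry point: accepts any stream (file, classpath resource, and so on).
    static String newModel(InputStream stream) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int read;
        while ((read = stream.read(chunk)) != -1) {
            buffer.write(chunk, 0, read);
        }
        // A real implementation would unmarshal the PMML document here.
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    // Convenience overload: opens the file and delegates to the stream variant.
    static String newModel(File file) throws IOException {
        try (InputStream stream = Files.newInputStream(file.toPath())) {
            return newModel(stream);
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("<PMML/>".getBytes(StandardCharsets.UTF_8));
        System.out.println(newModel(in));
    }
}
```

Delegating in this direction keeps a single parsing path, and a model bundled on the classpath can be read through `getResourceAsStream` without first being copied to a temporary file.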



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @csivaguru @ptgoetz @harshach I uploaded a revised version addressing your comments. Can you please take a look? Thanks.



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @hmcl 
    Sorry, I confused the versions. Got it.
    
    @vesense 
    Before 1.1 they were using the Apache license.
    https://github.com/jpmml/jpmml



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    Also, what about unit tests?


[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92736555
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.runner.ModelRunner;
    +import org.apache.storm.pmml.runner.ModelRunnerFactory;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +public class PMMLPredictorBolt extends BaseRichBolt {
    +    protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class);
    +
    +    private ModelOutputFields outFields;
    --- End diff --
    
    Done


[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92736752
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.runner.ModelRunner;
    +import org.apache.storm.pmml.runner.ModelRunnerFactory;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +public class PMMLPredictorBolt extends BaseRichBolt {
    +    protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class);
    +
    +    private ModelOutputFields outFields;
    +    private ModelRunnerFactory runnerFactory;
    +    private ModelRunner runner;
    +    private OutputCollector collector;
    +
    +    /*
    +     * Passing a factory rather than the actual object to avoid enforcing the strong
    +     * requirement of having to have ModelRunner to be Serializable
    +     */
    +
    +    /**
    +     * Creates an instance of {@link PMMLPredictorBolt} that executes, for every tuple, the runner constructed with
    +     * the {@link ModelRunnerFactory} specified in the parameter
    +     * The {@link PMMLPredictorBolt} instantiated with this constructor has no declared output fields
    +     */
    +    public PMMLPredictorBolt(ModelRunnerFactory modelRunnerFactory) {
    +        this(modelRunnerFactory, null);
    +    }
    +
    +    /**
    +     * Creates an instance of {@link PMMLPredictorBolt} that executes, for every tuple, the runner constructed with
    +     * the {@link ModelRunnerFactory} specified in the parameter
    +     * The {@link PMMLPredictorBolt} instantiated with this constructor declares the output fields as specified
    +     * by the {@link ModelOutputFields} parameter
    +     */
    +    public PMMLPredictorBolt(ModelRunnerFactory modelRunnerFactory, ModelOutputFields modelOutputFields) {
    +        this.outFields = modelOutputFields;
    +        this.runnerFactory = modelRunnerFactory;
    +        LOG.info("Instantiated {}", this);
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.runner = runnerFactory.newModelRunner();
    +        this.collector = collector;
    +    }
    +
    +    @Override
    +    public void execute(Tuple input) {
    +        try {
    +            final Map<Stream, List<Object>> scoresPerStream = runner.scoredTuplePerStream(input);
    +            LOG.debug("Input tuple [{}] generated predicted scores [{}]", input, scoresPerStream);
    +            if (scoresPerStream != null) {
    +                for (Stream stream : scoresPerStream.keySet()) {
    +                    collector.emit(stream.getId(), input, scoresPerStream.get(stream));
    +                }
    +            } else {
    +                LOG.debug("Input tuple [{}] generated NULL scores", input);
    +            }
    +        } catch(Exception e) {
    +            collector.reportError(e);
    +            collector.fail(input);
    +        }
    +    }
    +
    +    @Override
    +    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +        if (outFields != null) {
    --- End diff --
    
    I have documented this. The bottom line is that only the user knows which scores they want to emit on each stream. If they choose to emit on a stream that does not exist, that's a user error.
    
    Also fixed the `outFields == null` case.
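
The null guard discussed here can be sketched roughly as follows. This is not the code from the PR: `RecordingDeclarer` is a hypothetical stand-in for Storm's `OutputFieldsDeclarer`, and a plain `Map` of stream id to field names stands in for `ModelOutputFields`, whose real accessors are not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DeclareOutputFieldsSketch {

    /** Hypothetical stand-in for Storm's OutputFieldsDeclarer; it records what was declared. */
    public static final class RecordingDeclarer {
        public final Map<String, List<String>> declared = new LinkedHashMap<>();

        public void declareStream(String streamId, List<String> fieldNames) {
            declared.put(streamId, new ArrayList<>(fieldNames));
        }
    }

    /** Mirrors the null guard: no output fields configured means nothing is declared. */
    public static void declareOutputFields(Map<String, List<String>> outFields,
                                           RecordingDeclarer declarer) {
        if (outFields == null) {
            return;
        }
        for (Map.Entry<String, List<String>> entry : outFields.entrySet()) {
            declarer.declareStream(entry.getKey(), entry.getValue());
        }
    }

    public static void main(String[] args) {
        RecordingDeclarer none = new RecordingDeclarer();
        declareOutputFields(null, none);
        System.out.println("declared with null outFields: " + none.declared);

        Map<String, List<String>> outFields = new LinkedHashMap<>();
        outFields.put("default", Arrays.asList("species", "probability"));
        RecordingDeclarer some = new RecordingDeclarer();
        declareOutputFields(outFields, some);
        System.out.println("declared with outFields: " + some.declared);
    }
}
```

The sketch covers only the declaration side. Whether the runner later emits on a stream that was never declared remains the user's responsibility, as noted above.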


[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92736533
  
    --- Diff: examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java ---
    @@ -0,0 +1,167 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import com.google.common.collect.Lists;
    +
    +import org.apache.commons.compress.utils.IOUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.StormSubmitter;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.model.jpmml.JpmmlModelOutputFields;
    +import org.apache.storm.pmml.runner.jpmml.JpmmlFactory;
    +import org.apache.storm.topology.BasicOutputCollector;
    +import org.apache.storm.topology.IRichBolt;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.topology.base.BaseBasicBolt;
    +import org.apache.storm.tuple.Tuple;
    +
    +import java.io.BufferedReader;
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +public class JpmmlRunnerTestTopology {
    --- End diff --
    
    Done


[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @hmcl can you upmerge this?


[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by csivaguru <gi...@git.apache.org>.
Github user csivaguru commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r91615776
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/jpmml/JpmmlFactory.java ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml.runner.jpmml;
    +
    +import org.apache.storm.pmml.runner.ModelRunnerFactory;
    +import org.dmg.pmml.IOUtil;
    +import org.dmg.pmml.PMML;
    +import org.jpmml.evaluator.Evaluator;
    +import org.jpmml.evaluator.ModelEvaluator;
    +import org.jpmml.evaluator.ModelEvaluatorFactory;
    +import org.jpmml.manager.PMMLManager;
    +import org.xml.sax.SAXException;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Objects;
    +
    +import javax.xml.bind.JAXBException;
    +
    +/*
    + * This class consists exclusively of static factory methods that create
    + * object instances that are essential to work with the Jpmml library
    + */
    +public class JpmmlFactory {
    +    /**
    +     * Creates a new {@link PMML} object representing the PMML model defined in the XML {@link File} specified as argument
    +     */
    +    public static PMML newPmml(File file) throws JAXBException, SAXException, IOException {
    +        Objects.requireNonNull(file);
    +        return IOUtil.unmarshal(file);
    +    }
    +
    +    /**
    +     * Creates a new {@link Evaluator} object representing the PMML model defined in the {@link PMML} argument
    +     */
    +    public static Evaluator newEvaluator(PMML pmml) {
    +        Objects.requireNonNull(pmml);
    +        final PMMLManager pmmlManager = new PMMLManager(pmml);
    +        return (ModelEvaluator<?>)pmmlManager.getModelManager(null, ModelEvaluatorFactory.getInstance());
    +    }
    +
    +    /**
    +     * Creates a new {@link Evaluator} object representing the PMML model defined in the XML {@link File} specified as argument
    +     */
    +    public static  Evaluator newEvaluator(File file) throws IOException, JAXBException, SAXException {
    +        Objects.requireNonNull(file);
    +        return newEvaluator(newPmml(file));
    +    }
    +
    +
    +    public static class ModelRunner implements ModelRunnerFactory {
    --- End diff --
    
    Consider renaming. Having two `ModelRunner` artifacts (this class and the interface) might be confusing.


[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r91297902
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/ModelRunner.java ---
    @@ -0,0 +1,30 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml.runner;
    +
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.tuple.Tuple;
    +
    +import java.util.List;
    +
    +public interface ModelRunner {
    --- End diff --
    
    Since the bolt can be executed in a distributed environment, the `ModelRunner` interface should extend `Serializable`.
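
For context, the `PMMLPredictorBolt` diff quoted elsewhere in this thread takes a `ModelRunnerFactory` and builds the runner in `prepare`, so that the runner itself need not be serializable. A minimal sketch of that trade-off follows; `Runner`, `RunnerFactory`, `PrefixRunner`, and `PrefixRunnerFactory` are hypothetical stand-ins, and plain Java serialization stands in for shipping the bolt to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class FactorySerializationSketch {

    /** Stand-in runner; deliberately not Serializable. */
    public interface Runner {
        String score(String input);
    }

    /** Only the factory travels over the wire, so only it must be Serializable. */
    public interface RunnerFactory extends Serializable {
        Runner newRunner();
    }

    public static final class PrefixRunner implements Runner {
        private final String prefix;

        PrefixRunner(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String score(String input) {
            return prefix + input;
        }
    }

    public static final class PrefixRunnerFactory implements RunnerFactory {
        private static final long serialVersionUID = 1L;
        private final String prefix;

        public PrefixRunnerFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Runner newRunner() {
            // Created on the receiving side, after deserialization.
            return new PrefixRunner(prefix);
        }
    }

    /** Serializes and deserializes the factory, as a stand-in for sending it to a worker. */
    public static RunnerFactory roundTrip(RunnerFactory factory)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(factory);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (RunnerFactory) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        RunnerFactory copy = roundTrip(new PrefixRunnerFactory("score:"));
        System.out.println(copy.newRunner().score("42"));
    }
}
```

Making the runner interface extend `Serializable` would also work, at the cost of requiring every implementation, and everything it holds, to be serializable.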


[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @harshach Thanks for your detailed explanations. Yes, this is a good start. I'm +1 for adding this in. This might attract many people who are interested in NLP and ML.


[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92243287
  
    --- Diff: external/storm-pmml/README.md ---
    @@ -0,0 +1,104 @@
    +#Storm PMML Bolt
    + Storm integration to load PMML models and compute predictive scores for running tuples. The PMML model represents
    + the machine learning (predictive) model used to do prediction on raw input data. The model is typically loaded into a 
    + runtime environment, which will score the raw data that comes in the tuples. 
    +
    +#Create Instance of PMML Bolt
    + To create an instance of the `PMMLPredictorBolt` you must provide a `ModelRunner` using a `ModelRunnerFactory`,
    + and optionally an instance of `ModelOutputFields`. The `ModelOutputFields` is only required if you wish to emit
    + tuples with predicted scores to one or multiple streams. Otherwise, the `PMMLPredictorBolt` will declare no
    + output fields.
    + 
    + The `ModelRunner` represents the runtime environment to execute the predictive scoring. It has only one method: 
    + 
    + ```java
    +    Map<Stream, List<Object>> scoredTuplePerStream(Tuple input); 
    + ```
    + 
    + This method contains the logic to compute the scored tuples from the raw inputs tuple.  It's up to the discretion of the 
    + implementation to define which scored values are to be assigned to each `Stream`. A `Stream` is a representation of a Storm stream.
    +   
    + The `PmmlModelRunner` is an extension of `ModelRunner` that represents the typical steps involved 
    + in predictive scoring. Hence, it allows for the **extraction** of raw inputs from the tuple, **pre process** the 
    + raw inputs, and **predict** the scores from the preprocessed data.
    + 
    + The `JPmmlModelRunner` is an implementation of `PmmlModelRunner` that uses [JPMML](https://github.com/jpmml/jpmml) as
    + runtime environment. This implementation extracts the raw inputs from the tuple for all `active fields`, 
    + and builds a tuple with the predicted scores for the `predicted fields` and `output fields`. 
    + In this implementation all the declared streams will have the same scored tuple.
    + 
    + The `predicted`, `active`, and `output` fields are extracted from the PMML model.
    +
    +#Run Bundled Examples
    +
    +To run the examples you must copy the `storm-pmml` uber jar to `STORM-HOME/extlib` and then run the command:
    --- End diff --
    
    Can't the example just be a self-contained shaded jar that is ready for deployment? It seems odd to require users to modify the pom and copy jars into extlib to run the example. Most of the other examples are deployable jars.


[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92328164
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java ---
    @@ -39,7 +39,6 @@
     import java.io.InputStreamReader;
     import java.util.HashMap;
     import java.util.Map;
    -import java.util.concurrent.TimeUnit;
    --- End diff --
    
    unrelated file KafkaSpoutTopologyMainNamedTopics.java updated?


[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92328626
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/PMMLPredictorBolt.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.runner.ModelRunner;
    +import org.apache.storm.pmml.runner.ModelRunnerFactory;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +public class PMMLPredictorBolt extends BaseRichBolt {
    +    protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class);
    +
    +    private ModelOutputFields outFields;
    +    private ModelRunnerFactory runnerFactory;
    +    private ModelRunner runner;
    +    private OutputCollector collector;
    +
    +    /*
    +     * Passing a factory rather than the actual object to avoid enforcing the strong
    +     * requirement of having to have ModelRunner to be Serializable
    +     */
    +
    +    /**
    +     * Creates an instance of {@link PMMLPredictorBolt} that executes, for every tuple, the runner constructed with
    +     * the {@link ModelRunnerFactory} specified in the parameter
    +     * The {@link PMMLPredictorBolt} instantiated with this constructor has no declared output fields
    +     */
    +    public PMMLPredictorBolt(ModelRunnerFactory modelRunnerFactory) {
    +        this(modelRunnerFactory, null);
    +    }
    +
    +    /**
    +     * Creates an instance of {@link PMMLPredictorBolt} that executes, for every tuple, the runner constructed with
    +     * the {@link ModelRunnerFactory} specified in the parameter
    +     * The {@link PMMLPredictorBolt} instantiated with this constructor declares the output fields as specified
    +     * by the {@link ModelOutputFields} parameter
    +     */
    +    public PMMLPredictorBolt(ModelRunnerFactory modelRunnerFactory, ModelOutputFields modelOutputFields) {
    +        this.outFields = modelOutputFields;
    +        this.runnerFactory = modelRunnerFactory;
    +        LOG.info("Instantiated {}", this);
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.runner = runnerFactory.newModelRunner();
    +        this.collector = collector;
    +    }
    +
    +    @Override
    +    public void execute(Tuple input) {
    +        try {
    +            final Map<Stream, List<Object>> scoresPerStream = runner.scoredTuplePerStream(input);
    +            LOG.debug("Input tuple [{}] generated predicted scores [{}]", input, scoresPerStream);
    +            if (scoresPerStream != null) {
    +                for (Stream stream : scoresPerStream.keySet()) {
    --- End diff --
    
    Use `entrySet()` instead of `keySet()` here; it avoids a separate `get` lookup for each stream.
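
A small sketch of the suggested loop shape, assuming plain `String` stream ids and a list that collects results in place of `collector.emit` (both are simplifications for illustration, not the PR's types):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EntrySetSketch {

    /** Visits each key together with its value in one traversal, with no extra get(). */
    public static List<String> emitAll(Map<String, List<Object>> scoresPerStream) {
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, List<Object>> entry : scoresPerStream.entrySet()) {
            // The bolt would call collector.emit(streamId, input, values) here.
            emitted.add(entry.getKey() + "=" + entry.getValue());
        }
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, List<Object>> scores = new LinkedHashMap<>();
        scores.put("default", Arrays.<Object>asList(0.9, "setosa"));
        scores.put("audit", Arrays.<Object>asList(0.9));
        System.out.println(emitAll(scores));
    }
}
```

With `keySet()` the loop body needs `scoresPerStream.get(stream)` for every key, whereas `entrySet()` hands over key and value together.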


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @vesense @HeartSaVioR this was discussed in the [JIRA](https://issues.apache.org/jira/browse/STORM-2223) - please take a look at the discussion thread. I believe that the [JPMML](https://github.com/jpmml/jpmml) license (the legacy code) is OK as discussed there.


[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92330245
  
    --- Diff: external/storm-pmml/src/main/java/org/apache/storm/pmml/model/Stream.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml.model;
    +
    +import org.apache.storm.utils.Utils;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Represents a stream emitted by a Storm component
    + */
    +public class Stream implements Serializable {
    --- End diff --
    
    It seems that `Stream` just wraps the stream name and the direct flag. However, `emitDirect` is not used at all in `PMMLPredictorBolt`. If all you want is to capture the name of the stream the tuples are emitted to, just use a `String` instead of wrapping it in another class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r93530698
  
    --- Diff: storm-dist/binary/final-package/src/main/assembly/binary.xml ---
    @@ -446,6 +446,29 @@
                     <include>storm*jar</include>
                 </includes>
             </fileSet>
    +
    +        <fileSet>
    +            <directory>${project.basedir}/../../external/storm-pmml/target</directory>
    +            <outputDirectory>external/storm-pmml</outputDirectory>
    +            <includes>
    +                <include>storm*jar</include>
    +            </includes>
    +        </fileSet>
    +        <fileSet>
    +            <directory>${project.basedir}/../../external/storm-pmml</directory>
    +            <outputDirectory>external/storm-pmml</outputDirectory>
    +            <includes>
    +                <include>README.*</include>
    +            </includes>
    +        </fileSet>
    +        <fileSet>
    +            <directory>${project.basedir}/../../examples/storm-pmml-examples/target</directory>
    +            <outputDirectory>examples/storm-pmml-examples</outputDirectory>
    +            <includes>
    +                <include>storm*jar</include>
    +            </includes>
    +        </fileSet>
    +
    --- End diff --
    
    @ptgoetz done


[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @hmcl can you open a PR for the 1.x-branch as well? I would like to merge this in today.
    There is a pending comment about the license on the data file. Can you take a look?


[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1816#discussion_r92326957
  
    --- Diff: examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java ---
    @@ -0,0 +1,167 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.pmml;
    +
    +import com.google.common.collect.Lists;
    +
    +import org.apache.commons.compress.utils.IOUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.StormSubmitter;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.pmml.model.ModelOutputFields;
    +import org.apache.storm.pmml.model.Stream;
    +import org.apache.storm.pmml.model.jpmml.JpmmlModelOutputFields;
    +import org.apache.storm.pmml.runner.jpmml.JpmmlFactory;
    +import org.apache.storm.topology.BasicOutputCollector;
    +import org.apache.storm.topology.IRichBolt;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.topology.base.BaseBasicBolt;
    +import org.apache.storm.tuple.Tuple;
    +
    +import java.io.BufferedReader;
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +public class JpmmlRunnerTestTopology {
    +    private static final String RAW_INPUT_FROM_CSV_SPOUT = "rawInputFromCsvSpout";
    +    private static final String PMML_PREDICTOR_BOLT = "pmmLPredictorBolt";
    +    private static final String PRINT_BOLT = "printBolt";
    +
    +    private File rawInputs;           // Raw input data to be scored (predicted)
    +    private File pmml;                // PMML Model
    +    private boolean isLocal;
    +    private String tplgyName;
    +
    +    public static void main(String[] args) throws Exception {
    +        try {
    +            JpmmlRunnerTestTopology testTopology = new JpmmlRunnerTestTopology();
    +            testTopology.parseArgs(args);
    +            System.out.println(String.format("Running topology using PMML model loaded from [%s] and raw input data loaded from [%s]",
    +                    testTopology.pmml.getAbsolutePath(), testTopology.rawInputs.getAbsolutePath()));
    +            testTopology.run();
    +        } catch (Exception e) {
    +            printUsage();
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    private void parseArgs(String[] args) {
    +        if (Arrays.stream(args).anyMatch(option -> option.equals("-h"))) {
    +            printUsage();
    +        } else {
    +            if (args.length < 3) {
    +                tplgyName = "pmmlPredictorLocal";
    +                isLocal = true;
    +                if (args.length == 0) {     // run local examples
    +                    pmml = loadExample(pmml, "KNIME_PMML_4.1_Examples_single_audit_logreg.xml");
    +                    rawInputs = loadExample(rawInputs, "Audit.50.csv");
    +                } else {
    +                    pmml = new File(args[0]);
    +                    rawInputs = new File(args[1]);
    +                }
    +            } else {
    +                tplgyName = args[0];
    +                pmml = new File(args[1]);
    +                rawInputs = new File(args[2]);
    +                isLocal = false;
    +            }
    +        }
    +    }
    +
    +    private File loadExample(File file, String example) {
    +        try (InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(example)) {
    +            file = File.createTempFile("/tmp/" + example, ".tmp");
    --- End diff --
    
    The first argument of `File.createTempFile` is a prefix of the file name, not a directory.
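
    To illustrate the point, here is a minimal sketch of copying a stream into a temp file. The class `TempFileSketch` and its `copyToTemp` methods are hypothetical names that are not part of this pull request; only the `java.io.File` and `java.nio.file.Files` calls are standard JDK API. The two-argument `File.createTempFile(prefix, suffix)` places the file in the default temp directory, and the three-argument overload takes the directory as a separate `File`.

```java
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

// Hypothetical helper for illustration only; not part of the pull request.
class TempFileSketch {

    // Copies the stream into a temp file in the default temp directory
    // (system property "java.io.tmpdir"). The prefix is only the start of
    // the generated file name and must be at least three characters long.
    static File copyToTemp(InputStream in, String prefix, String suffix) throws IOException {
        File file = File.createTempFile(prefix, suffix);
        file.deleteOnExit();
        // createTempFile has already created an empty file, so overwrite it.
        Files.copy(in, file.toPath(), StandardCopyOption.REPLACE_EXISTING);
        return file;
    }

    // Same as above, but the target directory is passed explicitly
    // through the three-argument overload instead of via the prefix.
    static File copyToTemp(InputStream in, String prefix, String suffix, File dir) throws IOException {
        File file = File.createTempFile(prefix, suffix, dir);
        file.deleteOnExit();
        Files.copy(in, file.toPath(), StandardCopyOption.REPLACE_EXISTING);
        return file;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "a,b,c\n".getBytes(StandardCharsets.UTF_8);
        try (InputStream in = new ByteArrayInputStream(data)) {
            // The prefix is a plain name with no directory component.
            File file = copyToTemp(in, "Audit.50.csv", ".tmp");
            System.out.println(file.getAbsolutePath());
        }
    }
}
```

    In `loadExample` this would mean passing `example` alone as the prefix, or using the three-argument overload if the file really has to live under a specific directory.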



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @harshach 1.x-branch [PR](https://github.com/apache/storm/pull/1838).
    
    @harshach I believe the license issue was addressed [here](https://github.com/apache/storm/pull/1816#pullrequestreview-12756266).
    
    I followed the license guidelines described [here](http://www.apache.org/dev/licensing-howto.html#permissive-deps),
    which is to add the license entry to the src [LICENSE](https://github.com/apache/storm/pull/1816/files#diff-9879d6db96fd29134fc802214163b95a)
    and also to the binary [LICENSE](https://github.com/apache/storm/pull/1816/files#diff-8d914c320178e25abc06496e0fb9fc8b).



[GitHub] storm issue #1816: STORM-2223: PMMLBolt

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1816
  
    @hmcl I am +1 on merging. I would like to see the InputStream option added to it.

