Posted to issues@nifi.apache.org by mans2singh <gi...@git.apache.org> on 2018/05/08 05:16:04 UTC

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

GitHub user mans2singh opened a pull request:

    https://github.com/apache/nifi/pull/2686

    NIFI-5166 - Deep learning classification and regression processor with deeplearning4j
    
    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [x] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [x] Have you written or updated unit tests to verify your changes?
    - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [x] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mans2singh/nifi NIFI-5166

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2686.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2686
    
----
commit f58316a906af820f9f056c6ebee171015685f86b
Author: mans2singh <ma...@...>
Date:   2018-05-08T05:11:00Z

    NIFI-5166 - Deep learning classification and regression processor with deeplearning4j

----


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r193622053
  
    --- Diff: nifi-assembly/pom.xml ---
    @@ -379,6 +379,12 @@ language governing permissions and limitations under the License. -->
                 <version>1.7.0-SNAPSHOT</version>
                 <type>nar</type>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-deeplearning4j-nar</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <type>nar</type>
    +        </dependency>
    --- End diff --
    
    The resulting nifi-deeplearning4j-nar-1.7.0-SNAPSHOT.nar is huge, about 3 GB. Looking at the bundled dependencies, it contains many jars built for specific operating systems, for example:
    ```
    6516020 | nd4j-native-1.0.0-alpha-android-arm.jar
    6993768 | nd4j-native-1.0.0-alpha-android-arm64.jar
    9270711 | nd4j-native-1.0.0-alpha-android-x86.jar
    8064383 | nd4j-native-1.0.0-alpha-android-x86_64.jar
    5175162 | nd4j-native-1.0.0-alpha-ios-arm64.jar
    6077149 | nd4j-native-1.0.0-alpha-ios-x86_64.jar
    5677978 | nd4j-native-1.0.0-alpha-linux-ppc64le.jar
    7305963 | nd4j-native-1.0.0-alpha-linux-x86_64.jar
    8246215 | nd4j-native-1.0.0-alpha-macosx-x86_64.jar
    8149497 | nd4j-native-1.0.0-alpha-windows-x86_64.jar
    ```
    
    We should try to reduce the nar size; otherwise we can't bundle it in the standard release binary. We need to include nifi-deeplearning4j-nar only if the user needs it, by adding a Maven profile similar to the include-grpc profile used for gRPC.
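
For a sense of scale, the ten nd4j-native jars in the listing above can be totalled and compared with the single jar that one platform actually needs. The sketch below is plain Java; the class and method names are hypothetical, the byte counts are copied from the listing, and the listing itself is only a sample of what the nar contains.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the pull request.
class NativeJarSizes {

    // Classifier -> size in bytes, as quoted in the listing above.
    static Map<String, Long> listedJars() {
        Map<String, Long> jars = new LinkedHashMap<>();
        jars.put("android-arm", 6516020L);
        jars.put("android-arm64", 6993768L);
        jars.put("android-x86", 9270711L);
        jars.put("android-x86_64", 8064383L);
        jars.put("ios-arm64", 5175162L);
        jars.put("ios-x86_64", 6077149L);
        jars.put("linux-ppc64le", 5677978L);
        jars.put("linux-x86_64", 7305963L);
        jars.put("macosx-x86_64", 8246215L);
        jars.put("windows-x86_64", 8149497L);
        return jars;
    }

    // Sum of all listed jar sizes in bytes.
    static long totalBytes(Map<String, Long> jars) {
        return jars.values().stream().mapToLong(Long::longValue).sum();
    }

    // Bytes that would be dropped if only the jar for one classifier were kept.
    static long bytesSavedByKeepingOnly(Map<String, Long> jars, String classifier) {
        Long kept = jars.get(classifier);
        if (kept == null) {
            throw new IllegalArgumentException("Unknown classifier: " + classifier);
        }
        return totalBytes(jars) - kept;
    }
}
```

On these numbers, keeping only the linux-x86_64 jar would drop about 64 MB of the roughly 71 MB listed; the same reasoning presumably applies to the other native dependencies in the nar.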



---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    @markap14, NIFI team
    
    Just wondering if you have any feedback on this processor.  
    
    Thanks


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187351854
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected Gson gson = new Gson();
    --- End diff --
    
    This should be marked final to make it clear when reading the code that it won't change.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187352722
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected Gson gson = new Gson();
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    +
    +            long startTimeMillis = System.currentTimeMillis();
    +
    +            String [] inputRecords = input.split(recordSeparator);
    +
    +            List<INDArray> features = Arrays.stream(inputRecords).map(
    +                record -> {
    +                    double [] parameters = Arrays.stream(record.split(fieldSeparator)).mapToDouble(
    +                             field -> Double.parseDouble(field)).toArray();
    +
    +                    INDArray featureInput = Nd4j.create(parameters, dimensions);
    +
    +                    if ( getLogger().isDebugEnabled() ) {
    +                        getLogger().debug("Features for record {} parameters {} dims {} featureInput {} ",
    +                            new Object[] {record, parameters, dimensions, featureInput});
    +                    }
    +
    +                    return featureInput;
    +
    +                }).collect(Collectors.toList());
    +
    +           INDArray allFeatures = Nd4j.vstack(features);
    +
    +           INDArray results = model.output(allFeatures);
    +
    +           double [][] partitionedResults = new double[inputRecords.length][];
    +           for (int row = 0; row < inputRecords.length; row++) {
    +                INDArray result = results.getRow(row);
    +                partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector();
    +           }
    +
    +           String jsonResult = gson.toJson(partitionedResults);
    +           int [] shape = results.shape();
    +           String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1, shape.length));
    +
    +           if ( getLogger().isDebugEnabled() ) {
    +                getLogger().debug("Prediction for inputRecords {}, dims {}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape {}, jsonShape {}",
    +                        new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape});
    +           }
    +
    +           try (ByteArrayInputStream bais = new ByteArrayInputStream(jsonResult.getBytes(charset))) {
    +               flowFile = session.importFrom(bais, flowFile);
    +           }
    +
    +           session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE, jsonShape);
    +
    +           final long endTimeMillis = System.currentTimeMillis();
    +
    +           session.transfer(flowFile, REL_SUCCESS);
    +
    +           session.getProvenanceReporter().send(flowFile, makeProvenanceUrl(context),
    +                    (endTimeMillis - startTimeMillis));
    +        } catch (Exception exception) {
    +            flowFile = populateErrorAttributes(session, flowFile, exception.getMessage());
    +                getLogger().error("Failed to process data due to {} for input {}",
    +                        new Object[]{exception.getLocalizedMessage(), input}, exception);
    +                session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    +        }
    +    }
    +
    +    protected String getFlowFileContents(final ProcessSession session, Charset charset, FlowFile incomingFlowFile)
    +            throws IOException {
    +        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
    +            session.exportTo(incomingFlowFile, baos);
    +            return new String(baos.toByteArray(), charset);
    +        }
    +    }
    +
    +    protected int [] getInputDimensions(final ProcessContext context, Charset charset, FlowFile flowFile, String separator)
    +            throws IOException {
    +        String values = context.getProperty(RECORD_DIMENSIONS).evaluateAttributeExpressions(flowFile).getValue();
    +        return Arrays.stream(
    +                values.split(separator))
    +                    .mapToInt(val -> Integer.parseInt(val)).toArray();
    +    }
    +
    +    protected String makeProvenanceUrl(final ProcessContext context) {
    +        return new StringBuilder("deeplearning4j://")
    +            .append(context.getProperty(MODEL_FILE).evaluateAttributeExpressions().getValue()).toString();
    +    }
    +
    +    protected FlowFile populateErrorAttributes(final ProcessSession session, FlowFile flowFile,
    +            String message) {
    +        Map<String,String> attributes = new HashMap<>();
    --- End diff --
    
    No need to create a HashMap here -- can just use `flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, String.valueOf(message));` -- and since that is just a single line, it would probably make sense to eliminate the method altogether and do it inline.
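
One detail of the suggested line is worth spelling out: `Throwable.getMessage()` may return null, and `String.valueOf` applied to a null `String` variable yields the text "null" rather than a null reference. The sketch below shows only that behaviour in plain Java; it uses no NiFi classes, the class and method names are hypothetical, and it assumes a null attribute value is something the processor wants to avoid.

```java
// Hypothetical helper for illustration; not part of the pull request.
class ErrorMessageSketch {

    // Mirrors the String.valueOf(message) call from the suggestion above.
    static String attributeValueFor(Exception exception) {
        final String message = exception.getMessage(); // may be null
        // With a String-typed argument this resolves to String.valueOf(Object),
        // which returns "null" for a null reference instead of throwing.
        return String.valueOf(message);
    }
}
```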


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187353262
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected Gson gson = new Gson();
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    +
    +            long startTimeMillis = System.currentTimeMillis();
    +
    +            String [] inputRecords = input.split(recordSeparator);
    +
    +            List<INDArray> features = Arrays.stream(inputRecords).map(
    +                record -> {
    +                    double [] parameters = Arrays.stream(record.split(fieldSeparator)).mapToDouble(
    +                             field -> Double.parseDouble(field)).toArray();
    +
    +                    INDArray featureInput = Nd4j.create(parameters, dimensions);
    +
    +                    if ( getLogger().isDebugEnabled() ) {
    +                        getLogger().debug("Features for record {} parameters {} dims {} featureInput {} ",
    +                            new Object[] {record, parameters, dimensions, featureInput});
    +                    }
    +
    +                    return featureInput;
    +
    +                }).collect(Collectors.toList());
    +
    +           INDArray allFeatures = Nd4j.vstack(features);
    +
    +           INDArray results = model.output(allFeatures);
    +
    +           double [][] partitionedResults = new double[inputRecords.length][];
    +           for (int row = 0; row < inputRecords.length; row++) {
    +                INDArray result = results.getRow(row);
    +                partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector();
    +           }
    +
    +           String jsonResult = gson.toJson(partitionedResults);
    +           int [] shape = results.shape();
    +           String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1, shape.length));
    +
    +           if ( getLogger().isDebugEnabled() ) {
    +                getLogger().debug("Prediction for inputRecords {}, dims {}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape {}, jsonShape {}",
    +                        new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape});
    +           }
    +
    +           try (ByteArrayInputStream bais = new ByteArrayInputStream(jsonResult.getBytes(charset))) {
    --- End diff --
    
    It would be more efficient here (and probably more straightforward) to simply do:
    `flowFile = session.write(flowFile, out -> out.write(jsonResult.getBytes(charset)));`
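
The shape of that callback-style write can be sketched in plain Java. Everything here is a stand-in: `WriteCallback` and the static `write` method are hypothetical and only imitate a session handing its destination stream to a lambda; they are not NiFi types. The point is that the bytes go straight to the destination stream, with no intermediate `ByteArrayInputStream` to construct and close.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;

// Hypothetical stand-ins for illustration; not NiFi classes.
class CallbackWriteSketch {

    // A callback that receives the destination stream and writes into it.
    @FunctionalInterface
    interface WriteCallback {
        void process(OutputStream out) throws IOException;
    }

    // Imitates a session: owns the destination stream and passes it to the callback.
    static byte[] write(WriteCallback callback) throws IOException {
        try (ByteArrayOutputStream destination = new ByteArrayOutputStream()) {
            callback.process(destination);
            return destination.toByteArray();
        }
    }

    // The one-line lambda from the suggestion, applied to the stand-in session.
    static byte[] writeJson(String jsonResult, Charset charset) {
        try {
            return write(out -> out.write(jsonResult.getBytes(charset)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```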


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r193654571
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification_test.txt ---
    @@ -0,0 +1,100 @@
    +1.1,0.5,0.5,0.2,0
    --- End diff --
    
    It seems this text and test are based on the [DL4J CSVExample](https://github.com/deeplearning4j/dl4j-examples/blob/master/dl4j-examples/src/main/java/org/deeplearning4j/examples/dataexamples/CSVExample.java).
    
    IMHO, instead of a mock classification, using [iris.txt](https://github.com/deeplearning4j/dl4j-examples/blob/master/dl4j-examples/src/main/resources/iris.txt) and the same configuration as CSVExample to train the test model would be more understandable, especially for those new to deep learning and neural networks.
    
    The iris.txt dataset comes with more background on what the data is and which classes the model is supposed to predict. With the current mock classification it is hard to imagine an actual objective, so it can be hard to understand what the test does. Also, using the same number (4) for different things, such as inputNumber, outputNumber and numClasses, makes it even harder to grasp what those numbers are.
    
    What do you think?
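    
    To illustrate why distinct numbers help, here is a minimal sketch of reading one iris-style line, assuming the layout used by CSVExample (four feature columns followed by a class index, three classes). The class name `IrisLineSketch`, the helper names and the index-to-name mapping are assumptions for illustration only and should be checked against the dataset.
    
    ```java
    import java.util.Arrays;
    
    class IrisLineSketch {
        // Assumed order of class indices; verify against the dataset before relying on it.
        static final String[] CLASS_NAMES = {"setosa", "versicolor", "virginica"};
        // Four feature columns, which is deliberately different from the three classes.
        static final int NUM_FEATURES = 4;
    
        /** Parses the leading feature columns of one comma-separated line. */
        public static double[] features(String line) {
            String[] fields = line.split(",");
            double[] out = new double[NUM_FEATURES];
            for (int i = 0; i < NUM_FEATURES; i++) {
                out[i] = Double.parseDouble(fields[i].trim());
            }
            return out;
        }
    
        /** Maps the trailing class index of one line to a readable class name. */
        public static String label(String line) {
            String[] fields = line.split(",");
            int classIndex = Integer.parseInt(fields[NUM_FEATURES].trim());
            return CLASS_NAMES[classIndex];
        }
    
        public static void main(String[] args) {
            String line = "5.1,3.5,1.4,0.2,0";
            // prints [5.1, 3.5, 1.4, 0.2] -> setosa
            System.out.println(Arrays.toString(features(line)) + " -> " + label(line));
        }
    }
    ```
    
    With four inputs and three classes, a reader can tell at a glance which number in the test refers to features and which to classes.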


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    @mans2singh I have been thinking about how to use this processor in a practical NiFi data flow. Certainly the processor can use a deep learning model to classify or predict using regression, but the current approach of writing the evaluation result into the FlowFile content may not be useful in real data flows.
    
    Let's say a user wants to route incoming data into different branches of a data flow to process them differently. The most basic use case would be binary classification: if a given record is predicted as class A, then do something, such as sending an alert. In order to do so, we need to carry the original data along to report a meaningful alert. Rewriting the FlowFile content with only the result makes it difficult to tie the original data to the prediction result, and makes it hard to construct the subsequent flow.
    
    Considering real use cases more, I started feeling this is more of an Enrich or Lookup pattern.
    
    ```
    # Original dataset
    Record1
    Record2
    Record3
    
    # Convert the original dataset into vectors to apply a model, while keeping the original data to preserve relationships.
    Record1, Feature Vector1
    Record2, Feature Vector2
    Record3, Feature Vector3
    
    # Then we can further enrich records with prediction results
    Record1, Result1 (A:0.9, B:0.1)
    Record2, Result2 (A:0.85, B:0.15)
    Record3, Result3 (A:0.05, B:0.95)
    
    # Once we have such FlowFile, we can filter certain dataset based on prediction
    # Route class A into flow branch A
    Record1
    Record2
    # Route class B into branch B
    Record3
    
    # Then we can produce some meaningful report using original information
    Send an alert based on Record3
    ```
    
    Have you ever looked at the LookupRecord processor and the RecordLookupService controller service? I think we can do more interesting things if we implement this as a RecordLookupService.
    
    What do you think?
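    
    The enrich-then-route idea can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `EnrichAndRouteSketch`, `Enriched` and `stubModel` are hypothetical names, the stub returns the hard-coded scores from the example above instead of calling a DL4J model, and no NiFi API is involved.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    
    class EnrichAndRouteSketch {
    
        /** An original record kept together with its per-class scores. */
        static final class Enriched {
            final String record;
            final Map<String, Double> scores;
    
            Enriched(String record, Map<String, Double> scores) {
                this.record = record;
                this.scores = scores;
            }
    
            /** Returns the class with the highest score. */
            String predictedClass() {
                String best = null;
                double bestScore = Double.NEGATIVE_INFINITY;
                for (Map.Entry<String, Double> e : scores.entrySet()) {
                    if (e.getValue() > bestScore) {
                        best = e.getKey();
                        bestScore = e.getValue();
                    }
                }
                return best;
            }
        }
    
        /** Hypothetical stand-in for a trained model: fixed scores per record. */
        public static Map<String, Double> stubModel(String record) {
            double a;
            switch (record) {
                case "Record1": a = 0.9; break;
                case "Record2": a = 0.85; break;
                default: a = 0.05;
            }
            Map<String, Double> scores = new LinkedHashMap<>();
            scores.put("A", a);
            scores.put("B", 1.0 - a);
            return scores;
        }
    
        /** Attaches a prediction to every record without discarding the record. */
        public static List<Enriched> enrich(List<String> records,
                                            Function<String, Map<String, Double>> model) {
            List<Enriched> out = new ArrayList<>();
            for (String r : records) {
                out.add(new Enriched(r, model.apply(r)));
            }
            return out;
        }
    
        /** Groups the original records by predicted class, one branch per class. */
        public static Map<String, List<String>> route(List<Enriched> enriched) {
            Map<String, List<String>> branches = new LinkedHashMap<>();
            for (Enriched e : enriched) {
                branches.computeIfAbsent(e.predictedClass(), k -> new ArrayList<>()).add(e.record);
            }
            return branches;
        }
    
        public static void main(String[] args) {
            List<String> records = Arrays.asList("Record1", "Record2", "Record3");
            // prints {A=[Record1, Record2], B=[Record3]}
            System.out.println(route(enrich(records, EnrichAndRouteSketch::stubModel)));
        }
    }
    ```
    
    Because each branch still holds the original records, a later step can build an alert from Record3 itself rather than from a bare score array.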
    
    



---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    @ijokarumawak - Just following-up:
    
    1.  Regarding implementing a record lookup service - I believe that the processor and a record lookup service can be separate components useful for different use cases. As the file/RDBMS-based flows I mentioned above show, the processor can be used as a transformer.
    2. Regarding providing more tools to prepare data - You are right, we can do that once we have the basics in place and there is a need for it.
    3. You had mentioned the concern of writing the results (predictions) in the body of the flow file - if you or the community think we should keep the observations in the body and put the output in an attribute, I'd be happy to change that.
    
    Thanks again for your thoughts and let me know if you have any more advice/recommendations.
    
    Mans


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187353741
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected Gson gson = new Gson();
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    +
    +            long startTimeMillis = System.currentTimeMillis();
    +
    +            String [] inputRecords = input.split(recordSeparator);
    +
    +            List<INDArray> features = Arrays.stream(inputRecords).map(
    +                record -> {
    +                    double [] parameters = Arrays.stream(record.split(fieldSeparator)).mapToDouble(
    +                             field -> Double.parseDouble(field)).toArray();
    +
    +                    INDArray featureInput = Nd4j.create(parameters, dimensions);
    +
    +                    if ( getLogger().isDebugEnabled() ) {
    +                        getLogger().debug("Features for record {} parameters {} dims {} featureInput {} ",
    +                            new Object[] {record, parameters, dimensions, featureInput});
    +                    }
    +
    +                    return featureInput;
    +
    +                }).collect(Collectors.toList());
    +
    +           INDArray allFeatures = Nd4j.vstack(features);
    +
    +           INDArray results = model.output(allFeatures);
    +
    +           double [][] partitionedResults = new double[inputRecords.length][];
    +           for (int row = 0; row < inputRecords.length; row++) {
    +                INDArray result = results.getRow(row);
    +                partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector();
    +           }
    +
    +           String jsonResult = gson.toJson(partitionedResults);
    +           int [] shape = results.shape();
    +           String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1, shape.length));
    +
    +           if ( getLogger().isDebugEnabled() ) {
    +                getLogger().debug("Prediction for inputRecords {}, dims {}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape {}, jsonShape {}",
    +                        new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape});
    +           }
    +
    +           try (ByteArrayInputStream bais = new ByteArrayInputStream(jsonResult.getBytes(charset))) {
    +               flowFile = session.importFrom(bais, flowFile);
    +           }
    +
    +           session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE, jsonShape);
    +
    +           final long endTimeMillis = System.currentTimeMillis();
    +
    +           session.transfer(flowFile, REL_SUCCESS);
    +
    +           session.getProvenanceReporter().send(flowFile, makeProvenanceUrl(context),
    --- End diff --
    
    I don't believe a SEND event is appropriate here. We are not sending the data to some external location. I think we want a CONTENT_MODIFIED event. It probably does make sense, though, to include the time duration, and if we want to include the name of the Model File, that could be done in the event details.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187654197
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    --- End diff --
    
    This is the most intuitive name I could come up with.  Please let me know if you have any other recommendations.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187654258
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected Gson gson = new Gson();
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    +
    +            long startTimeMillis = System.currentTimeMillis();
    +
    +            String [] inputRecords = input.split(recordSeparator);
    +
    +            List<INDArray> features = Arrays.stream(inputRecords).map(
    +                record -> {
    +                    double [] parameters = Arrays.stream(record.split(fieldSeparator)).mapToDouble(
    +                             field -> Double.parseDouble(field)).toArray();
    +
    +                    INDArray featureInput = Nd4j.create(parameters, dimensions);
    +
    +                    if ( getLogger().isDebugEnabled() ) {
    +                        getLogger().debug("Features for record {} parameters {} dims {} featureInput {} ",
    +                            new Object[] {record, parameters, dimensions, featureInput});
    +                    }
    +
    +                    return featureInput;
    +
    +                }).collect(Collectors.toList());
    +
    +           INDArray allFeatures = Nd4j.vstack(features);
    +
    +           INDArray results = model.output(allFeatures);
    +
    +           double [][] partitionedResults = new double[inputRecords.length][];
    +           for (int row = 0; row < inputRecords.length; row++) {
    +                INDArray result = results.getRow(row);
    +                partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector();
    +           }
    +
    +           String jsonResult = gson.toJson(partitionedResults);
    +           int [] shape = results.shape();
    +           String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1, shape.length));
    +
    +           if ( getLogger().isDebugEnabled() ) {
    +                getLogger().debug("Prediction for inputRecords {}, dims {}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape {}, jsonShape {}",
    +                        new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape});
    +           }
    +
    +           try (ByteArrayInputStream bais = new ByteArrayInputStream(jsonResult.getBytes(charset))) {
    +               flowFile = session.importFrom(bais, flowFile);
    +           }
    +
    +           session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE, jsonShape);
    +
    +           final long endTimeMillis = System.currentTimeMillis();
    +
    +           session.transfer(flowFile, REL_SUCCESS);
    +
    +           session.getProvenanceReporter().send(flowFile, makeProvenanceUrl(context),
    +                    (endTimeMillis - startTimeMillis));
    +        } catch (Exception exception) {
    +            flowFile = populateErrorAttributes(session, flowFile, exception.getMessage());
    +                getLogger().error("Failed to process data due to {} for input {}",
    +                        new Object[]{exception.getLocalizedMessage(), input}, exception);
    +                session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    +        }
    +    }
    +
    +    protected String getFlowFileContents(final ProcessSession session, Charset charset, FlowFile incomingFlowFile)
    +            throws IOException {
    +        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
    +            session.exportTo(incomingFlowFile, baos);
    +            return new String(baos.toByteArray(), charset);
    +        }
    +    }
    +
    +    protected int [] getInputDimensions(final ProcessContext context, Charset charset, FlowFile flowFile, String separator)
    +            throws IOException {
    +        String values = context.getProperty(RECORD_DIMENSIONS).evaluateAttributeExpressions(flowFile).getValue();
    +        return Arrays.stream(
    +                values.split(separator))
    +                    .mapToInt(val -> Integer.parseInt(val)).toArray();
    +    }
    +
    +    protected String makeProvenanceUrl(final ProcessContext context) {
    +        return new StringBuilder("deeplearning4j://")
    +            .append(context.getProperty(MODEL_FILE).evaluateAttributeExpressions().getValue()).toString();
    +    }
    +
    +    protected FlowFile populateErrorAttributes(final ProcessSession session, FlowFile flowFile,
    +            String message) {
    +        Map<String,String> attributes = new HashMap<>();
    --- End diff --
    
    Corrected.


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    Hi @ijokarumawak - 
    
    I've merged your changes.  Please let me know if you have any more recommendations.
    
    Thanks for your help.
    
    Mans 


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    @ijokarumawak - 
    Thanks for your feedback.  I was away for a few days and will respond to your comments soon.
    Mans


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    Hi @mans2singh 
    
    Thanks for sharing the templates. I will try them later, but I understand how it works by using an RDBMS to join input data and classification results.
    
    I think we need to provide a larger toolset so that people can use DL4J easily from NiFi flows. So here are a couple of points for further discussion:
    
    1. Have you ever considered a DL4J component implementing the RecordLookup interface so that the LookupRecord processor can use it? I've mentioned this before. Once we provide this pattern, an RDBMS is no longer required to use the evaluation result later in the flow.
    
    2. How would a NiFi user convert raw data (CSV, text, images, audio, etc.) into a vector format that can be fed to DL4J networks? I haven't explored this myself yet, but can we provide some other Processors or RecordReader/Writers to do the vectorization, probably using DataVec? https://deeplearning4j.org/datavec
    
    
    I will ask the dev ML to see if there's anyone who can join the review process, too. Thanks!


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187663098
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected Gson gson = new Gson();
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    +
    +            long startTimeMillis = System.currentTimeMillis();
    +
    +            String [] inputRecords = input.split(recordSeparator);
    +
    +            List<INDArray> features = Arrays.stream(inputRecords).map(
    +                record -> {
    +                    double [] parameters = Arrays.stream(record.split(fieldSeparator)).mapToDouble(
    +                             field -> Double.parseDouble(field)).toArray();
    +
    +                    INDArray featureInput = Nd4j.create(parameters, dimensions);
    +
    +                    if ( getLogger().isDebugEnabled() ) {
    +                        getLogger().debug("Features for record {} parameters {} dims {} featureInput {} ",
    +                            new Object[] {record, parameters, dimensions, featureInput});
    +                    }
    +
    +                    return featureInput;
    +
    +                }).collect(Collectors.toList());
    +
    +           INDArray allFeatures = Nd4j.vstack(features);
    +
    +           INDArray results = model.output(allFeatures);
    +
    +           double [][] partitionedResults = new double[inputRecords.length][];
    +           for (int row = 0; row < inputRecords.length; row++) {
    +                INDArray result = results.getRow(row);
    +                partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector();
    +           }
    +
    +           String jsonResult = gson.toJson(partitionedResults);
    +           int [] shape = results.shape();
    +           String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1, shape.length));
    +
    +           if ( getLogger().isDebugEnabled() ) {
    +                getLogger().debug("Prediction for inputRecords {}, dims {}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape {}, jsonShape {}",
    +                        new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape});
    +           }
    +
    +           try (ByteArrayInputStream bais = new ByteArrayInputStream(jsonResult.getBytes(charset))) {
    --- End diff --
    
    Corrected.  Thanks


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    @mans2singh the issue that you noted in Travis is unrelated to your PR; unfortunately, it is a problem with an existing unit test. So there is nothing to do there, really. Hopefully it will be addressed on master soon.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r193651707
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    --- End diff --
    
    I'm fairly new to DL4J, so I could be totally wrong, but DL4J seems to have two distinct model types: MultiLayerNetwork and ComputationGraph. If we are going to add a ComputationGraph version in the future, which can be more complex (multiple input arrays, etc.), that would be an opportunity to add another processor.
    Based on that, I think the following names may sound good, too:
    - ApplyDL4JMultiLayerNetwork
    - ApplyDL4JComputationGraph
    
    From this processor's point of view, it does not seem to matter whether the model does classification or regression.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187354930
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification_test.txt ---
    @@ -0,0 +1,100 @@
    +1.1,0.5,0.5,0.2,0
    --- End diff --
    
    Where did this text file come from? Was it generated by you, or did you find it online somewhere, etc.? Do we have the appropriate license for this?


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187655735
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected Gson gson = new Gson();
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    +
    +            long startTimeMillis = System.currentTimeMillis();
    +
    +            String [] inputRecords = input.split(recordSeparator);
    +
    +            List<INDArray> features = Arrays.stream(inputRecords).map(
    +                record -> {
    +                    double [] parameters = Arrays.stream(record.split(fieldSeparator)).mapToDouble(
    +                             field -> Double.parseDouble(field)).toArray();
    +
    +                    INDArray featureInput = Nd4j.create(parameters, dimensions);
    +
    +                    if ( getLogger().isDebugEnabled() ) {
    +                        getLogger().debug("Features for record {} parameters {} dims {} featureInput {} ",
    +                            new Object[] {record, parameters, dimensions, featureInput});
    +                    }
    +
    +                    return featureInput;
    +
    +                }).collect(Collectors.toList());
    +
    +           INDArray allFeatures = Nd4j.vstack(features);
    +
    +           INDArray results = model.output(allFeatures);
    +
    +           double [][] partitionedResults = new double[inputRecords.length][];
    +           for (int row = 0; row < inputRecords.length; row++) {
    +                INDArray result = results.getRow(row);
    +                partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector();
    +           }
    +
    +           String jsonResult = gson.toJson(partitionedResults);
    +           int [] shape = results.shape();
    +           String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1, shape.length));
    +
    +           if ( getLogger().isDebugEnabled() ) {
    +                getLogger().debug("Prediction for inputRecords {}, dims {}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape {}, jsonShape {}",
    +                        new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape});
    +           }
    +
    +           try (ByteArrayInputStream bais = new ByteArrayInputStream(jsonResult.getBytes(charset))) {
    +               flowFile = session.importFrom(bais, flowFile);
    +           }
    +
    +           session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE, jsonShape);
    +
    +           final long endTimeMillis = System.currentTimeMillis();
    +
    +           session.transfer(flowFile, REL_SUCCESS);
    +
    +           session.getProvenanceReporter().send(flowFile, makeProvenanceUrl(context),
    +                    (endTimeMillis - startTimeMillis));
    +        } catch (Exception exception) {
    +            flowFile = populateErrorAttributes(session, flowFile, exception.getMessage());
    +                getLogger().error("Failed to process data due to {} for input {}",
    +                        new Object[]{exception.getLocalizedMessage(), input}, exception);
    +                session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    --- End diff --
    
    Removed yield.  Thanks for the comments.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187653485
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import java.io.IOException;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +
    +/**
    + * Base class for deeplearning4j processors
    + */
    +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-charset")
    +            .displayName("Character Set")
    +            .description("Specifies the character set of the document data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-field-separator")
    +            .displayName("Field Separator")
    +            .description("Specifies the field separator in the records. (default is comma)")
    --- End diff --
    
    Corrected.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187653950
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import java.io.IOException;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +
    +/**
    + * Base class for deeplearning4j processors
    + */
    +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-charset")
    +            .displayName("Character Set")
    +            .description("Specifies the character set of the document data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-field-separator")
    +            .displayName("Field Separator")
    +            .description("Specifies the field separator in the records. (default is comma)")
    +            .required(true)
    +            .defaultValue(",")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-record-separator")
    +            .displayName("Record Separator")
    +            .description("Specifies the records separator in the message body. (defaults to new line)")
    +            .required(true)
    +            .defaultValue(System.lineSeparator())
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MODEL_FILE = new PropertyDescriptor.Builder()
    +            .name("model-file")
    +            .displayName("Model File")
    +            .description("Location of the Deeplearning4J model zip file")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_DIMENSIONS = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-record-dimension")
    +            .displayName("Record dimensions separated by field separator")
    +            .description("Dimension of array in each a record (eg: 2,4 - a 2x4 array)")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final String DEEPLEARNING4J_ERROR_MESSAGE = "deeplearning4j.error.message";
    +
    +    public static final String DEEPLEARNING4J_OUTPUT_SHAPE = "deeplearning4j.output.shape";
    +
    +    protected MultiLayerNetwork model = null;
    --- End diff --
    
    @markap14 - The method to get the model is synchronized.  Please let me know if there is anything else required.
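
    For readers following the thread, the synchronized lazy-load pattern being described can be sketched in plain Java. This is only an illustration: LazyModelHolder, its Supplier-based loader, and countLoads are hypothetical stand-ins for the processor's model field and getModel(), not NiFi or deeplearning4j API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the processor's model field and getModel(); not NiFi or DL4J API.
class LazyModelHolder<T> {
    private final Supplier<T> loader;
    private T model; // guarded by this holder's monitor

    LazyModelHolder(Supplier<T> loader) {
        this.loader = loader;
    }

    // synchronized: only one thread at a time can observe model == null and run the loader.
    synchronized T get() {
        if (model == null) {
            model = loader.get();
        }
        return model;
    }

    // Reset hook (the role an @OnStopped method plays); synchronized so get() sees the reset.
    synchronized void clear() {
        model = null;
    }

    // Runs the given number of threads against one holder and reports how often the loader ran.
    static int countLoads(int threads) {
        AtomicInteger loads = new AtomicInteger();
        LazyModelHolder<Object> holder = new LazyModelHolder<>(() -> {
            loads.incrementAndGet();
            return new Object();
        });
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(holder::get);
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return loads.get();
    }
}
```

    With several concurrent callers the loader should still run exactly once, which is the property the synchronized keyword is there to provide.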


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187653982
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import java.io.IOException;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +
    +/**
    + * Base class for deeplearning4j processors
    + */
    +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-charset")
    +            .displayName("Character Set")
    +            .description("Specifies the character set of the document data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-field-separator")
    +            .displayName("Field Separator")
    +            .description("Specifies the field separator in the records. (default is comma)")
    +            .required(true)
    +            .defaultValue(",")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-record-separator")
    +            .displayName("Record Separator")
    +            .description("Specifies the records separator in the message body. (defaults to new line)")
    +            .required(true)
    +            .defaultValue(System.lineSeparator())
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MODEL_FILE = new PropertyDescriptor.Builder()
    +            .name("model-file")
    +            .displayName("Model File")
    +            .description("Location of the Deeplearning4J model zip file")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_DIMENSIONS = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-record-dimension")
    +            .displayName("Record dimensions separated by field separator")
    +            .description("Dimension of array in each a record (eg: 2,4 - a 2x4 array)")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final String DEEPLEARNING4J_ERROR_MESSAGE = "deeplearning4j.error.message";
    +
    +    public static final String DEEPLEARNING4J_OUTPUT_SHAPE = "deeplearning4j.output.shape";
    +
    +    protected MultiLayerNetwork model = null;
    +
    +    protected synchronized MultiLayerNetwork getModel(ProcessContext context) throws IOException {
    +        if ( model == null ) {
    +            String modelFile = context.getProperty(MODEL_FILE).evaluateAttributeExpressions().getValue();
    +            if ( getLogger().isDebugEnabled()) {
    --- End diff --
    
    Corrected.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187350490
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import java.io.IOException;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +
    +/**
    + * Base class for deeplearning4j processors
    + */
    +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-charset")
    +            .displayName("Character Set")
    +            .description("Specifies the character set of the document data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-field-separator")
    +            .displayName("Field Separator")
    +            .description("Specifies the field separator in the records. (default is comma)")
    +            .required(true)
    +            .defaultValue(",")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-record-separator")
    +            .displayName("Record Separator")
    +            .description("Specifies the records separator in the message body. (defaults to new line)")
    +            .required(true)
    +            .defaultValue(System.lineSeparator())
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MODEL_FILE = new PropertyDescriptor.Builder()
    +            .name("model-file")
    +            .displayName("Model File")
    +            .description("Location of the Deeplearning4J model zip file")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_DIMENSIONS = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-record-dimension")
    +            .displayName("Record dimensions separated by field separator")
    +            .description("Dimension of array in each a record (eg: 2,4 - a 2x4 array)")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final String DEEPLEARNING4J_ERROR_MESSAGE = "deeplearning4j.error.message";
    +
    +    public static final String DEEPLEARNING4J_OUTPUT_SHAPE = "deeplearning4j.output.shape";
    +
    +    protected MultiLayerNetwork model = null;
    +
    +    protected synchronized MultiLayerNetwork getModel(ProcessContext context) throws IOException {
    +        if ( model == null ) {
    +            String modelFile = context.getProperty(MODEL_FILE).evaluateAttributeExpressions().getValue();
    +            if ( getLogger().isDebugEnabled()) {
    --- End diff --
    
    It's probably not necessary to check isDebugEnabled() here.
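
    A rough sketch of why the guard is usually redundant, assuming the logger checks the level itself before formatting (as parameterized, SLF4J-style loggers typically do). MiniLog below is a hypothetical stand-in written for illustration; it is not NiFi's ComponentLog.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature logger; illustrates a level check living inside debug().
class MiniLog {
    private final boolean debugEnabled;
    private final List<String> lines = new ArrayList<>();

    MiniLog(boolean debugEnabled) {
        this.debugEnabled = debugEnabled;
    }

    boolean isDebugEnabled() {
        return debugEnabled;
    }

    // The level check happens here, so callers do not need their own guard.
    void debug(String template, Object... args) {
        if (!debugEnabled) {
            return; // nothing is formatted or stored when debug is off
        }
        lines.add(format(template, args));
    }

    // Replaces each "{}" placeholder with the next argument, left to right.
    static String format(String template, Object... args) {
        StringBuilder sb = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = template.indexOf("{}", from);
            if (at < 0) {
                break;
            }
            sb.append(template, from, at).append(arg);
            from = at + 2;
        }
        return sb.append(template.substring(from)).toString();
    }

    List<String> lines() {
        return lines;
    }
}
```

    An explicit guard still pays off when building the arguments is itself expensive, since arguments are evaluated before the call is made.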


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187357216
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected Gson gson = new Gson();
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    +
    +            long startTimeMillis = System.currentTimeMillis();
    +
    +            String [] inputRecords = input.split(recordSeparator);
    +
    +            List<INDArray> features = Arrays.stream(inputRecords).map(
    +                record -> {
    +                    double [] parameters = Arrays.stream(record.split(fieldSeparator)).mapToDouble(
    +                             field -> Double.parseDouble(field)).toArray();
    +
    +                    INDArray featureInput = Nd4j.create(parameters, dimensions);
    +
    +                    if ( getLogger().isDebugEnabled() ) {
    +                        getLogger().debug("Features for record {} parameters {} dims {} featureInput {} ",
    +                            new Object[] {record, parameters, dimensions, featureInput});
    +                    }
    +
    +                    return featureInput;
    +
    +                }).collect(Collectors.toList());
    +
    +           INDArray allFeatures = Nd4j.vstack(features);
    +
    +           INDArray results = model.output(allFeatures);
    +
    +           double [][] partitionedResults = new double[inputRecords.length][];
    +           for (int row = 0; row < inputRecords.length; row++) {
    +                INDArray result = results.getRow(row);
    +                partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector();
    +           }
    +
    +           String jsonResult = gson.toJson(partitionedResults);
    +           int [] shape = results.shape();
    +           String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1, shape.length));
    +
    +           if ( getLogger().isDebugEnabled() ) {
    +                getLogger().debug("Prediction for inputRecords {}, dims {}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape {}, jsonShape {}",
    +                        new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape});
    +           }
    +
    +           try (ByteArrayInputStream bais = new ByteArrayInputStream(jsonResult.getBytes(charset))) {
    +               flowFile = session.importFrom(bais, flowFile);
    +           }
    +
    +           session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE, jsonShape);
    +
    +           final long endTimeMillis = System.currentTimeMillis();
    +
    +           session.transfer(flowFile, REL_SUCCESS);
    +
    +           session.getProvenanceReporter().send(flowFile, makeProvenanceUrl(context),
    +                    (endTimeMillis - startTimeMillis));
    +        } catch (Exception exception) {
    +            flowFile = populateErrorAttributes(session, flowFile, exception.getMessage());
    +                getLogger().error("Failed to process data due to {} for input {}",
    +                        new Object[]{exception.getLocalizedMessage(), input}, exception);
    +                session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    --- End diff --
    
    I don't think context.yield() is appropriate here. context.yield() is meant for a condition that leaves the processor unable to perform its task for some period of time, after which it would work again, and no such condition is occurring here. In this case, it is likely that the processor is misconfigured or that the data is invalid, correct? So in that case, I would just route to failure and continue on.
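
    The distinction can be sketched in plain Java. RoutingSketch and its Outcome enum are hypothetical names used only for illustration, and the sketch assumes that a non-numeric field is the typical "invalid data" case for this processor.

```java
import java.util.regex.Pattern;

// Hypothetical illustration of "route to failure and continue on"; not NiFi API.
class RoutingSketch {
    enum Outcome { SUCCESS, FAILURE }

    // Parses every field of one record; bad data becomes FAILURE with no back-off.
    static Outcome process(String record, String fieldSeparator) {
        try {
            // Pattern.quote keeps the separator from being read as a regular expression.
            for (String field : record.split(Pattern.quote(fieldSeparator))) {
                Double.parseDouble(field);
            }
            return Outcome.SUCCESS;
        } catch (NumberFormatException e) {
            // Retrying later cannot fix malformed input, so there is nothing to wait for:
            // report the failure and let the caller move on to the next record.
            return Outcome.FAILURE;
        }
    }

    // Counts failures across a batch, showing that one bad record does not stall the rest.
    static int countFailures(String[] records, String fieldSeparator) {
        int failures = 0;
        for (String record : records) {
            if (process(record, fieldSeparator) == Outcome.FAILURE) {
                failures++;
            }
        }
        return failures;
    }
}
```

    A yield would be the better fit for something like a temporarily unreachable remote service, where waiting gives the condition a chance to clear.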


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r196994774
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JMultiLayerPredictor.java ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "multilayer", "predict", "classification", "regression", "deep", "learning", "neural", "network"})
    +@CapabilityDescription("The DeepLearning4JMultiLayerPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JMultiLayerPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected final Gson gson = new Gson();
    +
    +    protected MultiLayerNetwork model = null;
    +
    +    @OnStopped
    +    public void close() {
    +        getLogger().info("Closing");
    +        model = null;
    +    }
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    protected synchronized MultiLayerNetwork getModel(ProcessContext context) throws IOException {
    +        if ( model == null ) {
    +            String modelFile = context.getProperty(MODEL_FILE).evaluateAttributeExpressions().getValue();
    +            getLogger().debug("Loading model from {}", new Object[] {modelFile});
    +
    +            long start = System.currentTimeMillis();
    +            model = ModelSerializer.restoreMultiLayerNetwork(modelFile,false);
    +            long end = System.currentTimeMillis();
    +
    +            getLogger().info("Time to load model " + (end-start) +  " ms");
    +        }
    +        return (MultiLayerNetwork)model;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    --- End diff --
    
    @jzonthemtn - I was going for lazy loading, but it's not a problem to change it.
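
    The alternative to lazy loading, presumably what the change would look like, is to load the model once when the processor is scheduled. A minimal sketch in plain Java follows; EagerModelHolder is a hypothetical name, and in a NiFi processor the two lifecycle methods would carry the @OnScheduled and @OnStopped annotations.

```java
import java.util.function.Supplier;

// Hypothetical eager-loading counterpart to a lazy getModel(); not NiFi or DL4J API.
class EagerModelHolder<T> {
    private final Supplier<T> loader;
    private volatile T model; // written in lifecycle methods, read by worker threads

    EagerModelHolder(Supplier<T> loader) {
        this.loader = loader;
    }

    // Would be annotated @OnScheduled: a bad model file fails here, before any data arrives.
    void onScheduled() {
        model = loader.get();
    }

    // Would be annotated @OnStopped: releases the model when the processor stops.
    void onStopped() {
        model = null;
    }

    // No locking on the hot path; callers only read the already-loaded reference.
    T get() {
        T current = model;
        if (current == null) {
            throw new IllegalStateException("model not loaded");
        }
        return current;
    }
}
```

    The trade-off is that loading cost moves to start-up, while the per-FlowFile path no longer needs a synchronized method.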


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by jzonthemtn <gi...@git.apache.org>.
Github user jzonthemtn commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r196139753
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JMultiLayerPredictor.java ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "multilayer", "predict", "classification", "regression", "deep", "learning", "neural", "network"})
    +@CapabilityDescription("The DeepLearning4JMultiLayerPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JMultiLayerPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected final Gson gson = new Gson();
    +
    +    protected MultiLayerNetwork model = null;
    +
    +    @OnStopped
    +    public void close() {
    +        getLogger().info("Closing");
    +        model = null;
    +    }
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    protected synchronized MultiLayerNetwork getModel(ProcessContext context) throws IOException {
    +        if ( model == null ) {
    +            String modelFile = context.getProperty(MODEL_FILE).evaluateAttributeExpressions().getValue();
    +            getLogger().debug("Loading model from {}", new Object[] {modelFile});
    +
    +            long start = System.currentTimeMillis();
    +            model = ModelSerializer.restoreMultiLayerNetwork(modelFile,false);
    +            long end = System.currentTimeMillis();
    +
    +            getLogger().info("Time to load model " + (end-start) +  " ms");
    +        }
    +        return (MultiLayerNetwork)model;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    --- End diff --
    
    Would loading the model in an `@OnScheduled` function be better than waiting until the processor is triggered to load the model?
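
The trade-off raised in this comment can be sketched in plain Java, without the NiFi or deeplearning4j APIs. This is only an illustration under stated assumptions: `ModelHolder`, its `onScheduled()` and `onStopped()` methods, and the `Supplier`-based loader are hypothetical stand-ins. In the actual processor the eager variant would presumably be a method annotated with `@OnScheduled`, and the loader would be the `ModelSerializer.restoreMultiLayerNetwork` call shown in the diff.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a processor that caches an expensive-to-load model. */
class ModelHolder<M> {
    private final Supplier<M> loader; // stand-in for the expensive model load
    private M model;                  // cached model; null until loaded
    private int loadCount;            // how many times the loader has run

    ModelHolder(Supplier<M> loader) {
        this.loader = loader;
    }

    /** Eager variant: load once when the component is scheduled to run. */
    synchronized void onScheduled() {
        model = loader.get();
        loadCount++;
    }

    /** Lazy variant: load on first use, then reuse the cached instance. */
    synchronized M getModel() {
        if (model == null) {
            model = loader.get();
            loadCount++;
        }
        return model;
    }

    /** Mirrors the close() method in the diff: drop the cached model on stop. */
    synchronized void onStopped() {
        model = null;
    }

    synchronized int getLoadCount() {
        return loadCount;
    }
}
```

In this sketch, calling `onScheduled()` first means `getModel()` never pays the load cost, so a slow or failing load would surface when the component starts rather than on the first FlowFile. Without that call, the first `getModel()` triggers the load, which is the lazy behavior of the code under review.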


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    Sent mans2singh/nifi#1


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187349802
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import java.io.IOException;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +
    +/**
    + * Base class for deeplearning4j processors
    + */
    +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-charset")
    +            .displayName("Character Set")
    +            .description("Specifies the character set of the document data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-field-separator")
    +            .displayName("Field Separator")
    +            .description("Specifies the field separator in the records. (default is comma)")
    +            .required(true)
    +            .defaultValue(",")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-record-separator")
    +            .displayName("Record Separator")
    +            .description("Specifies the records separator in the message body. (defaults to new line)")
    +            .required(true)
    +            .defaultValue(System.lineSeparator())
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MODEL_FILE = new PropertyDescriptor.Builder()
    +            .name("model-file")
    +            .displayName("Model File")
    +            .description("Location of the Deeplearning4J model zip file")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_DIMENSIONS = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-record-dimension")
    +            .displayName("Record dimensions separated by field separator")
    --- End diff --
    
    Would recommend setting the display name to just "Record Dimensions" and then mentioning in the description that the dimensions are separated by the field separator.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187653539
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import java.io.IOException;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +
    +/**
    + * Base class for deeplearning4j processors
    + */
    +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-charset")
    +            .displayName("Character Set")
    +            .description("Specifies the character set of the document data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-field-separator")
    +            .displayName("Field Separator")
    +            .description("Specifies the field separator in the records. (default is comma)")
    +            .required(true)
    +            .defaultValue(",")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-record-separator")
    +            .displayName("Record Separator")
    +            .description("Specifies the records separator in the message body. (defaults to new line)")
    --- End diff --
    
    Corrected


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187349263
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import java.io.IOException;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +
    +/**
    + * Base class for deeplearning4j processors
    + */
    +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-charset")
    +            .displayName("Character Set")
    +            .description("Specifies the character set of the document data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-field-separator")
    +            .displayName("Field Separator")
    +            .description("Specifies the field separator in the records. (default is comma)")
    +            .required(true)
    +            .defaultValue(",")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-record-separator")
    +            .displayName("Record Separator")
    +            .description("Specifies the records separator in the message body. (defaults to new line)")
    --- End diff --
    
    Should avoid mentioning the default in the description, as that's automatically included in the generated documentation anyway.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r193646335
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning", "neural", "network"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected final Gson gson = new Gson();
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    +
    +            long startTimeMillis = System.currentTimeMillis();
    +
    +            String [] inputRecords = input.split(recordSeparator);
    +
    +            List<INDArray> features = Arrays.stream(inputRecords).map(
    +                record -> {
    +                    double [] parameters = Arrays.stream(record.split(fieldSeparator)).mapToDouble(
    +                             field -> Double.parseDouble(field)).toArray();
    +
    +                    INDArray featureInput = Nd4j.create(parameters, dimensions);
    +
    +                    if ( getLogger().isDebugEnabled() ) {
    +                        getLogger().debug("Features for record {} parameters {} dims {} featureInput {} ",
    +                            new Object[] {record, parameters, dimensions, featureInput});
    +                    }
    +
    +                    return featureInput;
    +
    +                }).collect(Collectors.toList());
    +
    +           INDArray allFeatures = Nd4j.vstack(features);
    +
    +           INDArray results = model.output(allFeatures);
    +
    +           double [][] partitionedResults = new double[inputRecords.length][];
    +           for (int row = 0; row < inputRecords.length; row++) {
    +                INDArray result = results.getRow(row);
    +                partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector();
    +           }
    +
    +           String jsonResult = gson.toJson(partitionedResults);
    +           int [] shape = results.shape();
    +           String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1, shape.length));
    +
    +           if ( getLogger().isDebugEnabled() ) {
    +                getLogger().debug("Prediction for inputRecords {}, dims {}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape {}, jsonShape {}",
    +                        new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape});
    +           }
    +
    +           flowFile = session.write(flowFile, out -> out.write(jsonResult.getBytes(charset)));
    +
    +           session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE, jsonShape);
    +
    +           final long endTimeMillis = System.currentTimeMillis();
    +
    +           session.transfer(flowFile, REL_SUCCESS);
    +
    +           session.getProvenanceReporter().modifyContent(flowFile, makeProvenanceUrl(context),
    +                    (endTimeMillis - startTimeMillis));
    +        } catch (Exception exception) {
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, String.valueOf(exception.getMessage()));
    +            getLogger().error("Failed to process data due to {} for input {}",
    +                    new Object[]{exception.getLocalizedMessage(), input}, exception);
    +            session.transfer(flowFile, REL_FAILURE);
    +        }
    +    }
    +
    +    protected String getFlowFileContents(final ProcessSession session, Charset charset, FlowFile incomingFlowFile)
    +            throws IOException {
    +        final byte[] buffer = new byte[(int) incomingFlowFile.getSize()];
    +        try (final InputStream in = session.read(incomingFlowFile)) {
    +            StreamUtils.fillBuffer(in, buffer);
    +        }
    +        return new String(buffer, charset);
    +    }
    +
    +    protected int [] getInputDimensions(final ProcessContext context, Charset charset, FlowFile flowFile, String separator)
    +            throws IOException {
    +        String values = context.getProperty(RECORD_DIMENSIONS).evaluateAttributeExpressions(flowFile).getValue();
    +        return Arrays.stream(
    +                values.split(separator))
    +                    .mapToInt(val -> Integer.parseInt(val)).toArray();
    +    }
    +
    +    protected String makeProvenanceUrl(final ProcessContext context) {
    +        return new StringBuilder("deeplearning4j://")
    +            .append(context.getProperty(MODEL_FILE).evaluateAttributeExpressions().getValue()).toString();
    +    }
    +
    +    protected FlowFile populateErrorAttributes(final ProcessSession session, FlowFile flowFile,
    +            String message) {
    +        Map<String,String> attributes = new HashMap<>();
    --- End diff --
    
    This `populateErrorAttributes` method is no longer used and should be deleted.


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    @markap14 @ijokarumawak @jzonthemtn and NiFi team: Please let me know if you have any additional comments on this processor. Thanks.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r199823948
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml ---
    @@ -0,0 +1,114 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-deeplearning4j-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-deeplearning4j-processors</artifactId>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.nd4j</groupId>
    +            <artifactId>nd4j-api</artifactId>
    +            <version>1.0.0-alpha</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.nd4j</groupId>
    +            <artifactId>nd4j-native-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +        </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-8.0-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-9.0-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-9.1-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    --- End diff --
    
    @ijokarumawak - Thanks for your updates, I will try them out locally.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r194943820
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification_test.txt ---
    @@ -0,0 +1,100 @@
    +1.1,0.5,0.5,0.2,0
    --- End diff --
    
    @ijokarumawak - The included tests show how to generate a model and configure the processor for regression and classification in the integration tests. My aim in creating the mock data was to build a model with reproducible results even with limited observations and few iterations. In real life, the user will create, test, and validate a multilayer model before plugging it into the component.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187350055
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import java.io.IOException;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +
    +/**
    + * Base class for deeplearning4j processors
    + */
    +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-charset")
    +            .displayName("Character Set")
    +            .description("Specifies the character set of the document data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-field-separator")
    +            .displayName("Field Separator")
    +            .description("Specifies the field separator in the records. (default is comma)")
    +            .required(true)
    +            .defaultValue(",")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-record-separator")
    +            .displayName("Record Separator")
    +            .description("Specifies the records separator in the message body. (defaults to new line)")
    +            .required(true)
    +            .defaultValue(System.lineSeparator())
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MODEL_FILE = new PropertyDescriptor.Builder()
    +            .name("model-file")
    +            .displayName("Model File")
    +            .description("Location of the Deeplearning4J model zip file")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_DIMENSIONS = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-record-dimension")
    +            .displayName("Record dimensions separated by field separator")
    +            .description("Dimension of array in each record (eg: 2,4 - a 2x4 array)")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final String DEEPLEARNING4J_ERROR_MESSAGE = "deeplearning4j.error.message";
    +
    +    public static final String DEEPLEARNING4J_OUTPUT_SHAPE = "deeplearning4j.output.shape";
    +
    +    protected MultiLayerNetwork model = null;
    --- End diff --
    
    All member variables need to be protected / accessed in a thread-safe manner.
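
    A minimal sketch of one way to do this, using only the JDK. The `LazyModelHolder` class and its `loader` parameter are hypothetical names introduced for illustration and are not part of this PR. It assumes the model can be loaded once on first use and then shared across the threads that call `onTrigger`:

    ```java
    import java.util.function.Supplier;

    // Hypothetical helper (not part of the PR): loads a value at most once
    // and publishes it safely to every thread that calls get().
    final class LazyModelHolder<T> {

        // volatile: a reference written by one thread is visible to the others
        private volatile T model;
        private final Supplier<T> loader;

        LazyModelHolder(Supplier<T> loader) {
            this.loader = loader;
        }

        T get() {
            T local = model;
            if (local == null) {
                // Only the first caller loads; later callers reuse the instance.
                synchronized (this) {
                    local = model;
                    if (local == null) {
                        local = loader.get();
                        model = local;
                    }
                }
            }
            return local;
        }

        // Could be called when the processor is stopped, so the next run reloads.
        synchronized void clear() {
            model = null;
        }
    }
    ```

    A processor could keep one such holder in a `private final` field and call `get()` from `onTrigger`, instead of exposing a mutable `protected` field to subclasses.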


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    @joewitt - I will remove the nar from the assembly.  Let me know if there is any additional feedback.  Thanks.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187349196
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import java.io.IOException;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +
    +/**
    + * Base class for deeplearning4j processors
    + */
    +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-charset")
    +            .displayName("Character Set")
    +            .description("Specifies the character set of the document data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-field-separator")
    +            .displayName("Field Separator")
    +            .description("Specifies the field separator in the records. (default is comma)")
    --- End diff --
    
    Should avoid mentioning the default in the description, as that's automatically included in the generated documentation anyway.


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    @mans2singh yes, please don't include the newly built nar in the assembly. We need an extension registry, so for now just don't include it. People can still pull it in and use it if they wish.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r194944047
  
    --- Diff: nifi-assembly/pom.xml ---
    @@ -379,6 +379,12 @@ language governing permissions and limitations under the License. -->
                 <version>1.7.0-SNAPSHOT</version>
                 <type>nar</type>
             </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-deeplearning4j-nar</artifactId>
    +            <version>1.7.0-SNAPSHOT</version>
    +            <type>nar</type>
    +        </dependency>
    --- End diff --
    
    I would be happy to create a profile if that is the decision.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r199692925
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml ---
    @@ -0,0 +1,114 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-deeplearning4j-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-deeplearning4j-processors</artifactId>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.nd4j</groupId>
    +            <artifactId>nd4j-api</artifactId>
    +            <version>1.0.0-alpha</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.nd4j</groupId>
    +            <artifactId>nd4j-native-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +        </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-8.0-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-9.0-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-9.1-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    --- End diff --
    
    The 1.0.0-beta version is available now. After updating to beta, the regression tests start to fail with the following error:
    ```
    15:25:02.648 [pool-1-thread-1] ERROR org.apache.nifi.processors.deeplearning4j.DeepLearning4JMultiLayerPredictor - 
    org.nd4j.linalg.exception.ND4JIllegalStateException: Unable to create a 1d array from a non vector!
    	at org.nd4j.linalg.api.ndarray.BaseNDArray.toDoubleVector(BaseNDArray.java:3249)
    	at org.apache.nifi.processors.deeplearning4j.DeepLearning4JMultiLayerPredictor.onTrigger(DeepLearning4JMultiLayerPredictor.java:180)
    	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    	at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:251)
    	at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:245)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ```
    
    I am fixing the error locally, and will send a PR to your branch.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187351084
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    --- End diff --
    
    We try to avoid using nouns as processor names and instead use <Verb><Noun>, such as PredictClassification (though this processor allows for both classification and regression, so I don't know if there's a single term for that).


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187354340
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected Gson gson = new Gson();
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    +
    +            long startTimeMillis = System.currentTimeMillis();
    +
    +            String [] inputRecords = input.split(recordSeparator);
    +
    +            List<INDArray> features = Arrays.stream(inputRecords).map(
    +                record -> {
    +                    double [] parameters = Arrays.stream(record.split(fieldSeparator)).mapToDouble(
    +                             field -> Double.parseDouble(field)).toArray();
    +
    +                    INDArray featureInput = Nd4j.create(parameters, dimensions);
    +
    +                    if ( getLogger().isDebugEnabled() ) {
    +                        getLogger().debug("Features for record {} parameters {} dims {} featureInput {} ",
    +                            new Object[] {record, parameters, dimensions, featureInput});
    +                    }
    +
    +                    return featureInput;
    +
    +                }).collect(Collectors.toList());
    +
    +           INDArray allFeatures = Nd4j.vstack(features);
    +
    +           INDArray results = model.output(allFeatures);
    +
    +           double [][] partitionedResults = new double[inputRecords.length][];
    +           for (int row = 0; row < inputRecords.length; row++) {
    +                INDArray result = results.getRow(row);
    +                partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector();
    +           }
    +
    +           String jsonResult = gson.toJson(partitionedResults);
    +           int [] shape = results.shape();
    +           String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1, shape.length));
    +
    +           if ( getLogger().isDebugEnabled() ) {
    +                getLogger().debug("Prediction for inputRecords {}, dims {}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape {}, jsonShape {}",
    +                        new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape});
    +           }
    +
    +           try (ByteArrayInputStream bais = new ByteArrayInputStream(jsonResult.getBytes(charset))) {
    +               flowFile = session.importFrom(bais, flowFile);
    +           }
    +
    +           session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE, jsonShape);
    +
    +           final long endTimeMillis = System.currentTimeMillis();
    +
    +           session.transfer(flowFile, REL_SUCCESS);
    +
    +           session.getProvenanceReporter().send(flowFile, makeProvenanceUrl(context),
    +                    (endTimeMillis - startTimeMillis));
    +        } catch (Exception exception) {
    +            flowFile = populateErrorAttributes(session, flowFile, exception.getMessage());
    +                getLogger().error("Failed to process data due to {} for input {}",
    +                        new Object[]{exception.getLocalizedMessage(), input}, exception);
    +                session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    +        }
    +    }
    +
    +    protected String getFlowFileContents(final ProcessSession session, Charset charset, FlowFile incomingFlowFile)
    +            throws IOException {
    +        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
    --- End diff --
    
    This ends up allocating the byte array twice: once inside the ByteArrayOutputStream and again when toByteArray() is called.
    I would instead recommend something like:
    ```
    final byte[] buffer = new byte[(int) incomingFlowFile.getSize()];
    try (final InputStream in = session.read(incomingFlowFile)) {
        StreamUtils.fillBuffer(in, buffer);
    }
    return new String(buffer, charset);
    ```


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r200638844
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml ---
    @@ -0,0 +1,114 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-deeplearning4j-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-deeplearning4j-processors</artifactId>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.nd4j</groupId>
    +            <artifactId>nd4j-api</artifactId>
    +            <version>1.0.0-alpha</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.nd4j</groupId>
    +            <artifactId>nd4j-native-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +        </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-8.0-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-9.0-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-9.1-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    --- End diff --
    
    @ijokarumawak
    
    I tried your changes locally and they look great. Is it OK with you if I merge them into my branch?
    
    Once again, thanks for your help.


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    @ijokarumawak - Thanks for your comments. I will check out the pointers you have given. Mans


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    Hi @mans2singh, sorry for the slow progress. I've been swamped with other urgent tasks. I will come back to continue reviewing this once those are done.
    
    BTW, do you have any suggestions for converting string values into nominal numeric values, for categories such as log level (ERR, INFO, WARN, etc.)?
    I found this web page useful to think about data types for machine learning and deep learning.
    https://towardsdatascience.com/7-data-types-a-better-way-to-think-about-data-types-for-machine-learning-939fae99a689
    
    I wonder how we can implement such conversions in NiFi. I still think NiFi needs the capability to convert general data into numeric vectors so that DL4J can be applied. Without that, these processors are hard to use.
    
    I briefly took a look at the DataVec project, but it seems to depend on Spark to execute such transformations. I envision NiFi doing such tasks within its data flow so that it can apply a deep learning model directly to incoming streaming data.
    https://deeplearning4j.org/datavec
    
    Anyone is welcome to join this review.
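    
    For the category-to-number conversion asked about above, one common option is one-hot encoding. The following is only a minimal sketch, assuming the full list of categories is known up front. `CategoryEncoder` is a hypothetical helper written for illustration, not an existing NiFi or DL4J class.
    
    ```java
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    /** Hypothetical helper (not part of NiFi or DL4J): one-hot encodes a categorical field. */
    final class CategoryEncoder {
        // Maps each known category to its position in the output vector.
        private final Map<String, Integer> indexByCategory = new LinkedHashMap<>();

        CategoryEncoder(List<String> categories) {
            for (String category : categories) {
                // Duplicates are ignored; each new category takes the next free index.
                indexByCategory.putIfAbsent(category, indexByCategory.size());
            }
        }

        /** Returns a vector with 1.0 at the category's index and 0.0 elsewhere. */
        double[] encode(String value) {
            Integer index = indexByCategory.get(value);
            if (index == null) {
                throw new IllegalArgumentException("Unknown category: " + value);
            }
            double[] vector = new double[indexByCategory.size()];
            vector[index] = 1.0;
            return vector;
        }
    }
    ```
    
    With the categories ERR, INFO, WARN, encoding INFO gives [0.0, 1.0, 0.0]. Such a vector could be joined with the 'Field Separator' and passed to the predictor as part of a record.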


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187654383
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected Gson gson = new Gson();
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    +
    +            long startTimeMillis = System.currentTimeMillis();
    +
    +            String [] inputRecords = input.split(recordSeparator);
    +
    +            List<INDArray> features = Arrays.stream(inputRecords).map(
    +                record -> {
    +                    double [] parameters = Arrays.stream(record.split(fieldSeparator)).mapToDouble(
    +                             field -> Double.parseDouble(field)).toArray();
    +
    +                    INDArray featureInput = Nd4j.create(parameters, dimensions);
    +
    +                    if ( getLogger().isDebugEnabled() ) {
    +                        getLogger().debug("Features for record {} parameters {} dims {} featureInput {} ",
    +                            new Object[] {record, parameters, dimensions, featureInput});
    +                    }
    +
    +                    return featureInput;
    +
    +                }).collect(Collectors.toList());
    +
    +           INDArray allFeatures = Nd4j.vstack(features);
    +
    +           INDArray results = model.output(allFeatures);
    +
    +           double [][] partitionedResults = new double[inputRecords.length][];
    +           for (int row = 0; row < inputRecords.length; row++) {
    +                INDArray result = results.getRow(row);
    +                partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector();
    +           }
    +
    +           String jsonResult = gson.toJson(partitionedResults);
    +           int [] shape = results.shape();
    +           String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1, shape.length));
    +
    +           if ( getLogger().isDebugEnabled() ) {
    +                getLogger().debug("Prediction for inputRecords {}, dims {}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape {}, jsonShape {}",
    +                        new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape});
    +           }
    +
    +           try (ByteArrayInputStream bais = new ByteArrayInputStream(jsonResult.getBytes(charset))) {
    +               flowFile = session.importFrom(bais, flowFile);
    +           }
    +
    +           session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE, jsonShape);
    +
    +           final long endTimeMillis = System.currentTimeMillis();
    +
    +           session.transfer(flowFile, REL_SUCCESS);
    +
    +           session.getProvenanceReporter().send(flowFile, makeProvenanceUrl(context),
    --- End diff --
    
    Corrected.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r200923513
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml ---
    @@ -0,0 +1,114 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-deeplearning4j-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-deeplearning4j-processors</artifactId>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.nd4j</groupId>
    +            <artifactId>nd4j-api</artifactId>
    +            <version>1.0.0-alpha</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.nd4j</groupId>
    +            <artifactId>nd4j-native-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +        </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-8.0-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-9.0-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-9.1-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    --- End diff --
    
    @mans2singh Sorry for the delay. Yes, please merge it into your branch. Thanks!


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187355568
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import java.io.IOException;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +
    +/**
    + * Base class for deeplearning4j processors
    + */
    +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor {
    --- End diff --
    
    It appears that we have only a single Processor here. Is there a reason for having this separate Abstract class?


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    Good morning NiFi folks:
    
    The AppVeyor build is passing for this PR, but the Travis CI build is failing with the following message:
    
    `[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.20.1:test (default-test) on project nifi-cdc-mysql-processors: There are test failures.
    [ERROR] 
    [ERROR] Please refer to /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/target/surefire-reports for the individual test results.
    [ERROR] Please refer to dump files (if any exist) [date]-jvmRun[N].dump, [date].dumpstream and [date]-jvmRun[N].dumpstream.`
    
    Can you please advise on how to resolve this error?
    
    Thanks
    
    Mans


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    @markap14 - 
    
    Thanks for your prompt review and advice.  
    
    I've updated the code based on your review and am looking forward to feedback from you and other members.
    
    Thanks again.
    
    Mans


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187654464
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"})
    +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected Gson gson = new Gson();
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    +
    +            long startTimeMillis = System.currentTimeMillis();
    +
    +            String [] inputRecords = input.split(recordSeparator);
    +
    +            List<INDArray> features = Arrays.stream(inputRecords).map(
    +                record -> {
    +                    double [] parameters = Arrays.stream(record.split(fieldSeparator)).mapToDouble(
    +                             field -> Double.parseDouble(field)).toArray();
    +
    +                    INDArray featureInput = Nd4j.create(parameters, dimensions);
    +
    +                    if ( getLogger().isDebugEnabled() ) {
    +                        getLogger().debug("Features for record {} parameters {} dims {} featureInput {} ",
    +                            new Object[] {record, parameters, dimensions, featureInput});
    +                    }
    +
    +                    return featureInput;
    +
    +                }).collect(Collectors.toList());
    +
    +           INDArray allFeatures = Nd4j.vstack(features);
    +
    +           INDArray results = model.output(allFeatures);
    +
    +           double [][] partitionedResults = new double[inputRecords.length][];
    +           for (int row = 0; row < inputRecords.length; row++) {
    +                INDArray result = results.getRow(row);
    +                partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector();
    +           }
    +
    +           String jsonResult = gson.toJson(partitionedResults);
    +           int [] shape = results.shape();
    +           String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1, shape.length));
    +
    +           if ( getLogger().isDebugEnabled() ) {
    +                getLogger().debug("Prediction for inputRecords {}, dims {}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape {}, jsonShape {}",
    +                        new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape});
    +           }
    +
    +           try (ByteArrayInputStream bais = new ByteArrayInputStream(jsonResult.getBytes(charset))) {
    +               flowFile = session.importFrom(bais, flowFile);
    +           }
    +
    +           session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE, jsonShape);
    +
    +           final long endTimeMillis = System.currentTimeMillis();
    +
    +           session.transfer(flowFile, REL_SUCCESS);
    +
    +           session.getProvenanceReporter().send(flowFile, makeProvenanceUrl(context),
    +                    (endTimeMillis - startTimeMillis));
    +        } catch (Exception exception) {
    +            flowFile = populateErrorAttributes(session, flowFile, exception.getMessage());
    +                getLogger().error("Failed to process data due to {} for input {}",
    +                        new Object[]{exception.getLocalizedMessage(), input}, exception);
    +                session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    +        }
    +    }
    +
    +    protected String getFlowFileContents(final ProcessSession session, Charset charset, FlowFile incomingFlowFile)
    +            throws IOException {
    +        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
    --- End diff --
    
    Corrected, thanks for the pointer.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187655109
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification_test.txt ---
    @@ -0,0 +1,100 @@
    +1.1,0.5,0.5,0.2,0
    --- End diff --
    
    I've mentioned at the beginning of the tests that these are based on deeplearning4j examples.  I generated this very simple/small sample file to make consistent predictions for the tests even with just a few observations.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187349520
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import java.io.IOException;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +
    +/**
    + * Base class for deeplearning4j processors
    + */
    +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-charset")
    +            .displayName("Character Set")
    +            .description("Specifies the character set of the document data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-field-separator")
    +            .displayName("Field Separator")
    +            .description("Specifies the field separator in the records. (default is comma)")
    +            .required(true)
    +            .defaultValue(",")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder()
    +            .name("deeplearning4j-record-separator")
    +            .displayName("Record Separator")
    +            .description("Specifies the records separator in the message body. (defaults to new line)")
    +            .required(true)
    +            .defaultValue(System.lineSeparator())
    --- End diff --
    
    I would recommend setting this explicitly to "\n" so that the default behavior is the same across environments.
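    
    To illustrate the point, here is a minimal sketch, not the processor's actual code. It mirrors the `input.split(recordSeparator)` call from the diff in a hypothetical `RecordSeparatorDemo` class and shows that a body written with "\n" is not split when the separator is "\r\n", the value `System.lineSeparator()` returns on Windows.

```java
final class RecordSeparatorDemo {

    // Splits the FlowFile body into records the same way the diff does:
    // a plain String.split on the configured record separator.
    static String[] splitRecords(String body, String recordSeparator) {
        return body.split(recordSeparator);
    }

    public static void main(String[] args) {
        // Hypothetical two-record body that uses "\n" between records.
        String body = "1.1,0.5,0.5,0.2\n1.2,0.4,0.6,0.1";

        // Explicit "\n": the record count is the same on every platform.
        System.out.println(splitRecords(body, "\n").length);

        // Platform default: "\n" on Linux/macOS but "\r\n" on Windows,
        // where this body would come back as a single unsplit record.
        System.out.println(splitRecords(body, System.lineSeparator()).length);
    }
}
```

    Note that `String.split` treats its argument as a regular expression, so a separator containing regex metacharacters (for example `|`) would likely need `Pattern.quote`.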


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    @ijokarumawak - 
    
    I've created two flow templates for testing the DL4J processor.  
    
    The first uses a file input and a file output. The second reads a single id from an input file, fetches the matching row from an RDBMS, classifies the observation, and saves the classification results in the same row.  
    
    The supporting files along with a sample classification model that will work with the two templates are in the repository [nifi-flow-examples](https://github.com/mans2singh/nifi-flow-examples.git) branch nifi-dl4j-flow.  
    
    Here are some details of the two flow templates:
    
    1. The first template, [NifiDL4JFileInputOutput.xml](https://github.com/mans2singh/nifi-flow-examples/blob/nifi-dl4j-flow/dl4jtemplate/NifiDL4JFileInputOutput.xml), is a simple one that ingests a file containing an observation record from a directory (sample input in dl4jinput), applies the classification model, and writes the results to the output directory under the same file name as the input.  In this scenario, the correlation is based on the file names.
    
    2. The second, [NifiDL4JFileToRdbms.xml](https://github.com/mans2singh/nifi-flow-examples/blob/nifi-dl4j-flow/dl4jtemplate/NifiDL4JFileToRdbms.xml), reads a single id from an input file (sample in the dl4jinputid directory), queries an RDBMS table for the observation with that id, classifies the observation, and updates the db row with the classification results.  In this flow template, the row id of the input serves as the correlation id, which is used to update the output column of the corresponding row after classification. The flow uses other NiFi processors to ingest, transform, and save the classification results.  The table creation and observation row insertion commands are in the dl4jsql directory. 
    
    The flow templates require setting the appropriate input/output files, dl4j model, db controller and rdbms table with the records.
    
    Please let me know your thoughts/feedback.
    
    Thanks
    
    Mans


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r194944819
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification_test.txt ---
    @@ -0,0 +1,100 @@
    +1.1,0.5,0.5,0.2,0
    --- End diff --
    
    @mans2singh Yeah, I understand that. That test code was helpful for creating a model file to use in a NiFi flow. My intention was to make the test data and test cases more understandable.
    



---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r187655641
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import java.io.IOException;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +
    +/**
    + * Base class for deeplearning4j processors
    + */
    +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor {
    --- End diff --
    
    @markap14  - This is to establish a foundation for future extensions, which will be easier if some common base classes are present.   I have found this pattern useful and hope it's not overkill.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r199692494
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml ---
    @@ -0,0 +1,114 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-deeplearning4j-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-deeplearning4j-processors</artifactId>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.nd4j</groupId>
    +            <artifactId>nd4j-api</artifactId>
    +            <version>1.0.0-alpha</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.nd4j</groupId>
    +            <artifactId>nd4j-native-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +        </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-8.0-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-9.0-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-9.1-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    --- End diff --
    
    Are all these CUDA versions required? I'd like to keep the dependencies to a minimum. According to this [DL4J doc about GPU](https://deeplearning4j.org/gpu), `nd4j-native-platform` would suffice if one wants to run DL4J on CPU. I think that should be the default. It would be useful to let users switch to CUDA by passing the version as a Maven command-line option.
    
    Removing the CUDA dependencies reduced the NAR size from 3GB to 486MB.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r193626870
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/docs/org.apache.nifi.processors.deeplearning4j.DeepLearning4JPredictor/additionalDetails.html ---
    @@ -0,0 +1,353 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements.  See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License.  You may obtain a copy of the License at
    +      http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<head>
    +<meta charset="utf-8" />
    +<title>Deeplearning4JPredictor</title>
    +<link rel="stylesheet" href="../../../../../css/component-usage.css"
    +	type="text/css" />
    +</head>
    +
    +<body>
    +	<!-- Processor Documentation ================================================== -->
    +	<h2>Usage Information</h2>
    +
    +	<p>
    +		DeepLearning4JPredictor processor produces predictions by applying the
    +		provided <a href="http://github.com/deeplearning4j" target="_blank">deeplearning4j</a>
    +		model to the FlowFile contents.
    +	</p>
    +
    +	<p>The processor can perform:
    +	<ul>
    +		<li>classification, or</li>
    +		<li>regression</li>
    +	</ul>
    +	based on the model used.
    +
    +	<p>The processors has the following properties:
    +	<ul>
    +		<li>a deeplearning4j classification or regression model (required)</li>
    +		<li>record dimensions (required)</li>
    +		<li>field separator (default comma)</li>
    +		<li>record separator (default new line character)</li>
    +		<li>character set for FlowFile message body (default UTF8)</li>
    +	</ul>
    --- End diff --
    
    "The processors has the following properties: ..." I'd suggest removing this section as the content is in duplication with the property doc table generated by NiFi based on PropertyDescriptors list.


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    @mans2singh As long as we can construct a useful flow with these components, I am OK with how that is done.
    
    By using a correlation id, we can merge the original FlowFile with the prediction result. However, I assume we would need to write custom code with a ScriptedProcessor to do anything further with it within a NiFi flow.
    Would you be able to provide a flow template featuring the processor, with an input dataset, to illustrate how correlation ids help merge the prediction result into the original dataset?


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r199823718
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml ---
    @@ -0,0 +1,114 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-deeplearning4j-bundle</artifactId>
    +        <version>1.7.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-deeplearning4j-processors</artifactId>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.nd4j</groupId>
    +            <artifactId>nd4j-api</artifactId>
    +            <version>1.0.0-alpha</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.nd4j</groupId>
    +            <artifactId>nd4j-native-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +        </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-8.0-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-9.0-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    +         <dependency>
    +             <groupId>org.nd4j</groupId>
    +             <artifactId>nd4j-cuda-9.1-platform</artifactId>
    +            <version>1.0.0-alpha</version>
    +         </dependency>
    --- End diff --
    
    @ijokarumawak - That sounds great, I will remove the CUDA dependencies.


---

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

Posted by jzonthemtn <gi...@git.apache.org>.
Github user jzonthemtn commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2686#discussion_r197102298
  
    --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JMultiLayerPredictor.java ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.deeplearning4j;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    +import org.deeplearning4j.util.ModelSerializer;
    +import org.nd4j.linalg.api.ndarray.INDArray;
    +import org.nd4j.linalg.factory.Nd4j;
    +import com.google.gson.Gson;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"deeplearning4j", "dl4j", "multilayer", "predict", "classification", "regression", "deep", "learning", "neural", "network"})
    +@CapabilityDescription("The DeepLearning4JMultiLayerPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. "
    +    + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. "
    +    + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. "
    +    + "Each record can contain multiple fields with each field separated by the 'Field Separator' property."
    +    )
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"),
    +    @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"),
    +    })
    +public class DeepLearning4JMultiLayerPredictor extends AbstractDeepLearning4JProcessor {
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successful DeepLearning4j results are routed to this relationship").build();
    +
    +    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed DeepLearning4j results are routed to this relationship").build();
    +
    +    protected final Gson gson = new Gson();
    +
    +    protected MultiLayerNetwork model = null;
    +
    +    @OnStopped
    +    public void close() {
    +        getLogger().info("Closing");
    +        model = null;
    +    }
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    static {
    +        final Set<Relationship> tempRelationships = new HashSet<>();
    +        tempRelationships.add(REL_SUCCESS);
    +        tempRelationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(tempRelationships);
    +        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
    +        tempDescriptors.add(MODEL_FILE);
    +        tempDescriptors.add(RECORD_DIMENSIONS);
    +        tempDescriptors.add(CHARSET);
    +        tempDescriptors.add(FIELD_SEPARATOR);
    +        tempDescriptors.add(RECORD_SEPARATOR);
    +        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    protected synchronized MultiLayerNetwork getModel(ProcessContext context) throws IOException {
    +        if ( model == null ) {
    +            String modelFile = context.getProperty(MODEL_FILE).evaluateAttributeExpressions().getValue();
    +            getLogger().debug("Loading model from {}", new Object[] {modelFile});
    +
    +            long start = System.currentTimeMillis();
    +            model = ModelSerializer.restoreMultiLayerNetwork(modelFile,false);
    +            long end = System.currentTimeMillis();
    +
    +            getLogger().info("Time to load model " + (end-start) +  " ms");
    +        }
    +        return (MultiLayerNetwork)model;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
    +        if ( flowFile.getSize() == 0 ) {
    +            String message = "FlowFile query is empty";
    +            getLogger().error(message);
    +            flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        String input = null;
    +        try {
    +            input = getFlowFileContents(session, charset, flowFile);
    +            String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +            String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
    +
    +            int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator);
    +
    +            if ( getLogger().isDebugEnabled() )    {
    +                getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions });
    +            }
    +
    +            MultiLayerNetwork model = getModel(context);
    --- End diff --
    
    @mans2singh Makes sense. I'm honestly not sure if `@OnScheduled` would be any better. Maybe someone else will comment. Thanks!
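    
    For context, the lazy-loading pattern under discussion can be sketched in plain Java, without any NiFi or DL4J types. `LazyModelHolder` and its `reset()` method are hypothetical names; in the processor the load happens in `getModel(context)` and the reset in the `@OnStopped` method. The sketch assumes the reset is synchronized as well, which the `close()` method in the diff is not.

```java
import java.util.function.Supplier;

// Hypothetical holder that loads an expensive object on first use
// and drops it again when the component is stopped.
final class LazyModelHolder<M> {

    private final Supplier<M> loader;
    private M model;

    LazyModelHolder(Supplier<M> loader) {
        this.loader = loader;
    }

    // First caller pays the load cost; later callers reuse the instance.
    synchronized M get() {
        if (model == null) {
            model = loader.get();
        }
        return model;
    }

    // Counterpart of the @OnStopped cleanup: the next get() reloads.
    synchronized void reset() {
        model = null;
    }
}
```

    With this shape the first FlowFile after a start pays the load time, whereas loading in an `@OnScheduled` method would move that cost to scheduling time.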


---

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/2686
  
    @ijokarumawak - I was thinking that the processor would be used as a transformer (predictor) and that there would be a correlation attribute in the flow file to associate the results with the observations.  This keeps the focus on transformation with simple outputs while still allowing the user the flexibility to use the correlation id to combine/enrich it with other data using an enrichment processor.   I've added test cases which show how to use the correlation id.  What are your thoughts?
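    
    As a rough illustration of the idea, the join on a correlation id could look like the following plain-Java sketch. The class name, the map-based inputs, and the ` -> ` output format are all assumptions made for the example; in a real flow the id would travel as a FlowFile attribute.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CorrelationJoinDemo {

    // Joins observations and predictions that share a correlation id.
    // Observations without a matching prediction are left out.
    static Map<String, String> enrich(Map<String, String> observationsById,
                                      Map<String, String> predictionsById) {
        Map<String, String> joined = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : observationsById.entrySet()) {
            String prediction = predictionsById.get(entry.getKey());
            if (prediction != null) {
                joined.put(entry.getKey(), entry.getValue() + " -> " + prediction);
            }
        }
        return joined;
    }
}
```

    The predictor itself stays a simple transformer here; the enrichment step only needs the id to find the matching observation.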


---